f# - 如何使用 Akka.FSharp API 在 Akka.NET 集群中实现故障转移?

标签 f# akka.net akka.net-cluster akka.fsharp

如何使用 Akka.FSharp API 在 Akka.NET 集群中实现故障转移?

我有以下集群节点作为种子:

open Akka
open Akka.FSharp
open Akka.Cluster
open System
open System.Configuration

let systemName = "script-cluster"
let nodeName = sprintf "cluster-node-%s" Environment.MachineName
let akkaConfig = Configuration.parse("""akka {  
                                          actor {
                                            provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
                                          }
                                          remote {
                                            log-remote-lifecycle-events = off
                                            helios.tcp {
                                                hostname = "127.0.0.1"
                                                port = 2551       
                                            }
                                          }
                                          cluster {
                                            roles = ["seed"]  # custom node roles
                                            seed-nodes = ["akka.tcp://<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="611202130811154c020d1412150413215053564f514f514f50" rel="noreferrer noopener nofollow">[email protected]</a>:2551"]
                                            # when node cannot be reached within 10 sec, mark is as down
                                            auto-down-unreachable-after = 10s
                                          }
                                        }""")
let actorSystem = akkaConfig |> System.create systemName

let clusterHostActor =
    spawn actorSystem nodeName (fun (inbox: Actor<ClusterEvent.IClusterDomainEvent>) -> 
        let cluster = Cluster.Get actorSystem
        cluster.Subscribe(inbox.Self, [| typeof<ClusterEvent.IClusterDomainEvent> |])
        inbox.Defer(fun () -> cluster.Unsubscribe(inbox.Self))
        let rec messageLoop () = 
            actor {
                let! message = inbox.Receive()                        
                // TODO: Handle messages
                match message with
                | :? ClusterEvent.MemberJoined as event -> printfn "Member %s Joined the Cluster at %O" event.Member.Address.Host DateTime.Now
                | :? ClusterEvent.MemberLeft as event -> printfn "Member %s Left the Cluster at %O" event.Member.Address.Host DateTime.Now
                | other -> printfn "Cluster Received event %O at %O" other DateTime.Now

                return! messageLoop()
            }
        messageLoop())

然后我有一个可能死亡的任意节点:

open Akka
open Akka.FSharp
open Akka.Cluster
open System
open System.Configuration

let systemName = "script-cluster"
let nodeName = sprintf "cluster-node-%s" Environment.MachineName
let akkaConfig = Configuration.parse("""akka {  
                                          actor {
                                            provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
                                          }
                                          remote {
                                            log-remote-lifecycle-events = off
                                            helios.tcp {
                                                hostname = "127.0.0.1"
                                                port = 0       
                                            }
                                          }
                                          cluster {
                                            roles = ["role-a"]  # custom node roles
                                            seed-nodes = ["akka.tcp://<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="6b180819021b1f4608071e181f0e192b5a595c455b455b455a" rel="noreferrer noopener nofollow">[email protected]</a>:2551"]
                                            # when node cannot be reached within 10 sec, mark is as down
                                            auto-down-unreachable-after = 10s
                                          }
                                        }""")
let actorSystem = akkaConfig |> System.create systemName

let listenerRef =  
    spawn actorSystem "temp2"
    <| fun mailbox ->
        let cluster = Cluster.Get (mailbox.Context.System)
        cluster.Subscribe (mailbox.Self, [| typeof<ClusterEvent.IMemberEvent>|])
        mailbox.Defer <| fun () -> cluster.Unsubscribe (mailbox.Self)
        printfn "Created an actor on node [%A] with roles [%s]" cluster.SelfAddress (String.Join(",", cluster.SelfRoles))
        let rec seed () = 
            actor {
                let! (msg: obj) = mailbox.Receive ()
                match msg with
                | :? ClusterEvent.MemberRemoved as actor -> printfn "Actor removed %A" msg
                | :? ClusterEvent.IMemberEvent           -> printfn "Cluster event %A" msg
                | _ -> printfn "Received: %A" msg
                return! seed () }
        seed ()

在集群内实现故障转移的建议做法是什么?

具体来说,是否有一个代码示例来说明当其中一个节点不再可用时集群应如何表现?

  • 我的集群节点是否应该启动替换节点或者是否有不同的行为?
  • 是否有一个配置可以自动处理这个问题,我只需设置而无需编写代码?
  • 我必须在何处实现哪些代码?

最佳答案

首先,依赖MemberUp是一个更好的主意。和 MemberRemoved事件(都实现 ClusterEvent.IMemberEvent 接口(interface),因此订阅它),因为它们标记阶段,即节点加入/离开过程完成时。加入和离开事件不一定确保节点在发出信号的时间点完全可操作。

关于故障转移场景:

  • 可以通过 Akka.Cluster.Sharding 插件自动旋转替换(阅读文章 12 以获取有关其工作原理的更多信息)。 Akka.FSharp 中没有等效项,但您可以使用 Akkling.Cluster.Sharding插件:参见 example code .
  • 另一种方法是在每个节点上预先创建替换参与者。您可以使用 clustered routers 将消息路由给他们或distributed publish/subscribe 。然而,当您有无状态场景时,这更常见,因此每个 Actor 都完全能够随时接听另一个 Actor 的工作。这是在许多不同节点上的许多参与者之间分配工作的更通用的解决方案。
  • 您还可以设置观察者来处理处理参与者。通过使用monitor函数中,你可以命令你的 Actor 监视另一个 Actor (无论他住在哪里)。如果节点发生故障,有关死亡参与者的信息将以终止消息的形式发送给其所有观察者。这样您就可以实现自己的逻辑,即在另一个节点上重新创建参与者。这实际上是最通用的方式,因为它不使用任何额外的插件或配置,但行为需要您自己描述。

关于f# - 如何使用 Akka.FSharp API 在 Akka.NET 集群中实现故障转移?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42795321/

相关文章:

function - 错误: The value Recusion function will be evaluated as part of its own definition in F#

f# - 寓言实现外部接口(interface)编译错误

f# - 是否可以计算 f# 中负数的连分数?

c# - Akka.NET 和 MVVM

c# - Akka.Net (C#) 向集群中的所有参与者发布消息

f# - Array.select 元组的元素

akka.net - 无法让主管策略在 Akka.NET 中工作

akka.net - Akka.Remote - 解除关联后无法向远程参与者发送消息

akka.net 循环组路由器 - 只路由到一个路由

Akka.Net 集群单例 - 当前单例节点意外关闭时不会发生切换