spring – Akka actors and Clustering – I'm having trouble with ClusterSingletonManager: unhandled event in state Start

I have a system using Akka 2.2.4 that creates a bunch of local actors and sets them up as routees of a broadcast router. Each worker handles some slice of the total work, according to hash ranges we pass in. It works great.

Now I have to cluster this application for failover. Based on the requirement that only one worker per hash range may exist/be triggered on the cluster, it seemed to me that setting each one up as a ClusterSingletonManager would make sense... however, I can't get it working. The actor system starts up, it creates the ClusterSingletonManager, and it adds the path referenced in the code below to the broadcast router, but it never instantiates my actual worker actor to handle my messages. All I get is a log message: "unhandled event ${my message} in state Start". What am I doing wrong? Is there something else I need to do to start up this single-instance cluster? Am I sending a message to the wrong actor?

Here is my akka config (I use the default config as a fallback):

akka{
    cluster{
        roles=["workerSystem"]
        min-nr-of-members = 1
        role {
            workerSystem.min-nr-of-members = 1
        }
    }
    daemonic = true
    remote {
        enabled-transports = ["akka.remote.netty.tcp"]
        netty.tcp {
          hostname = "127.0.0.1"
          port = ${akkaPort}
        }
    }
    actor{
        provider = akka.cluster.ClusterActorRefProvider
        single-message-bound-mailbox {
              # FQCN of the MailboxType. The Class of the FQCN must have a public
              # constructor with
              # (akka.actor.ActorSystem.Settings, com.typesafe.config.Config) parameters.
              mailbox-type = "akka.dispatch.BoundedMailbox"

              # If the mailbox is bounded then it uses this setting to determine its
              # capacity. The provided value must be positive.
              # NOTICE:
              # Up to version 2.1 the mailbox type was determined based on this setting;
              # this is no longer the case, the type must explicitly be a bounded mailbox.
              mailbox-capacity = 1

              # If the mailbox is bounded then this is the timeout for enqueueing
              # in case the mailbox is full. Negative values signify infinite
              # timeout, which should be avoided as it bears the risk of dead-lock.
              mailbox-push-timeout-time = 1

        }
        worker-dispatcher{
         type = PinnedDispatcher
         executor = "thread-pool-executor"
          # Throughput defines the number of messages that are processed in a batch
          # before the thread is returned to the pool. Set to 1 for as fair as possible.
         throughput = 500
         thread-pool-executor {
            # Keep alive time for threads
            keep-alive-time = 60s

            # Min number of threads to cap factor-based core number to
            core-pool-size-min = ${workerCount}

            # The core pool size factor is used to determine thread pool core size
            # using the following formula: ceil(available processors * factor).
            # Resulting size is then bounded by the core-pool-size-min and
            # core-pool-size-max values.
            core-pool-size-factor = 3.0

            # Max number of threads to cap factor-based number to
            core-pool-size-max = 64

            # Minimum number of threads to cap factor-based max number to
            # (if using a bounded task queue)
            max-pool-size-min = ${workerCount}

            # Max no of threads (if using a bounded task queue) is determined by
            # calculating: ceil(available processors * factor)
            max-pool-size-factor  = 3.0

            # Max number of threads to cap factor-based max number to
            # (if using a  bounded task queue)
            max-pool-size-max = 64

            # Specifies the bounded capacity of the task queue (< 1 == unbounded)
            task-queue-size = -1

            # Specifies which type of task queue will be used, can be "array" or
            # "linked" (default)
            task-queue-type = "linked"

            # Allow core threads to time out
            allow-core-timeout = on
          }
         fork-join-executor {
            # Min number of threads to cap factor-based parallelism number to
            parallelism-min = 1

            # The parallelism factor is used to determine thread pool size using the
            # following formula: ceil(available processors * factor). Resulting size
            # is then bounded by the parallelism-min and parallelism-max values.
            parallelism-factor = 3.0

            # Max number of threads to cap factor-based parallelism number to
            parallelism-max = 1
          }
        }
    }
}

Here is where I create my actors (it is written in Groovy):

            Props clusteredProps = ClusterSingletonManager.defaultProps("worker".toString(), PoisonPill.getInstance(), "workerSystem",
                    new ClusterSingletonPropsFactory(){

                        @Override
                        Props create(Object handOverData) {
                            log.info("called in ClusterSingetonManager")
                            Props.create(WorkerActorCreator.create(applicationContext, it.start, it.end)).withDispatcher("akka.actor.worker-dispatcher").withMailbox("akka.actor.single-message-bound-mailbox")
                        }
                    } )
            ActorRef manager = system.actorOf(clusteredProps, "worker-${it.start}-${it.end}".toString())
            String path = manager.path().child("worker").toString()
            path

When I try to send a message to the actual worker, shouldn't the path above resolve to it? Currently it does not.
What am I doing wrong? Additionally, these actors live inside a Spring application, and the worker actors have some @Autowired dependencies. While this Spring integration works well in a non-clustered environment, are there any gotchas in a clustered environment that I should watch out for?
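On the Spring side, one common pattern (a sketch under my own assumptions, not part of the original post) is to build actors through akka.actor.IndirectActorProducer, so that Spring instantiates the actor bean and performs the @Autowired injection no matter on which node the singleton manager (re)creates the worker. SpringActorProducer and the bean name used below are hypothetical:

    import akka.actor.Actor
    import akka.actor.IndirectActorProducer
    import org.springframework.context.ApplicationContext

    // Hypothetical producer: delegates actor construction to Spring so that
    // @Autowired dependencies are injected before the actor starts.
    class SpringActorProducer implements IndirectActorProducer {
        final ApplicationContext applicationContext
        final String beanName   // must be a prototype-scoped actor bean

        SpringActorProducer(ApplicationContext applicationContext, String beanName) {
            this.applicationContext = applicationContext
            this.beanName = beanName
        }

        @Override
        Actor produce() {
            // a fresh, fully wired actor instance on every (re)start
            (Actor) applicationContext.getBean(beanName)
        }

        @Override
        Class<? extends Actor> actorClass() {
            (Class<? extends Actor>) applicationContext.getType(beanName)
        }
    }

The worker Props would then be built as Props.create(SpringActorProducer, applicationContext, "workerActor") inside the ClusterSingletonPropsFactory, so the wiring also happens when the singleton fails over to another node.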

Thanks

FYI: I have also posted this to the akka-user google group. Here is the link.

Solution:

The path in your code points at the ClusterSingletonManager actor that you start on each node with role "workerSystem". The manager creates the child actor (your WorkerActor, with the singleton name "worker" that you passed to defaultProps) on the oldest node in the cluster, i.e. the singleton within the cluster.
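To make that concrete, here is my reading of the resulting actor hierarchy (the range values 0 and 100 are illustrative):

    /user/worker-0-100          # ClusterSingletonManager, one per node with role "workerSystem"
    /user/worker-0-100/worker   # the actual WorkerActor, alive only on the oldest node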

You should also define the name of the ClusterSingletonManager, e.g. system.actorOf(clusteredProps, "workerSingletonManager").

You cannot send messages to the ClusterSingletonManager itself. You must send them to the path of the active worker, i.e. a path that includes the address of the oldest node. This is illustrated by the ConsumerProxy in the documentation.
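A minimal sketch of such a fully qualified selection (the actor system name, host, and port are placeholders; the manager name matches the code above):

    import akka.actor.ActorRef
    import akka.actor.ActorSelection

    // "WorkerSystem@127.0.0.1:2551" stands in for the current oldest node's address.
    ActorSelection worker = system.actorSelection(
            "akka.tcp://WorkerSystem@127.0.0.1:2551/user/worker-0-100/worker")
    worker.tell(message, ActorRef.noSender())

A real proxy, like the ConsumerProxy in the docs, would subscribe to cluster membership events and re-resolve this address whenever the oldest member changes.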

I'm not sure you should be using singletons for this at all. All of the workers will be running on the same node, the oldest one. I would prefer to discuss alternative solutions to your problem in the akka-user google group.
