Java、Akka Actor 和有界邮箱

标签 java scala akka

我在 application.conf 中有以下配置:

bounded-mailbox {
  mailbox-type = "akka.dispatch.BoundedMailbox"
  mailbox-capacity = 100
  mailbox-push-timeout-time = 3s
}

akka {

    loggers = ["akka.event.slf4j.Slf4jLogger"]
    loglevel = INFO
    daemonic = on
}

这是我配置 Actor 的方式

public class MyTestActor extends UntypedActor implements RequiresMessageQueue<BoundedMessageQueueSemantics>{


    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof String){

             Thread.sleep(500);
             System.out.println("message = " + message);
        }
        else {
            System.out.println("Unknown Message " );
        }

    }
}

现在这就是我启动这个 actor 的方式:

myTestActor = myActorSystem.actorOf(Props.create(MyTestActor.class).withMailbox("bounded-mailbox"), "simple-actor");

之后,在我的代码中,我向该 Actor 发送了 3000 条消息。

  for (int i =0;i<3000;i++){
        myTestActor.tell(guestName, null);}

我希望看到的异常是我的队列已满,但我的消息每隔半秒就会在 onReceive 方法中打印一次,就像什么都没发生一样。所以我相信我的邮箱配置没有应用。

我做错了什么?

更新:我创建了订阅死信事件的 Actor :

deadLetterActor = myActorSystem.actorOf(Props.create(DeadLetterMonitor.class),"deadLetter-monitor");

并安装了 Kamon 用于队列监控:

在向 Actor 发送了 3000 条消息后,Kamin 向我展示了以下内容:

Actor: user/simple-actor 

MailBox size:
 Min: 100  
Avg.: 100.0 
Max: 101

Actor: system/deadLetterListener 

MailBox size:
 Min: 0  
Avg.: 0.0 
Max: 0

Actor: system/deadLetter-monitor 

MailBox size:
 Min: 0  
Avg.: 0.0 
Max: 0

最佳答案

默认情况下,Akka 会将溢出的消息丢弃到 DeadLetters 中,并且 actor 不会停止处理: https://github.com/akka/akka/blob/876b8045a1fdb9cdd880eeab8b1611aa976576f6/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala#L411

但是在丢弃消息之前,发送线程会在 mailbox-push-timeout-time 配置的时间间隔内被阻塞。尝试将它减少到 1 毫秒,看看下面的测试是否会通过:

import java.util.concurrent.atomic.AtomicInteger

import akka.actor._
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory._
import org.specs2.mutable.Specification

class BoundedActorSpec extends Specification {
  args(sequential = true)

  def config: Config = load(parseString(
    """
    bounded-mailbox {
      mailbox-type = "akka.dispatch.BoundedMailbox"
      mailbox-capacity = 100
      mailbox-push-timeout-time = 1ms
    }
    """))

  val system = ActorSystem("system", config)

  "some messages should go to dead letters" in {
    system.eventStream.subscribe(system.actorOf(Props(classOf[DeadLetterMetricsActor])), classOf[DeadLetter])
    val myTestActor = system.actorOf(Props(classOf[MyTestActor]).withMailbox("bounded-mailbox"))
    for (i  <- 0 until 3000) {
      myTestActor.tell("guestName", null)
    }
    Thread.sleep(100)
    system.shutdown()
    system.awaitTermination()
    DeadLetterMetricsActor.deadLetterCount.get must be greaterThan(0)
  }
}

class MyTestActor extends Actor {
  def receive = {
    case message: String =>
      Thread.sleep(500)
      println("message = " + message);
    case _ => println("Unknown Message")
  }
}

object DeadLetterMetricsActor {
  val deadLetterCount = new AtomicInteger
}

class DeadLetterMetricsActor extends Actor {
  def receive = {
    case _: DeadLetter => DeadLetterMetricsActor.deadLetterCount.incrementAndGet()
  }
}

关于Java、Akka Actor 和有界邮箱,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29930682/

相关文章:

java - 使用工业级加密保护 Java 免受逆向工程

json - 使用 json4s 解析 JSON 中的空值

scala - 算法混合

scala - 我如何混合打字和非打字 Actor ?

java - 如何在Actor的onReceive方法中执行异步数据库插入?

Java不可变字符串混淆

Java 属性和引用——我没有得到我所期望的

akka - 如何使用 SubFlows 对已排序流的项目进行分组?

java - 如何有效地小写集合的每个元素?

scala - 类型参数子句中的广义约束?