我在 application.conf
中有以下配置:
bounded-mailbox {
mailbox-type = "akka.dispatch.BoundedMailbox"
mailbox-capacity = 100
mailbox-push-timeout-time = 3s
}
akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = INFO
daemonic = on
}
这是我配置 Actor 的方式
public class MyTestActor extends UntypedActor implements RequiresMessageQueue<BoundedMessageQueueSemantics>{
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof String){
Thread.sleep(500);
System.out.println("message = " + message);
}
else {
System.out.println("Unknown Message " );
}
}
}
现在这就是我启动这个 actor 的方式:
myTestActor = myActorSystem.actorOf(Props.create(MyTestActor.class).withMailbox("bounded-mailbox"), "simple-actor");
之后,在我的代码中,我向该 Actor 发送了 3000 条消息。
for (int i =0;i<3000;i++){
myTestActor.tell(guestName, null);}
我希望看到的异常是我的队列已满,但我的消息每隔半秒就会在 onReceive 方法中打印一次,就像什么都没发生一样。所以我相信我的邮箱配置没有应用。
我做错了什么?
更新:我创建了订阅死信事件的 Actor :
deadLetterActor = myActorSystem.actorOf(Props.create(DeadLetterMonitor.class),"deadLetter-monitor");
并安装了 Kamon 用于队列监控:
在向 Actor 发送了 3000 条消息后,Kamin 向我展示了以下内容:
Actor: user/simple-actor
MailBox size:
Min: 100
Avg.: 100.0
Max: 101
Actor: system/deadLetterListener
MailBox size:
Min: 0
Avg.: 0.0
Max: 0
Actor: system/deadLetter-monitor
MailBox size:
Min: 0
Avg.: 0.0
Max: 0
最佳答案
默认情况下,Akka 会将溢出的消息丢弃到 DeadLetters 中,并且 actor 不会停止处理: https://github.com/akka/akka/blob/876b8045a1fdb9cdd880eeab8b1611aa976576f6/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala#L411
但是在丢弃消息之前,发送线程会在 mailbox-push-timeout-time
配置的时间间隔内被阻塞。尝试将它减少到 1 毫秒,看看下面的测试是否会通过:
import java.util.concurrent.atomic.AtomicInteger
import akka.actor._
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory._
import org.specs2.mutable.Specification
class BoundedActorSpec extends Specification {
args(sequential = true)
def config: Config = load(parseString(
"""
bounded-mailbox {
mailbox-type = "akka.dispatch.BoundedMailbox"
mailbox-capacity = 100
mailbox-push-timeout-time = 1ms
}
"""))
val system = ActorSystem("system", config)
"some messages should go to dead letters" in {
system.eventStream.subscribe(system.actorOf(Props(classOf[DeadLetterMetricsActor])), classOf[DeadLetter])
val myTestActor = system.actorOf(Props(classOf[MyTestActor]).withMailbox("bounded-mailbox"))
for (i <- 0 until 3000) {
myTestActor.tell("guestName", null)
}
Thread.sleep(100)
system.shutdown()
system.awaitTermination()
DeadLetterMetricsActor.deadLetterCount.get must be greaterThan(0)
}
}
class MyTestActor extends Actor {
def receive = {
case message: String =>
Thread.sleep(500)
println("message = " + message);
case _ => println("Unknown Message")
}
}
object DeadLetterMetricsActor {
val deadLetterCount = new AtomicInteger
}
class DeadLetterMetricsActor extends Actor {
def receive = {
case _: DeadLetter => DeadLetterMetricsActor.deadLetterCount.incrementAndGet()
}
}
关于Java、Akka Actor 和有界邮箱,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29930682/