java - Akka Streams onFailuresWithBackoff 未重新启动流程

标签 java akka akka-stream akka-http

如果在阶段期间发生任何故障,我尝试在 Akka Streams javadsl 中使用 RestartFlow 来重新启动我的流程阶段之一,但它似乎并没有重新启动流程,而只是删除消息。

我已经看过这个:RestartFlow in Akka Streams not working as expected ,但我使用的是 2.5.19 版本,所以应该修复它?

我尝试了 RestartFlow.onFailuresWithBackoffRestartFlow.withBackoff 但都不起作用。我也尝试过使用整个 Actor 系统主管策略,但这似乎只是拦截异常,以便它不会从流程中抛出,并且 plus 似乎没有提供我想要的退避和最大重试策略。

流:

public Consumer.DrainingControl<Done> stream() {
    return Consumer.committableSource(consumerSettings,
        Subscriptions.topics(config.getString(ConfigKeys.KAFKA_CONFIG_PREFIX +
            ConfigKeys.CONSUMER_TOPIC)))
        .via(RestartFlow.onFailuresWithBackoff(
                Duration.ofSeconds(1), // min backoff
                Duration.ofSeconds(2), // max backoff,
                0.2, // adds 20% "noise" to vary the intervals slightly
                10, // limits the amount of restarts to 10
                this::dispatchMessageFlow))
        .via(Committer.flow(CommitterSettings.create(system)))
        .toMat(Sink.ignore(), Keep.both())
        .mapMaterializedValue(Consumer::createDrainingControl)
        .run(mat);
}

然后是流程:

private Flow<ConsumerMessage.CommittableMessage<String, String>,
    ConsumerMessage.Committable, NotUsed> dispatchMessageFlow() {
    return Flow.<ConsumerMessage.CommittableMessage<String, String>>create()
            .mapAsyncUnordered(
                config.getInt(ConfigKeys.PARALLELISM),
                msg ->
                    streamProcessor.process(msg.record().value())
                        .whenComplete((done, e) -> {
                            if (e != null) {
                                throw new RuntimeException(e);
                            } else {
                                if (done.status().isSuccess()){
                                    streamingConsumerLogger.info("Successfully posted message, got response:\n{}",
                                        done.toString());
                                } else {
                                    throw new RuntimeException("HTTP Error!");
                                }
                            }
                        })
                        .thenApply(done -> msg.committableOffset()));
}

我看到了一次异常,akka 指出它将因失败而重新启动图表,但此后就没有其他了。根据我的理解,我应该再看10次。消费者继续收听新消息,因此看起来消息刚刚被丢弃。

java.util.concurrent.CompletionException: java.lang.RuntimeException: HTTP Error!
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:769)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.RuntimeException: HTTP Error!
    at com.company.app.messageforwarder.StreamingConsumerService.lambda$null$0(StreamingConsumerService.java:72)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    ... 6 more

如果有人可以帮助我指出正确的方向,我将不胜感激。

最佳答案

它的工作方式有点不同。长话短说 - 如果发生错误,消息将被删除,但源/流将重新启动,而不会杀死整个流。 RestartFlow.onFailuresWithBackoff documentation 中对此进行了描述。 :

The restart process is inherently lossy, since there is no coordination between cancelling and the sending of messages. A termination signal from either end of the wrapped Flow will cause the other end to be terminated, and any in transit messages will be lost. During backoff, this Flow will backpressure.

关于java - Akka Streams onFailuresWithBackoff 未重新启动流程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54245791/

相关文章:

java - 试图做一个贷款计算器,找不到符号

java - 使用 Maven

scala - 如何真实测试akka流?

scala - Akka Streams 中的 RestartFlow 未按预期工作

java - 如何使用 java/scala 将图像数据插入 mySql mediumtext 字段?

java - 如何从 jar 文件中加载图像?

scala - 如何处理 Actor 的爆发?

java - 集群中 Akka 成员(actor)查找

scala - PlayWS 发布多部分表单数据

scala - Akka HTTP Websocket,如何识别actor内部的连接