java - MongoSink 响应后提交给 kafka 消费者 - alpakka mongo 连接器

标签 java mongodb akka akka-stream alpakka

我正在使用 alpakka 连接器来使用来自 Kafka 的数据包并将它们插入到 Mongo db 中。我试图在收到 Mongo db 的响应后提交偏移量,但找不到任何相关内容。如何确保数据包成功插入 Mongodb 后才会提交偏移量?

我使用 Consumer.CommittableSource 作为源,MongoSink 作为接收器,并使用 RunnableGraph 运行流。请参阅代码以获取更多说明。

来源:

    public Source<ConsumerMessage.CommittableMessage<String, String>, Consumer.Control> source() {
        return Consumer.committableSource(consumerSettings, subscription);
    }

流程:

   public Flow<ConsumerMessage.CommittableMessage, String, NotUsed> transformation() {
        return Flow.of(ConsumerMessage.CommittableMessage.class).map(i -> i.record().value().toString());
    }

水槽:

    public Sink<String, CompletionStage<Done>> sink() {
        return MongoSink.insertOne(mongoCollection);
    }

图表:

        RunnableGraph graph = RunnableGraph.fromGraph(GraphDSL.create(sink(), (builder, s) -> {
            builder.from(builder.add(source()).out()).via(builder.add(transformation())).to(s);
            return ClosedShape.getInstance();
        }));
        graph.run(ActorMaterializer.create(actorSystem));

编辑:

使用 PassThroughFlow,插入 Mongo 正在工作,并且没有给出任何异常或错误,但仍然无法提交数据包。 conversionCommit() 函数从未被调用过。

更新流程:

    public Flow<String, String, NotUsed> transformationMongo() {
        LOGGER.info("Insert into Mongo");
        return  MongoFlow.insertOne(connection.getDbConnection());
    }
public Flow<ConsumerMessage.CommittableMessage, ConsumerMessage.CommittableOffset, NotUsed> transformationCommit() {
        return Flow.of(ConsumerMessage.CommittableMessage.class).map(i -> i.committableOffset());
    }

水槽:

    public Sink<ConsumerMessage.CommittableOffset, CompletionStage<Done>> sinkCommit() {
        CommitterSettings committerSettings = CommitterSettings.create(actorSystem);
        return Committer.sink(committerSettings);
    }

PassThroughFlow:

public class PassThroughFlow {
    public Graph<FlowShape<ConsumerMessage.CommittableMessage, ConsumerMessage.CommittableMessage>, NotUsed> apply(Flow<ConsumerMessage.CommittableMessage, String, NotUsed> flow) {
        return Flow.fromGraph(GraphDSL.create(builder -> {
            UniformFanOutShape broadcast = builder.add(Broadcast.create(2));
            FanInShape2 zip = builder.add(ZipWith.create((left, right) -> Keep.right()));
            builder.from(broadcast.out(0)).via(builder.add(flow)).toInlet(zip.in0());
            builder.from(broadcast.out(1)).toInlet(zip.in1());
            return FlowShape.apply(broadcast.in(), zip.out());
        }));
    }
}

图表:


RunnableGraph graph = RunnableGraph.fromGraph(GraphDSL.create(sinkCommit(), (builder, s) -> {
            builder.from(builder.add(source()).out()).via(builder.add(passThroughFlow.apply(transformation().via(transformationMongo())))).via(builder.add(transformationCommit())).to(sales);
            return ClosedShape.getInstance();
        }));
        graph.run(ActorMaterializer.create(actorSystem));

//Insertion to mongo is working but still the packet not committed. transformationCommit() function has never been called.

最佳答案

使用 MongoFlow 而不是 MongoSink这使您可以在之后继续流,在那里您将能够提交偏移量。

关于java - MongoSink 响应后提交给 kafka 消费者 - alpakka mongo 连接器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55950058/

相关文章:

java - 是否可以在List中放入两个变量类型?

java - 如何在ANTLR中找到children上下文的children?

java - 处理 Kafka Broker 宕机时的故障

mongodb - 如何查询Breeze.js knockout.js mongoHQ curl

java - ViewPart 中的 onShow 和 onHide 监听器 - Java Eclipse RCP

MongoDB 建议请求

javascript - SQL Server 实时推送通知到 Node.JS

java - 如何知道 Actor 完成了执行?

Akka 2.2 ClusterSingletonManager 发送 HandOverToMe 到 [None]

java - 如何在 Java 中实现 Function1(其 compose 和 andThen 方法)?