java-nats-streaming : Publishing Messages after Server Reconnects

标签 java nats-streaming-server

我有一个 NATS 流集群,设置了 3 个节点。看来我的 java 应用程序在服务器停机期间发布的 NATS 消息丢失了(即当我的服务器备份并运行时不会再次重新发布)。

更详细的描述:

  1. NATS 集群上线。发布者和订阅者应用程序上线。发布者开始每秒发布一条消息。订阅者接收消息。
  2. NATS 服务器已关闭。发布者继续发布消息(我们将这些消息称为“离线消息”)。订阅者停止接收任何内容
  3. NATS 服务器恢复在线。订阅者再次开始接收消息,但永远不会收到“离线消息”。

我的发布者和订阅者应用程序都配置为尝试重新连接到 NATS 服务器并且不会超时。我在整个过程中没有遇到任何异常。

NATS 连接:

Options options = new Options.Builder().servers(serverList).maxReconnects(-1).build();

Connection nc = Nats.connect(options);

StreamingConnectionFactory cf = new StreamingConnectionFactory(natsProperties.getClusterId(), natsProperties.getClientId());
cf.setNatsConnection(nc);
streamingConnection = cf.createConnection();

发布者:

// subject and message String variables are passed in
streamingConnection.publish(subject, message.getBytes());

订阅者:

streamingConnection.subscribe(subject, new MessageHandler() {
    public void onMessage(Message m) {
        System.out.prinf("Received msg: %s\n", m.getData())
    }
},  new SubscriptionOptions.Builder().durableName(durableName).build());

从文档来看,Java NATS 客户端似乎内置了一个重新连接缓冲区。我尝试将缓冲区增加 10 倍,但无济于事(而且,我的消息仅包含 2 位数字)。如何让它重新发送这些“离线消息”?

最佳答案

我也有同样的问题,我看到的唯一解决方案是另一种订阅方法被占用,保存消息序列,但这我认为不是最好的

   // Receive messages starting at a specific sequence number
   sc.subscribe("foo", new MessageHandler() {
   public void onMessage(Message m) {
     logger.info("Sequence message " +  m.getSequence());
     System.out.printf("Received a message: %s\n", m.getData());
   }
   }, new SubscriptionOptions.Builder().startAtSequence(22).build());

关于java-nats-streaming : Publishing Messages after Server Reconnects,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53190596/

相关文章:

java - 如何使用 JPA 在 2 个表之间进行内连接、左连接、右连接

apache-flink - 如何将 NATS 流服务器与 Apache flink 结合使用?

java - NATS 持久消息 Java 客户端

java - 尝试在多个 JPanel 中显示图像

java - java中如何实例化一个对象?

java - 2 个不同订阅的消息顺序

linux - 如何在不停止流媒体服务器的情况下热重载 NATS 流媒体服务器?

go - NATS流 “StartAt”订阅选项

java - 如何停止崩溃的JLabels

java - 结合最新的和冷的观察结果