java - 无法从开始获得STORM NEW VERSION(1.0.1)中的消息

标签 java hadoop hbase apache-kafka apache-storm

我们正尝试从使用 STORM版本0.9.3 开始在kafka中获取消息,并能够将其放入HBASE TABLE中。

为此,我们使用的配置是:

kafkaConfig.forceFromStart = true;

因此,我们从 OFFSET 0 获得消息,即从hbase表中开始,即完整消息。

但是,当我们尝试使用 STORM VERSION 1.0.1 从kafka开头获取消息并将其放入HBASE TABLE中时,我们仅获得了最后一条消息。我们没有从一开始就获得 OFFSET 0 的消息(即,最后添加的消息能够从一开始就获得它)。

我们使用的配置:
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
kafkaConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
kafkaConfig.ignoreZkOffsets = false;
kafkaConfig.maxOffsetBehind = Long.MAX_VALUE;
kafkaConfig.startOffsetTime = -2;

任何帮助表示赞赏。

最佳答案

如果要强制使用者使用指定的偏移量而不是从Zookeeper读取它,则必须将ignoreZkOffsets设置为true。

storm-kafka的文档中:

This means that when a topology has run once the setting KafkaConfig.startOffsetTime will not have an effect for subsequent runs of the topology because now the topology will rely on the consumer state information (offsets) in ZooKeeper to determine from where it should begin (more precisely: resume) reading. If you want to force the spout to ignore any consumer state information stored in ZooKeeper, then you should set the parameter KafkaConfig.ignoreZkOffsets to true. If true, the spout will always begin reading from the offset defined by KafkaConfig.startOffsetTime as described above.



因此,要使用队列开头的消息,请使用以下配置:
kafkaConfig.ignoreZkOffsets = true;
kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();

关于java - 无法从开始获得STORM NEW VERSION(1.0.1)中的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37836631/

相关文章:

java - 通过 XSTREAM 处理 XML 中不需要的元素

memory-management - ColdFusion JVM 无缘无故地使用内存

hadoop - Hadoop 平台中除 mahout 之外的任何其他机器学习库

xml - Hive XPath UDF 与命名空间一起不起作用

java - HBaseProtos$SnapshotDescription 无法解析

java - 从 startActivityForResult 获取结果

java - 绑定(bind)java库Xamarin.Android

hadoop - Hbase 中列和列族的设计

hadoop - 无法使用 Mapreduce 将数据加载到 Htable

sql - 等效于 HBase 中的 Order By 操作