我正在尝试在多个 spout 之间分担任务。我有一种情况,我一次从外部源获取一个元组/消息,并且我想要一个 spout 的多个实例,其背后的主要目的是分担负载并提高性能效率。
我可以对一个 Spout 本身执行相同的操作,但我想在多个 Spout 之间分担负载。我无法获得分散负载的逻辑。由于在特定的 spout 完成消费该部分之前(即基于缓冲区大小集),消息的偏移量是未知的。
任何人都可以对如何解决逻辑/算法提出一些亮点吗?
预先感谢您的宝贵时间。
更新响应答案:
现在在 Kafka 上使用多分区(即
5
)以下是使用的代码:
builder.setSpout("spout", new KafkaSpout(cfg), 5);
通过在每个分区上注入(inject) 800 MB
数据进行测试,完成读取需要 ~22 秒
。
再次使用 parallelism_hint = 1 的代码
即 builder.setSpout("spout", new KafkaSpout(cfg), 1);
现在需要更多 ~23 秒
!为什么?
根据 Storm Docs setSpout() 声明如下:
public SpoutDeclarer setSpout(java.lang.String id,
IRichSpout spout,
java.lang.Number parallelism_hint)
在哪里,
parallelism_hint - 是应该分配给执行这个 spout 的任务数。每个任务将在集群某处的进程中的线程上运行。
最佳答案
我在 storm-user 中遇到过讨论讨论类似的事情。
阅读Relationship between Spout parallelism and number of kafka partitions .
使用kafka-spout做storm需要注意的2点
- 您可以在 KafkaSpout 上拥有的最大并行度是分区数。
- 我们可以将负载拆分为多个 kafka 主题,并为每个主题单独的 spout 实例。 IE。 每个 spout 处理一个单独的主题。
所以如果我们有这样的情况,其中每个主机的 kafka 分区配置为 1,主机数量为 2。即使我们将 spout parallelism 设置为 10,所考虑的最大值也只会是 2,这是数量分区数。
如何在 Kafka-spout 中提及分区数?
List<HostPort> hosts = new ArrayList<HostPort>();
hosts.add(new HostPort("localhost",9092));
SpoutConfig objConfig=new SpoutConfig(new KafkaConfig.StaticHosts(hosts, 4), "spoutCaliber", "/kafkastorm", "discovery");
如您所见,这里可以使用 hosts.add
添加代理,并且在 new KafkaConfig.StaticHosts(hosts) 中将分区号指定为 4 , 4)
代码片段。
如何在Kafka-spout中提及并行提示?
builder.setSpout("spout", spout,4);
您可以在使用 setSpout
方法将 spout 添加到拓扑中时提到相同的内容。这里的4 是并行提示。
更多可能有帮助的链接
Understanding-the-parallelism-of-a-Storm-topology
what-is-the-task-in-twitter-storm-parallelism
免责声明: !!我是 storm 和 java 的新手!!!!因此,如果需要某些地方,请编辑/添加。
关于java - Storm-Kafka多个spout,如何分担负载?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18267834/