ISpout.nextTuple()
javadoc指定在同一线程上调用 nextTuple()
、ack(...)
和 fail(...)
。
但是,调用 emit(...)
的实际收集器是之前提供的,作为 open(..., Collector)
上的参数。
问题是看到一些新数据的后台线程是否必须始终将数据排队以便 nextTuple() 出队并发出。如果后台线程立即发出数据会发生什么?支持吗?如果允许的话,在 nextTuple()
中实现“短时间 sleep ”的推荐方法是什么?
最佳答案
nextTuple()/ack()/fail()
的隐含含义在同一线程上调用的方法是在机器“A”上运行的任务(后台 Java 线程),它发出元组是在“A”上运行的相同任务,在该任务上调用 ack()/fail()取决于处理(由运行在“B”或“C”的 Bolt 处理)拓扑中元组的成功/失败。
只要messageId不为null并且Bolt任务在execute()方法中调用ack(tuple),Storm框架就会跟踪拓扑中的元组遍历并调用元组的ack()/fail()拥有任务。
在回答您的问题之前,先简单介绍一下后台任务线程的工作原理。后台任务线程具有用于发出的元组的内存中结构/缓冲区,以及用于状态/待处理元组等的少数其他内存中结构。当 Spout/Bolt 开始发出数据时,缓冲区会被填满,并且该缓冲区会被释放为当元组被处理时,即调用 ack()/fail() 之后。本质上,后台线程调用 nextTuple()
当缓冲区空闲并且后台线程停止调用nextTuple()
时一旦缓冲区已满。简单来说,emit()方法要么在 open()/nextTuple()/close()
,填充后台线程缓冲区和 ack()/fail()
释放缓冲区。
根据上述解释,后台线程不知道新的/传入的数据。由 nextTuple() 中的逻辑从源(Twitter/JMS 提供程序/ESB/AMQP 兼容服务器/RDBMS)读取数据并发出数据。因此,根据后台线程的缓冲区大小,Storm 会调用 nextTuple(),如上所述。
对于其他问题,如果需要的话,短时间 sleep 应该可以。请注意,nextTuple()
不需要发出值,它可以什么都不返回。
关于multithreading - Storm Spouts 是否应该仅使用调用 Spout.nextTuple 的线程发出输出?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21689225/