我无法将 Streamz 流转换为使用 Kafka 源生成的 Dask Stream。PFB 代码
from streamz import Stream
from streamz.dataframe import Random
from streamz.dataframe import DataFrame
import json
from dask.distributed import Client
client = Client()
source = Stream.from_kafka(['logs'],
{'bootstrap.servers': 'kafkaXX:9092',
'group.id': 'streamz'})
source.scatter().map(json.loads).buffer(8).gather().sink(print)
source.start()
我收到此错误消息
ValueError: Two different event loops active
最佳答案
如果没有另外指示,kafka 源将在线程中启动自己的事件循环。对 Client()
的调用也可以执行此操作。要将循环从一个传递到另一个,您可以这样做
Stream.from_kafka(..., loop=client.loop)
请注意,对 .scatter()
的调用还需要显式访问事件循环,但由于这是特定于 dask 的,因此它知道使用您拥有的任何事件客户端的循环。
关于apache-kafka - 无法将 Streamz Kafka Stream 转换为 Dask Stream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57037352/