apache-kafka - 无法将 Streamz Kafka Stream 转换为 Dask Stream

标签 apache-kafka dask dask-distributed

我无法将 Streamz 流转换为使用 Kafka 源生成的 Dask Stream。PFB 代码

from streamz import Stream
from streamz.dataframe import Random
from streamz.dataframe import DataFrame
import json
from dask.distributed import Client
client = Client()
source = Stream.from_kafka(['logs'],
       {'bootstrap.servers': 'kafkaXX:9092',
        'group.id': 'streamz'}) 
source.scatter().map(json.loads).buffer(8).gather().sink(print)
source.start()

我收到此错误消息

ValueError: Two different event loops active

最佳答案

如果没有另外指示,kafka 源将在线程中启动自己的事件循环。对 Client() 的调用也可以执行此操作。要将循环从一个传递到另一个,您可以这样做

Stream.from_kafka(..., loop=client.loop)

请注意,对 .scatter() 的调用还需要显式访问事件循环,但由于这是特定于 dask 的,因此它知道使用您拥有的任何事件客户端的循环。

关于apache-kafka - 无法将 Streamz Kafka Stream 转换为 Dask Stream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57037352/

相关文章:

go - 无法使用来自本地运行的 Kafka 服务器的消息,使用 Golang Sarama 包

python - dask distributed 是否使用 Tornado 协程来处理 worker 任务?

dask - 我应该如何从外部服务连接到部署在 Kubernetes 中的 Dask 网关?

python - Dask 依赖图中的容错

apache-kafka - Kafka 如何处理运行速度比其他消费者慢的消费者?

java - Flink - 查询Kafka主题以获取消费者组的偏移量?

apache-kafka - Apache Kafka 示例错误 : Failed to send message after 3 tries

python - dask:client.persist 和 client.compute 之间的区别

python - 并行化 Dask 聚合

dask - 如何查找 Dask 分布式函数调用的并发.future 输入参数