python - 如何将 python kafka 客户端与 gevent 一起使用 - 是否有一个真正有效的库?

标签 python python-2.7 gevent apache-kafka

我正在尝试使用 gevent 在 python 2.7 上使用 brod 写入 kafka 0.7.2。

这是我收到的错误消息。猜测是因为阻塞。 brod 支持tornado,但我使用gevent。

No handlers could be found for logger "brod.socket"
Traceback (most recent call last):
  File "/var/chef/cache/src/gevent/gevent/greenlet.py", line 328, in run
    result = self._run(*self.args, **self.kwargs)
  File "worker_server.py", line 204, in execute_kafka_pipe
    kafka.produce(topic,payload)
  File "/usr/local/lib/python2.7/dist-packages/brod-0.3.2-py2.7.egg/brod/base.py", line 287, in produce
    return self._write(request, callback)
  File "/usr/local/lib/python2.7/dist-packages/brod-0.3.2-py2.7.egg/brod/blocking.py", line 98, in _write
    return self._write(data, callback, retries)
  File "/usr/local/lib/python2.7/dist-packages/brod-0.3.2-py2.7.egg/brod/blocking.py", line 89, in _write
    wrote_length += self._socket.send(data)
  File "/var/chef/cache/src/gevent/gevent/socket.py", line 441, in send
    self._wait(self._write_event)
  File "/var/chef/cache/src/gevent/gevent/socket.py", line 292, in _wait
    assert watcher.callback is None, 'This socket is already used by another greenlet: %r' % (watcher.callback, )
AssertionError: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent.hub.Waiter object at 0x1dece60>>
<Greenlet at 0x1d4b9b0: execute_kafka_pipe('topic-spend', '{"enode": 1, "city": "Cairns", "dl": "en", "wnode)> failed with AssertionError

我尝试使用 gevent-kakfa 但依赖于 gevent-zookeeper。

当尝试连接到 Zookeeper 时,我收到此消息:

Traceback (most recent call last):
  File "/home/ubuntu/workspace/rtbhui-devops/servers/worker_server.py", line 68, in <module>
    framework = gevent_zookeeper.ZookeeperFramework('localhost:2181', 10)
  File "/usr/local/lib/python2.7/dist-packages/gevent_zookeeper/framework.py", line 241, in __init__
    self.client = ZookeeperClient(hosts, timeout)
  File "/usr/local/lib/python2.7/dist-packages/gevent_zookeeper/client.py", line 211, in __init__
    self._event = gevent.core.event(
AttributeError: 'module' object has no attribute 'core'
Exception AttributeError: "'ZookeeperClient' object has no attribute '_event'" in <bound method ZookeeperClient.__del__ of <gevent_zookeeper.client.ZookeeperClient object at 0x274ded0>> ignored

是否没有一个可以使用 gevent 编写消息的 python 库?

最佳答案

您可以尝试https://github.com/hmahmood/kafka-python/tree/gevent-impl

目前已提交至主线 kafka-python 库进行 PR:

https://github.com/mumrah/kafka-python/pull/145

关于python - 如何将 python kafka 客户端与 gevent 一起使用 - 是否有一个真正有效的库?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19642769/

相关文章:

python - 我的嵌套 For 循环运行 "faster"比内联 For 循环

python - pyxb 的十进制数据类型丢失零

python - 为什么 is_integer() 方法不起作用?

python - 如何并行并发 HTTP 请求

python - Gevent Pool 似乎并没有提高性能

python - 查找循环网络中的边x python

python - 编写一个与 wpa_supplicant 交互的外部程序

Python 函数返回 None。为什么?

javascript - 一次在odoo中应用多个过滤器

Python mysqldb db.commit() 不工作