python - 缓冲区错误 : Local: Queue full in Python

标签 python apache-kafka

import logging
from confluent_kafka import Producer
import os

logger = logging.getLogger("main")

BOOTSTRAP_SERVERS = os.environ['BOOTSTRAP_SERVERS']
APPLICATION_ID = os.getenv('APPLICATION_ID', default = "nke-data-source")
RECONNECT_BACKOFF_MS = os.getenv('RECONNECT_BACKOFF_MS', default = 1000)
REQUEST_TIMEOUT_MS = os.getenv('REQUEST_TIMEOUT_MS', default = 40000)
ACKS = os.getenv('ACKS', default = "all")
RETRIES = os.getenv('RETRIES', default = 15)
RETRY_BACK_OFF = os.getenv('RETRY_BACK_OFF', default = 1000)
MAX_IN_FLIGHT_REQUESTS = os.getenv('MAX_IN_FLIGHT_REQUESTS', default = 1)
topic = os.getenv('OUTBOUND_TOPIC', default = "tti-nke-raw")

p = Producer({'bootstrap.servers': BOOTSTRAP_SERVERS, 
    'client.id': APPLICATION_ID, 
    'reconnect.backoff.ms': RECONNECT_BACKOFF_MS,
    'request.timeout.ms': REQUEST_TIMEOUT_MS,
    'acks': ACKS,
    'retries': RETRIES,
    'retry.backoff.ms': RETRY_BACK_OFF,
    'max.in.flight.requests.per.connection': MAX_IN_FLIGHT_REQUESTS,
    'compression.type': "lz4"})

def send(key, event):
    try:
        logger.info("Sending key: [{0}] value: [{1}]".format(key, event))
        p.produce(topic=topic, value=event.encode('utf-8'), key=key)
    except Exception:
        logger.error("error sending events to kafka", exc_info=True)


错误:-
Traceback (most recent call last):
BufferError: Local: Queue full
File "/app/sender.py", line 30, in send
p.produce(topic=topic, value=event.encode('utf-8'), key=key)


任何人都可以帮助我,因为我是 python 新手

最佳答案

这个Queuelibrdkafka 中实现了一些东西库(confluent_kafka 绑定(bind)到的)

有一个内部Queue对于获取生产者交货报告并等待产品处理它们的产品(大多数情况下什么都不做),但是您需要触发这种通过队列的机制,可以通过调用 poll 来简单地调用它

您应该调用 producer.poll(0)每次生产后
所以改变:

p.produce(topic=topic, value=event.encode('utf-8'), key=key)

进入:

p.produce(topic=topic, value=event.encode('utf-8'), key=key)
p.poll(0)

这将触发队列清理,不要担心性能,因为这是一个非常简单的功能,并没有真正做太多 librdkafka 的作者写道:

poll() is cheap to call, it will not have a performance impact, so please add it to your producer loop.



基本上它是做什么的:

call poll() at regular intervals to serve the producer's delivery report callbacks.



考虑阅读此 Issue

关于python - 缓冲区错误 : Local: Queue full in Python,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62408128/

相关文章:

python - Jupyter 笔记本无法使用 python 3.7 打开 anaconda 3

python - 将 Flask 应用程序导入 Elastic Beanstalk 错误 : can't open file 'mod_wsgi' : [Errno 2] No such file or directory

python - 如何根据 pyodbc 查询的结果创建 NumPy 数组?

python - 在 pandas 的 groupby 语句中将两列相乘

python - 你能在 LibreOffice 中用 Python 录制宏吗?

apache-kafka - 我们是否可以选择从特定时间段/时间戳获取 KSQL 流中的数据

apache-kafka - 仅启用一次时,Kafka 流中的 UnknownProducerIdException

apache-spark - Pyspark 找不到数据源 : kafka

java - 配置 JMX 导出器时 Kafka 无法启动

.net - 无法使用 azure Eventhub 在架构注册表中创建架构