elasticsearch - Kafka Elasticsearch 连接器 - 'Flush timeout expired with unflushed records:'

标签 elasticsearch apache-kafka apache-kafka-connect confluent-platform

我对 kafka -> elasticsearch 连接器有一个奇怪的问题。第一次启动时一切都很好,我在 elasticsearch 中收到了一个新数据并通过 kibana 仪表板检查了它,但是当我使用相同的生产者应用程序将新数据生成到 kafka 并尝试再次启动连接器时,我没有无法在 elasticsearch 中获取任何新数据。 现在我收到这样的错误:

[2018-02-04 21:38:04,987] ERROR WorkerSinkTask{id=log-platform-elastic-0} Commit of offsets threw an unexpected exception for sequence number 14: null (org.apache.kafka.connect.runtime.WorkerSinkTask:233)
org.apache.kafka.connect.errors.ConnectException: Flush timeout expired with unflushed records: 15805

我正在使用下一个命令来运行连接器:

/usr/bin/connect-standalone /etc/schema-registry/connect-avro-standalone.properties log-platform-elastic.properties

connect-avro-standalone.properties:

bootstrap.servers=kafka-0.kafka-hs:9093,kafka-1.kafka-hs:9093,kafka-2.kafka-hs:9093
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
# producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
# consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
#rest.host.name=
rest.port=8084
#rest.advertised.host.name=
#rest.advertised.port=
plugin.path=/usr/share/java

log-platform-elastic.properties:

name=log-platform-elastic
key.converter=org.apache.kafka.connect.storage.StringConverter
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=member_sync_log, order_history_sync_log # ... and many others
key.ignore=true
connection.url=http://elasticsearch:9200
type.name=log

我检查了与 kafka broker、elasticsearch 和 schema-registry 的连接(此时 schema-registry 和连接器位于同一主机上),一切正常。 Kafka 代理在端口 9093 上运行,我能够使用 kafka-avro-console-consumer 从主题中读取数据。 如果您对此有任何帮助,我将不胜感激!

最佳答案

只需将 flush.timeout.ms 更新为大于 10000(默认为 10 秒)

根据文档:

flush.timeout.ms The timeout in milliseconds to use for periodic flushing, and when waiting for buffer space to be made available by completed requests as records are added. If this timeout is exceeded the task will fail.

Type: long Default: 10000 Importance: low

See documentation

关于elasticsearch - Kafka Elasticsearch 连接器 - 'Flush timeout expired with unflushed records:',我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48613433/

相关文章:

java - 如何将流式 json 数据作为键值对发送到 kafka 消费者中

apache-kafka - Kafka Connect 警报选项?

ElasticSearch:按特定顺序排序

python - 如何使用 elasticsearch DSL for python 访问响应对象

elasticsearch - 如何限制Ubuntu 17.10中elasticsearch的内存使用?

apache-kafka - Kafka 流与 Kafka 消费者如何决定使用什么

elasticsearch - elasticsearch架构/开发查询-ADFS/安全过滤/SearchUI

spring-boot - 为Kafka创建的Kubernetes端点,但未反射(reflect)在POD中

mysql - 使用 Debezium 通过 SSL 连接到 Cloud SQL 时出错

java - Kafka Connect S3 接收器在加载 Avro 时抛出 IllegalArgumentException