jdbc - Kafka Connect JDBC OOM - 大数据量

标签 jdbc apache-kafka apache-kafka-connect confluent-platform

我正在尝试实现类似的东西 tutorial 。然而,它之所以有效,是因为数据集非常小。对于更大的 table 我该如何做?因为我一直收到内存不足错误。我的日志是

ka.connect.runtime.rest.RestServer:60)
[2018-04-04 17:16:17,937] INFO [Worker clientId=connect-1, groupId=connect-cluster] Marking the coordinator ip-172-31-14-140.ec2.internal:9092 (id: 2147483647 rack: null) dead (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)
[2018-04-04 17:16:17,938] ERROR Uncaught exception in herder work thread, exiting:  (org.apache.kafka.connect.runtime.distributed.DistributedHerder:218)
java.lang.OutOfMemoryError: Java heap space
[2018-04-04 17:16:17,939] ERROR Uncaught exception in thread 'kafka-coordinator-heartbeat-thread | connect-sink-redshift': (org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread:51)
java.lang.OutOfMemoryError: Java heap space
[2018-04-04 17:16:17,940] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:65)
[2018-04-04 17:16:17,940] INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer:154)
[2018-04-04 17:16:17,940] ERROR WorkerSinkTask{id=sink-redshift-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
java.lang.OutOfMemoryError: Java heap space
[2018-04-04 17:16:17,940] ERROR WorkerSinkTask{id=sink-redshift-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)
[2018-04-04 17:16:17,940] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:96)
[2018-04-04 17:16:17,941] INFO WorkerSourceTask{id=production-db-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:306)
[2018-04-04 17:16:17,940] ERROR Unexpected exception in Thread[KafkaBasedLog Work Thread - connect-statuses,5,main] (org.apache.kafka.connect.util.KafkaBasedLog:334)
java.lang.OutOfMemoryError: Java heap space
[2018-04-04 17:16:17,946] INFO WorkerSourceTask{id=production-db-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:323)
[2018-04-04 17:16:17,954] ERROR WorkerSourceTask{id=production-db-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
java.lang.OutOfMemoryError: Java heap space
[2018-04-04 17:16:17,960] ERROR WorkerSourceTask{id=production-db-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)
[2018-04-04 17:16:17,960] INFO [Producer clientId=producer-4] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:341)
[2018-04-04 17:16:17,960] INFO Stopped ServerConnector@64f4bfe4{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:306)
[2018-04-04 17:16:17,967] INFO Stopped o.e.j.s.ServletContextHandler@2f06a90b{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:865)

我还尝试根据建议 here 增加内存但我无法将整个表加载到内存中。有没有办法限制产生的数据数量?

最佳答案

对于 JDBC 连接器,您可以应用的最重要的属性可能是这个,这似乎就是您所要求的。

batch.max.rows

Maximum number of rows to include in a single batch when polling for new data. This setting can be used to limit the amount of data buffered internally in the connector.

无需“将整个表缓冲到内存中”,通过较小的批处理以及更频繁的轮询和提交,您可以确保几乎所有行都将被扫描,并且不会面临大的风险批处理失败,然后连接器停止一段时间,然后重新启动并在下一次轮询中丢失几行。

否则,请确保您没有使用批量表模式,因为它会尝试一次又一次地扫描整个表。

此外,query选项可以在表格上进行列投影。

您可以找到更多配置选项in the documentation ,但是任何 OOM 错误都需要根据具体情况进行仔细检查,方法是启用 JMX 监控并将这些值导出到某个聚合系统中,您可以像 Prometheus 一样更密切地监控,而不是仅仅看到 OOM 错误而不知道是否发生更改任何特定参数都确实有帮助。


另一种选择是使用基于 CDC 的连接器 like another blog post shows

关于jdbc - Kafka Connect JDBC OOM - 大数据量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49657363/

相关文章:

java - 将 Spring Security 3.1 与现有应用程序集成

postgresql - 如何使用 flyway 将文件上传到 PostgreSQL 数据库?

apache-kafka - Kafka Connect - 无法提交偏移量和刷新

twitter - 如何将 Twitter Heron 与 Storm Flux 结合使用

java - Kafka 依赖项 - ccs 与 ce

java - JDBC 和 MySql 安装配置

java - 如何从 SQLException 对象获取 SQLException 原因

windows - Zookeeper & Kafka 错误 KeeperErrorCode=NodeExists

apache-kafka - 解释 Kafka 中的复制偏移检查点和恢复点偏移

javascript - 如何退出 Node.js 中的函数