postgresql - Debezium source task cannot reconnect to the postgresql database when the database container is recreated

Tags: postgresql apache-kafka kubernetes debezium

We have a Kubernetes cluster in which Debezium runs as a source task for PostgreSQL and writes to Kafka. Debezium, Postgres and Kafka each run in separate pods. When the Postgres pod is deleted and Kubernetes recreates it, the Debezium pod cannot reconnect. Log from the Debezium pod:

    2018-07-17 08:31:38,311 ERROR  ||  WorkerSourceTask{id=inventory-connector-0} Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
    2018-07-17 08:31:38,311 INFO   ||  [Producer clientId=producer-4] Closing the Kafka producer with timeoutMillis = 30000 ms.   [org.apache.kafka.clients.producer.KafkaProducer]

Debezium keeps trying to flush the outstanding messages periodically, which fails with the following exception:

    2018-07-17 08:32:38,167 ERROR  ||  WorkerSourceTask{id=inventory-connector-0} Exception thrown while calling task.commit()   [org.apache.kafka.connect.runtime.WorkerSourceTask]
    org.apache.kafka.connect.errors.ConnectException: org.postgresql.util.PSQLException: Database connection failed when writing to copy
    at io.debezium.connector.postgresql.RecordsStreamProducer.commit(RecordsStreamProducer.java:151)
    at io.debezium.connector.postgresql.PostgresConnectorTask.commit(PostgresConnectorTask.java:138)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitSourceTask(WorkerSourceTask.java:437)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:378)
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:108)
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:45)
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:82)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    Caused by: org.postgresql.util.PSQLException: Database connection failed when writing to copy
    at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(QueryExecutorImpl.java:942)
    at org.postgresql.core.v3.CopyDualImpl.flushCopy(CopyDualImpl.java:23)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.updateStatusInternal(V3PGReplicationStream.java:176)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.forceUpdateStatus(V3PGReplicationStream.java:99)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.doFlushLsn(PostgresReplicationConnection.java:246)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.flushLsn(PostgresReplicationConnection.java:239)
    at io.debezium.connector.postgresql.RecordsStreamProducer.commit(RecordsStreamProducer.java:146)
    ... 13 more
    Caused by: java.net.SocketException: Broken pipe (Write failed)
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
    at org.postgresql.core.PGStream.flush(PGStream.java:553)
    at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(QueryExecutorImpl.java:939)
    ... 19 more

Is there a way to have Debezium re-establish the connection to Postgres once it becomes available again? Or am I missing some configuration?

  • Debezium version 0.8
  • Kubernetes version 1.10.3
  • Postgres version 9.6
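
For completeness, the connector is registered with the Kafka Connect REST API roughly as follows; the service names, database name and credentials below are placeholders rather than our actual values:

    # register the Debezium Postgres source connector with Kafka Connect
    # (hostnames and credentials are placeholders)
    curl -s -X POST -H "Content-Type: application/json" http://debezium-connect:8083/connectors -d '{
      "name": "inventory-connector",
      "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname": "inventory",
        "database.server.name": "dbserver1"
      }
    }'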

Best Answer

It looks like this is a known issue, with open feature requests in both Debezium and Kafka:

https://issues.jboss.org/browse/DBZ-248

https://issues.apache.org/jira/browse/KAFKA-5352

While these remain open, this appears to be the expected behaviour.

As a workaround, I have added this liveness probe to the deployment:

    livenessProbe:
        exec:
          command:
          - sh
          - -ec
          - ipaddress=$(ip addr | grep 'state UP' -A2 | tail -n1 | awk '{print $2}' | cut -f1  -d'/'); reply=$(curl -s $ipaddress:8083/connectors/inventory-connector/status | grep -o RUNNING | wc -l); if [ $reply -lt 2 ]; then exit 1; fi;
        initialDelaySeconds: 30
        periodSeconds: 5

The first clause gets the container's IP address:

    ipaddress=$(ip addr | grep 'state UP' -A2 | tail -n1 | awk '{print $2}' | cut -f1 -d'/');

The second clause makes the request and counts the occurrences of "RUNNING" in the response JSON:

    reply=$(curl -s $ipaddress:8083/connectors/inventory-connector/status | grep -o RUNNING | wc -l);

The third clause returns exit code 1 if "RUNNING" appears fewer than two times:

    if [ $reply -lt 2 ]; then exit 1; fi
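
For context, the Connect status endpoint reports one state for the connector itself and one per task, so a healthy single-task connector contains "RUNNING" at least twice; when the source task dies as in the log above, its entry flips to FAILED and the count drops to one. A representative response (worker IDs are illustrative):

    $ curl -s $ipaddress:8083/connectors/inventory-connector/status
    {
      "name": "inventory-connector",
      "connector": { "state": "RUNNING", "worker_id": "10.1.2.3:8083" },
      "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "10.1.2.3:8083" } ]
    }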

It seems to be passing initial testing - i.e. restarting the Postgres DB triggers a restart of the Debezium container. I imagine a script along these lines (albeit a more robust one) could be baked into the image to make the probe easier to maintain; a sketch follows.
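
A minimal sketch of such a script, assuming it is baked into the Connect image as connector-alive.sh somewhere on the PATH; the connector name, port defaults and environment variable names are assumptions and would need to match your deployment:

    #!/bin/sh
    # connector-alive.sh -- exit non-zero unless the named connector and its task
    # both report RUNNING; intended to be called from a Kubernetes liveness probe.
    # (Sketch only: the env var names and defaults below are assumptions.)
    CONNECTOR_NAME="${CONNECTOR_NAME:-inventory-connector}"
    CONNECT_PORT="${CONNECT_PORT:-8083}"

    # Resolve the pod's own IP address, as in the probe above.
    ipaddress=$(ip addr | grep 'state UP' -A2 | tail -n1 | awk '{print $2}' | cut -f1 -d'/')

    # Count RUNNING states: one for the connector plus one per task.
    running=$(curl -s "$ipaddress:$CONNECT_PORT/connectors/$CONNECTOR_NAME/status" | grep -o RUNNING | wc -l)

    if [ "$running" -lt 2 ]; then
      echo "connector $CONNECTOR_NAME is not fully RUNNING (count=$running)" >&2
      exit 1
    fi

The probe's exec command then reduces to just running connector-alive.sh, which keeps the inline shell in the deployment manifest short.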

A similar question on Stack Overflow: https://stackoverflow.com/questions/51377720/
