sql-server - Debezium Server to Pub/Sub: delete operations break the application

Tags: sql-server postgresql google-cloud-platform publish-subscribe debezium

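I want to send notifications about changes in my source databases (Postgres and SQL Server) to Pub/Sub so that I can store the data in BigQuery later. I decided to try Debezium Server: for testing, I deployed it as a Docker container and created one SQL Server database and one Postgres database.

After some debugging, I received the first notifications in Pub/Sub for creates and updates. So far, so good.

When I try to delete a record in the database, the server crashes without sending a message to Pub/Sub. In the logs I see something like this: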

{"timestamp":"2023-03-30T17:29:13.722Z","sequence":230,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.embedded.EmbeddedEngine","level":"INFO","message":"Stopping the task and engine","threadName":"pool-7-thread-1","threadId":18,"mdc":{},"ndc":"","hostName":"034bf7902d51","processName":"io.debezium.server.Main","processId":1}
{"timestamp":"2023-03-30T17:29:13.723Z","sequence":231,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.connector.common.BaseSourceTask","level":"INFO","message":"Stopping down connector","threadName":"pool-7-thread-1","threadId":18,"mdc":{},"ndc":"","hostName":"034bf7902d51","processName":"io.debezium.server.Main","processId":1}
{"timestamp":"2023-03-30T17:29:14.178Z","sequence":232,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.jdbc.JdbcConnection","level":"INFO","message":"Connection gracefully closed","threadName":"pool-15-thread-1","threadId":59,"mdc":{"dbz.taskId":"0","dbz.databaseName":"postgres","dbz.connectorName":"streamio23","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"034bf7902d51","processName":"io.debezium.server.Main","processId":1}
{"timestamp":"2023-03-30T17:29:14.181Z","sequence":233,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.jdbc.JdbcConnection","level":"INFO","message":"Connection gracefully closed","threadName":"pool-16-thread-1","threadId":60,"mdc":{"dbz.taskId":"0","dbz.databaseName":"postgres","dbz.connectorName":"streamio23","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"034bf7902d51","processName":"io.debezium.server.Main","processId":1}
{"timestamp":"2023-03-30T17:29:14.182Z","sequence":234,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.ChangeEventSourceCoordinator","level":"INFO","message":"Finished streaming","threadName":"debezium-postgresconnector-streamio23-change-event-source-coordinator","threadId":31,"mdc":{"dbz.taskId":"0","dbz.databaseName":"postgres","dbz.connectorName":"streamio23","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"034bf7902d51","processName":"io.debezium.server.Main","processId":1}
{"timestamp":"2023-03-30T17:29:14.183Z","sequence":235,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.ChangeEventSourceCoordinator","level":"INFO","message":"Connected metrics set to 'false'","threadName":"debezium-postgresconnector-streamio23-change-event-source-coordinator","threadId":31,"mdc":{"dbz.taskId":"0","dbz.databaseName":"postgres","dbz.connectorName":"streamio23","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"034bf7902d51","processName":"io.debezium.server.Main","processId":1}
{"timestamp":"2023-03-30T17:29:14.199Z","sequence":236,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"org.apache.kafka.connect.storage.FileOffsetBackingStore","level":"INFO","message":"Stopped FileOffsetBackingStore","threadName":"pool-7-thread-1","threadId":18,"mdc":{},"ndc":"","hostName":"034bf7902d51","processName":"io.debezium.server.Main","processId":1}
{"timestamp":"2023-03-30T17:29:14.201Z","sequence":237,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.server.ConnectorLifecycle","level":"ERROR","message":"Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: java.util.concurrent.ExecutionException: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: One or more messages in the publish request is empty. Each message must contain either non-empty data, or at least one attribute.', error = 'io.debezium.DebeziumException: java.util.concurrent.ExecutionException: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: One or more messages in the publish request is empty. Each message must contain either non-empty data, or at least one attribute.'","threadName":"pool-7-thread-1","threadId":18,"mdc":{},"ndc":"","hostName":"034bf7902d51","processName":"io.debezium.server.Main","processId":1,"exception":{"refId":1,"exceptionType":"io.debezium.DebeziumException","message":"java.util.concurrent.ExecutionException: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: One or more messages in the publish request is empty. 
Each message must contain either non-empty data, or at least one attribute.","frames":[{"class":"io.debezium.server.pubsub.PubSubChangeConsumer","method":"handleBatch","line":257},{"class":"io.debezium.embedded.ConvertingEngineBuilder","method":"lambda$notifying$2","line":101},{"class":"io.debezium.embedded.EmbeddedEngine","method":"run","line":913},{"class":"io.debezium.embedded.ConvertingEngineBuilder$2","method":"run","line":229},{"class":"io.debezium.server.DebeziumServer","method":"lambda$start$1","line":170},{"class":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1128},{"class":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":628},{"class":"java.lang.Thread","method":"run","line":829}],"causedBy":{"exception":{"refId":2,"exceptionType":"java.util.concurrent.ExecutionException","message":"com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: One or more messages in the publish request is empty. Each message must contain either non-empty data, or at least one attribute."

Here is my application.properties file. I guess the only non-basic part is the topic routing, because I don't want to create a Pub/Sub topic for every table.

debezium.sink.pravega.scope=empty
debezium.sink.type=pubsub
debezium.sink.pubsub.project.id=XXXXX
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=/tmp/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=XXXXXXXX
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.database.server.name=tutorial
debezium.source.topic.prefix=streamio23
debezium.source.schema.include.list=inventory
debezium.snapshot.new.tables=parallel
debezium.source.plugin.name=pgoutput
debezium.transforms=Reroute
debezium.transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
debezium.transforms.Reroute.topic.regex=(.*)inventory(.*)
debezium.transforms.Reroute.topic.replacement=stream.stream.stream.inventory.orders
debezium.source.value.converter=org.apache.kafka.connect.json.JsonConverter
debezium.source.value.converter.schemas.enable=true
debezium.source.database.history.file.filename=/tmp/FileDatabaseHistory.dat
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
pk.mode=record_key
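
To make the effect of the Reroute settings above concrete, here is a small sketch of the name mapping. It is an approximation, not Debezium's code: it uses Python's `re` module instead of Java's regex engine and assumes the router only rewrites a topic when the regex matches the whole name. The topic names in the usage lines (`streamio23.inventory.orders`, `streamio23.public.users`) are hypothetical examples built from the `topic.prefix` above.

```python
import re

# Values copied from the Reroute transform in application.properties.
TOPIC_REGEX = re.compile(r"(.*)inventory(.*)")
TOPIC_REPLACEMENT = "stream.stream.stream.inventory.orders"


def reroute(topic: str) -> str:
    """Return the rerouted topic name, or the original name if the regex
    does not match the whole topic name (assumed router behavior)."""
    if TOPIC_REGEX.fullmatch(topic) is None:
        return topic
    # The replacement contains no group references, so every matching
    # topic collapses to the same fixed name.
    return TOPIC_REGEX.sub(TOPIC_REPLACEMENT, topic, count=1)


if __name__ == "__main__":
    for name in ("streamio23.inventory.orders", "streamio23.public.users"):
        print(name, "->", reroute(name))
```

Under these assumptions, every table in the `inventory` schema ends up in the single topic `stream.stream.stream.inventory.orders`, which is why only one Pub/Sub topic has to exist.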

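Does anyone know this problem and have a solution? The error is the same for Postgres and MSSQL, so I assume the problem is in the sink.

I have already tried several configurations of the application.properties file and checked the offsets, but the problem is always the same.

Thanks in advance!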

Best answer

I had the same problem and solved it by setting:

debezium.source.tombstones.on.delete=false

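The Debezium documentation explains what tombstone events are for. In short, after a delete event the connector emits a second record with the same key and a null value, which lets Kafka topics with log compaction actually remove the deleted key. That is not useful for Pub/Sub: the null value becomes a message with no data, which is exactly what the INVALID_ARGUMENT error in the log rejects.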
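
The following sketch models why a delete fails while creates and updates succeed. It is a simplified illustration, not Debezium's or the Pub/Sub client's implementation: the function names `delete_records`, `to_payload`, and `is_publishable` are hypothetical, and the validity rule is taken from the error message in the question (a message needs non-empty data or at least one attribute).

```python
import json
from typing import List, Optional, Tuple

Record = Tuple[dict, Optional[dict]]  # (key, value)


def delete_records(key: dict, before: dict,
                   tombstones_on_delete: bool = True) -> List[Record]:
    """Model the records emitted when one row is deleted."""
    records: List[Record] = [(key, {"op": "d", "before": before, "after": None})]
    if tombstones_on_delete:
        # Tombstone: same key, null value (meant for Kafka log compaction).
        records.append((key, None))
    return records


def to_payload(value: Optional[dict]) -> bytes:
    """Serialize a record value; a null value yields an empty payload."""
    return b"" if value is None else json.dumps(value).encode("utf-8")


def is_publishable(data: bytes, attributes: Optional[dict] = None) -> bool:
    """Rule from the error message: non-empty data or at least one attribute."""
    return bool(data) or bool(attributes)


if __name__ == "__main__":
    for flag in (True, False):
        results = [is_publishable(to_payload(value))
                   for _, value in delete_records({"id": 1}, {"id": 1}, flag)]
        print(f"tombstones.on.delete={flag}: {results}")
```

With tombstones enabled, the second record of each delete has an empty payload and would be rejected. With `tombstones.on.delete=false`, only the delete event itself (`"op": "d"`) is published.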

Source: the Stack Overflow question "sql-server - Debezium Server to PubSub: delete-operations breaks the application": https://stackoverflow.com/questions/75891500/
