mongodb - Delete and edit operations not working with the elasticsearch sink connector

Tags: mongodb elasticsearch apache-kafka apache-kafka-connect

I am trying to build a simple pipeline between MongoDB and Elasticsearch using Kafka. Inserted data is stored in Elasticsearch successfully, but when I edit or delete a document, another document simply gets stored in Elasticsearch instead.
Here is my MongoDB source connector:

{ "name": "mongo-source", "config": { "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
     "mongodb.hosts": "mongo1:27017", 
         "mongodb.name": "fullfillment" } }

And the ES sink connector:
{ "name": "sink", "config": { "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "connection.url": "http://172.21.0.3:9200", "type.name": "subscriber", "topics": "fullfillment.elasticsearchApp.subscriber","schema.ignore": "true","behavior.on.null.values": "delete","key.ignore": "true"    } }

Here is a sample of the record I get in my Kafka topic:
{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"after"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"patch"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"rs"},{"type":"string","optional":false,"field":"ns"},{"type":"int32","optional":false,"field":"sec"},{"type":"int32","optional":false,"field":"ord"},{"type":"int64","optional":true,"field":"h"},{"type":"boolean","optional":true,"default":false,"field":"initsync"}],"optional":false,"name":"io.debezium.connector.mongo.Source","version":1,"field":"source"},{"type":"string","optional":true,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"fullfillment.elasticsearchApp.subscriber.Envelope"},"payload":{"after":"{\"_id\" : {\"$oid\" : \"5ea824df92f03c3c92966b04\"},\"name\" : \"adem\"}","patch":null,"source":{"version":"0.7.5","name":"fullfillment","rs":"rs0","ns":"elasticsearchApp.subscriber","sec":1588077791,"ord":1,"h":0,"initsync":false},"op":"c","ts_ms":1588077791540}}
And here is my docker-compose file:

services:

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:6.8.6
    hostname: elasticsearch
    container_name: elasticsearch
    ports:
      - 9200:9200
    networks:
      - es-network  

  zookeeper:
    image: confluentinc/cp-zookeeper:5.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - 2181:2181
    networks:
      - es-network  
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000


  kafka:
    image: confluentinc/cp-kafka:5.0.0
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
      - 9092:9092
    networks:
      - es-network
    environment:
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  schema-registry:
    image: confluentinc/cp-schema-registry:5.0.0
    hostname: schema-registry
    container_name: schema-registry
    networks:
      - es-network
    ports:
      - 8081:8081
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
    depends_on:
      - zookeeper
      - kafka

  kafka-connect:
    image: confluentinc/cp-kafka-connect
    hostname: kafka-connect
    container_name: kafka-connect
    networks:
      - es-network
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      #INTERNAL_KEY_CONVERTER: 'false'
      #INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE: 'true' 
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR:  1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR:  1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR:  1
      CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars'
      CONNECT_CONFLUENT_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - /home/xroot/Desktop/docker_env/confluent_env/confluent_on_docker/jars/debez:/etc/kafka-connect/jars
    depends_on:
      - zookeeper
      - kafka
      - schema-registry

  #control-center:
  #  image: confluentinc/cp-enterprise-control-center:5.4.0
  #  hostname: control-center
  #  container_name: control-center
  #  depends_on:
  #    - zookeeper
  #    - kafka
  #    - schema-registry
  #  ports:
  #    - 9021:9021
  #  environment:
  #    CONTROL_CENTER_BOOTSTRAP_SERVERS: kafka:9092
  #    CONTROL_CENTER_ZOOKEEPER_CONNECT: zookeeper:2181
  #    CONTROL_CENTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
  #    CONTROL_CENTER_REPLICATION_FACTOR: 1
  #    CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
  #    CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
  #    CONFLUENT_METRICS_TOPIC_REPLICATION: 1
  #    PORT: 9021


  # MongoDB Replica Set
  mongo1:
    hostname: mongo1
    image: mongo
    container_name: mongo1
    networks:
      - es-network
    ports:
      - 27018:27017
    restart: always
    entrypoint: [ "/usr/bin/mongod", "--bind_ip_all", "--replSet", "rs0" ]

  mongo2:
    hostname: mongo2
    image: mongo
    container_name: mongo2
    networks:
      - es-network
    ports:
      - 27017:27017
    restart: always
    entrypoint: [ "/usr/bin/mongod", "--bind_ip_all", "--replSet", "rs0" ]

networks:
  es-network:
    attachable: true

Best Answer

You have set "key.ignore": "true", which per the docs means that the connector will use the topic+partition+offset of the message as the Elasticsearch document ID. Since every update and delete message in Kafka is a new message, you get a new Elasticsearch document each time.
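As an illustration (offsets assumed, partition 0): inserting, editing, then deleting the same MongoDB document produces three Kafka messages at consecutive offsets, and with "key.ignore": "true" each one becomes a separate, unrelated Elasticsearch document:

fullfillment.elasticsearchApp.subscriber+0+0   (insert event)
fullfillment.elasticsearchApp.subscriber+0+1   (edit event; stored as a new document)
fullfillment.elasticsearchApp.subscriber+0+2   (delete event; again a new document)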

设置 "key.ignore": "true"在您的接收器连接器中,并确保您的 Kafka 消息 key 唯一标识您要在 Elasticsearch 中更新/编辑的文档。

To handle this specifically with the MongoDB source, you need to extract the source id from the STRUCT in the key, by adding this to your source connector:

"transforms":"extractValuefromStruct",
"transforms.extractValuefromStruct.type":"org.apache.kafka.connect.transforms.ExtractField$Key"
"transforms.extractValuefromStruct.field":"id"

This question, mongodb - Delete and edit operations not working with the elasticsearch sink connector, comes from Stack Overflow: https://stackoverflow.com/questions/61480579/
