apache-kafka - 将 Confluence Schema Registry 与 MSK 结合使用

标签 apache-kafka confluent-schema-registry aws-msk

是否可以将 Confluence Schema Registry 与 AWS MSK 集成? 如果您以前做过此操作,能否提供一些您遵循的实现此操作的指示/博客?

最佳答案

这是可能的。我的设置使用 ec2 和 docker。

  1. 如果您使用基于 IAM 的身份验证,请下载 IAM 身份验证 jar
mkdir -p /usr/share/java/aws
wget -P /usr/share/java/aws https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.1/aws-msk-iam-auth-1.1.1-all.jar
chmod -R 444 /usr/share/java/aws
  • 使用 Confluence 官方 Docker 镜像进行架构注册表
  • ...
    
      schema-registry:
        image: confluentinc/cp-schema-registry:5.4.6-1-ubi8
        hostname: schema-registry
        container_name: schema-registry
        ports:
          - "8081:8081"
        volumes:
          - /usr/share/java/aws/aws-msk-iam-auth-1.1.1-all.jar:/usr/share/java/cp-base-new/aws-msk-iam-auth-1.1.1-all.jar
          - /usr/share/java/aws/aws-msk-iam-auth-1.1.1-all.jar:/usr/share/java/rest-utils/aws-msk-iam-auth-1.1.1-all.jar
        environment: # https://docs.confluent.io/platform/current/schema-registry/installation/config.html#schemaregistry-config
          SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
          SCHEMA_REGISTRY_HOST_NAME: "${HOSTNAME}" # 
          SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "${BOOTSTRAP_BROKERS_SASL_IAM}"
          SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: "SASL_SSL"
          SCHEMA_REGISTRY_KAFKASTORE_SASL_MECHANISM: "AWS_MSK_IAM"
          SCHEMA_REGISTRY_KAFKASTORE_SASL_JAAS_CONFIG: "software.amazon.msk.auth.iam.IAMLoginModule required awsDebugCreds=true;"
          SCHEMA_REGISTRY_KAFKASTORE_SASL_CLIENT_CALLBACK_HANDLER_CLASS: "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    
    ...
    
    • HOSTNAME 是您的 ec2 计算机 DNS 名称或 IP,例如 ip-10-0-0-84.ec2.internal
    • BOOTSTRAP_BROKERS_SASL_IAM 是逗号分隔的 host1:port,host2:port 网址。获取端口信息see this

    如果您使用 PLAINTEXT 或 SSL 身份验证,则最后 4 个环境变量会更改。而且您不必下载 iam auth jar

  • 使用这些属性配置源连接器或接收器连接器
  • ...
    key.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schemas.enable=true
    value.converter.schema.registry.url=http://ip-10-0-0-84.ec2.internal:8081
    value.converter.enhanced.avro.schema.support=true
    

    就是这样。
    请在 MSK 集群的 EC2 实例安全组中打开 8081 端口

    资源:


    我尝试过的替代选项是 AWS Glue Schema registry 但我们必须使用 KSQL,而 KSQL 没有第三方架构注册表集成或自定义 SerDe Github issue

    关于apache-kafka - 将 Confluence Schema Registry 与 MSK 结合使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70500586/

    相关文章:

    linux - 将 kafka 代理 ID(zookeeper-shell.sh 结果)保存到 bash 脚本中的变量

    java - 如何在 Kafka Avro 生产者中发送对象的 ArrayList,而不为每个单独的记录调用 send 方法?

    java - 使用 Avro Schema 注册表的 Kafka 消费者单元测试失败

    amazon-web-services - 在 CloudFormation 中获取 AWS::MSK::Configuration 的最新版本

    java - 了解Kafka流groupBy和window

    scala - 带URI的Flink,Kafka和Zookeeper

    java - 如何根据输入 ID 在架构注册表中注册架构

    apache-kafka - Avro 应该同时用于 Kafka 中的键和值吗?

    java - AWS Kafka (MSK) - 如何生成 keystore 和信任库并在我的 Spring Cloud Stream 应用程序中使用它们?

    amazon-web-services - AWS MSK - 在 ACL 打开的情况下创建 Kafka 主题时超时