java - 如何使用 Beam KafkaIO 配置客户端身份验证

标签 java apache-kafka apache-beam

我一直在浏览 Beam KafkaIO 教程,并一直在尝试查找有关 kafka 客户端身份验证的文档,但到目前为止只找到了非常基本的示例。我需要为 Kafkaio 客户端提供以下配置才能成功进行身份验证:

bootstrap.servers=kafka1:9093
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234

我如何指定此配置?

到目前为止,我在示例中找到的都是这样配置的:

p.apply(KafkaIO.<Long, String>read()
.withBootstrapServers("kafka1:9022")
.withTopic("test-topic")
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)

最佳答案

您可以使用 updateConsumerProperties(properties) 方法设置 ssl 配置。
为此,您需要设置以下消费者属性。

Properties props = new Properties();
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/var/private/ssl/kafka.client.truststore.jks");    
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, resourcePath.get("keystore.jks"));
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,  "test1234");
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG,  "test1234"); 
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,  "test1234");

在方法中传递上述属性,如下所示:

p.apply(KafkaIO.<Long, String>read()
.withBootstrapServers("kafka1:9022")
.withTopic("test-topic")
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.updateConsumerProperties(props)

您可以在此处找到有关如何在 KafkaIO 中设置自定义属性的更多文档:https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/io/kafka/KafkaIO.html

关于java - 如何使用 Beam KafkaIO 配置客户端身份验证,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54689202/

相关文章:

python - 如何修复 "AttributeError: ' str' 对象在从 PubSub 读取并写入 BigQuery 的数据流管道中没有属性 'items'

java - Richfaces modalPanel 滚动

Java.util.logger 每天新文件

java - 在 KAFKA 中生成和使用 JSON

java - 运行 Storm 拓扑时为 "java.lang.OutOfMemoryError: unable to create new native thread"

java - Apache Beam - 读取 JSON 和 Stream

java - 线程 "main"中的异常 org.springframework.jdbc.CannotGetJdbcConnectionException

java - Jackson Json 访问 JsonNode 属性名

apache-kafka - 高级消费者上的 commitOffsets 是否会阻塞?

java - 数据流 : string to pubsub message