ssl - Enabling SSL on Kafka

Tags: ssl apache-kafka

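I'm trying to connect to a Kafka cluster whose brokers require SSL for client connections. Most clients can talk to the brokers over SSL, so I know the brokers are set up correctly. We intend to use two-way SSL authentication and followed these instructions: https://docs.confluent.io/current/tutorials/security_tutorial.html#security-tutorial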

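However, I have a Java application that I want to connect to the brokers. I think the SSL handshake is not completing, so requests to the broker time out. The same Java application connects without any problem to Kafka brokers that do not have SSL enabled.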

Update:

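I ran into this problem while trying to enable SSL. While debugging, the authentication exception turns out to be null. I can also see that my truststore and keystore are loaded correctly. So how can I troubleshoot this metadata update request timeout any further?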

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

which is thrown from:

private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
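The update above says the truststore and keystore load correctly. For two-way SSL it can also help to check what they contain: the client keystore would normally hold a key entry (the client's private key plus certificate chain), and the truststore a certificate entry for the CA that signed the broker certificate. The following is only a minimal sketch using the standard `java.security.KeyStore` API; the class name `KeystoreSummary` and the command-line arguments are hypothetical, and the JKS type is assumed from `ssl.keystore.type = JKS` in the log below.

```java
import java.io.FileInputStream;
import java.io.InputStream;
import java.security.KeyStore;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of Kafka: lists what a keystore contains.
final class KeystoreSummary {

    // Returns one line per alias, saying whether it is a key entry
    // or a trusted certificate entry.
    static List<String> summarize(KeyStore ks) throws Exception {
        List<String> lines = new ArrayList<>();
        for (String alias : Collections.list(ks.aliases())) {
            String kind = ks.isKeyEntry(alias) ? "key entry" : "trusted certificate";
            lines.add(alias + ": " + kind);
        }
        return lines;
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("usage: KeystoreSummary <file.jks> <password>");
            return;
        }
        // Assumption: the file is a JKS keystore, as in the ProducerConfig dump.
        KeyStore ks = KeyStore.getInstance("JKS");
        try (InputStream in = new FileInputStream(args[0])) {
            ks.load(in, args[1].toCharArray());
        }
        summarize(ks).forEach(System.out::println);
    }
}
```

Running it once against the client keystore and once against the truststore shows whether each file holds the kind of entry expected; it does not validate the certificates themselves.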

When I run the kafka console producer from the bitnami docker image and pass the same trustStore/keyStore as env variables, it works fine.

This works:

docker run -it -v /Users/kafka/kafka_2.11-1.0.0/bin/kafka.client.keystore.jks:/tmp/keystore.jks -v /Users/kafka/kafka_2.11-1.0.0/bin/kafka.client.truststore.jks:/tmp/truststore.jks -v /Users/kafka/kafka_2.11-1.0.0/bin/client_ssl.properties:/tmp/client.properties bitnami/kafka:1.0.0-r3 kafka-console-producer.sh --broker-list some-elb.elb.us-west-2.amazonaws.com:9094 --topic test --producer.config /tmp/client.properties
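The contents of `client_ssl.properties` are not shown in the question. As a rough sketch, the same client settings can be expressed in Java as a `java.util.Properties` object; the keys are taken from the ProducerConfig dump in the log further down, while the class name `SslClientProps` and the password value are placeholders assumed for illustration.

```java
import java.util.Properties;

// Hypothetical helper: builds SSL client settings like those in the
// ProducerConfig dump. Passwords here are placeholders, not real values.
final class SslClientProps {

    static Properties build(String keystorePath, String truststorePath, String password) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "some-elb.elb.us-west-2.amazonaws.com:9094");
        p.setProperty("security.protocol", "SSL");
        // Client identity, needed because the brokers use two-way SSL.
        p.setProperty("ssl.keystore.location", keystorePath);
        p.setProperty("ssl.keystore.password", password);
        p.setProperty("ssl.key.password", password);
        // Certificates the client trusts when verifying the broker.
        p.setProperty("ssl.truststore.location", truststorePath);
        p.setProperty("ssl.truststore.password", password);
        return p;
    }

    public static void main(String[] args) {
        // Paths match the mount points used in the docker command above.
        Properties p = build("/tmp/keystore.jks", "/tmp/truststore.jks", "changeit");
        p.stringPropertyNames().stream().sorted().forEach(k ->
            System.out.println(k + " = " + (k.contains("password") ? "[hidden]" : p.getProperty(k))));
    }
}
```

Comparing these key/value pairs line by line with the file that works in the container is one way to rule out a plain configuration mismatch between the two clients.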

Here is the debug log from my Java client application. I'd appreciate any insight into how to troubleshoot this.

2018-03-13 20:13:38.661  INFO 20653 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8080 (http)
2018-03-13 20:13:38.669  INFO 20653 --- [           main] c.i.aggregate.precompute.Application     : Started Application in 14.066 seconds (JVM running for 15.12)
2018-03-13 20:13:42.225  INFO 20653 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = all
    batch.size = 16384
    bootstrap.servers = [some-elb.elb.us-west-2.amazonaws.com:9094]
    buffer.memory = 33554432
    client.id = 
    compression.type = lz4
    connections.max.idle.ms = 540000
    enable.idempotence = false
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 2000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = SSL
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = [hidden]
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = /Users/kafka/Cluster-Certs/kafka.client.keystore.jks
    ssl.keystore.password = [hidden]
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = /Users/kafka/Cluster-Certs/kafka.client.truststore.jks
    ssl.truststore.password = [hidden]
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = <some class>

2018-03-13 20:13:42.287 TRACE 20653 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Starting the Kafka producer
2018-03-13 20:13:42.841 DEBUG 20653 --- [           main] org.apache.kafka.common.metrics.Metrics  : Added sensor with name bufferpool-wait-time
2018-03-13 20:13:43.062 DEBUG 20653 --- [           main] org.apache.kafka.common.metrics.Metrics  : Added sensor with name buffer-exhausted-records
2018-03-13 20:13:43.217 DEBUG 20653 --- [           main] org.apache.kafka.clients.Metadata        : Updated cluster metadata version 1 to Cluster(id = null, nodes = [some-elb.elb.us-west-2.amazonaws.com:9094 (id: -1 rack: null)], partitions = [])
2018-03-13 20:13:45.670 DEBUG 20653 --- [           main] org.apache.kafka.common.metrics.Metrics  : Added sensor with name produce-throttle-time
2018-03-13 20:13:45.909 DEBUG 20653 --- [           main] org.apache.kafka.common.metrics.Metrics  : Added sensor with name connections-closed:
2018-03-13 20:13:45.923 DEBUG 20653 --- [           main] org.apache.kafka.common.metrics.Metrics  : Added sensor with name connections-created:
2018-03-13 20:13:45.935 DEBUG 20653 --- [           main] org.apache.kafka.common.metrics.Metrics  : Added sensor with name successful-authentication:
2018-03-13 20:13:45.946 DEBUG 20653 --- [           main] org.apache.kafka.common.metrics.Metrics  : Added sensor with name failed-authentication:
2018-03-13 20:13:45.958 DEBUG 20653 --- [           main] org.apache.kafka.common.metrics.Metrics  : Added sensor with name bytes-sent-received:
2018-03-13 20:13:45.968 DEBUG 20653 --- [           main] org.apache.kafka.common.metrics.Metrics  : Added sensor with name bytes-sent:
2018-03-13 20:13:45.990 DEBUG 20653 --- [           main] org.apache.kafka.common.metrics.Metrics  : Added sensor with name bytes-received:
2018-03-13 20:13:46.005 DEBUG 20653 --- [           main] org.apache.kafka.common.metrics.Metrics  : Added sensor with name select-time:
2018-03-13 20:13:46.025 DEBUG 20653 --- [           main] org.apache.kafka.common.metrics.Metrics  : Added sensor with name io-time:
2018-03-13 20:13:46.130 DEBUG 20653 --- [           main] org.apache.kafka.common.metrics.Metrics  : Added sensor with name batch-size
2018-03-13 20:13:46.139 DEBUG 20653 --- [           main] org.apache.kafka.common.metrics.Metrics  : Added sensor with name compression-rate
2018-03-13 20:13:46.147 DEBUG 20653 --- [           main] org.apache.kafka.common.metrics.Metrics  : Added sensor with name queue-time
2018-03-13 20:13:46.156 DEBUG 20653 --- [           main] org.apache.kafka.common.metrics.Metrics  : Added sensor with name request-time
2018-03-13 20:13:46.165 DEBUG 20653 --- [           main] org.apache.kafka.common.metrics.Metrics  : Added sensor with name records-per-request
2018-03-13 20:13:46.179 DEBUG 20653 --- [           main] org.apache.kafka.common.metrics.Metrics  : Added sensor with name record-retries
2018-03-13 20:13:46.189 DEBUG 20653 --- [           main] org.apache.kafka.common.metrics.Metrics  : Added sensor with name errors
2018-03-13 20:13:46.199 DEBUG 20653 --- [           main] org.apache.kafka.common.metrics.Metrics  : Added sensor with name record-size
2018-03-13 20:13:46.250 DEBUG 20653 --- [           main] org.apache.kafka.common.metrics.Metrics  : Added sensor with name batch-split-rate
2018-03-13 20:13:46.275 DEBUG 20653 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-1] Starting Kafka producer I/O thread.
2018-03-13 20:13:46.329  INFO 20653 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.0
2018-03-13 20:13:46.333  INFO 20653 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : aaa7af6d4a11b29d
2018-03-13 20:13:46.369 DEBUG 20653 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Kafka producer started
2018-03-13 20:13:52.982 TRACE 20653 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Requesting metadata update for topic ssl-txn.
2018-03-13 20:13:52.987 TRACE 20653 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Found least loaded node some-elb.elb.us-west-2.amazonaws.com:9094 (id: -1 rack: null)
2018-03-13 20:13:52.987 DEBUG 20653 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Initialize connection to node some-elb.elb.us-west-2.amazonaws.com:9094 (id: -1 rack: null) for sending metadata request
2018-03-13 20:13:52.987 DEBUG 20653 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Initiating connection to node some-elb.elb.us-west-2.amazonaws.com:9094 (id: -1 rack: null)
2018-03-13 20:13:53.217 DEBUG 20653 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name node--1.bytes-sent
2018-03-13 20:13:53.219 DEBUG 20653 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name node--1.bytes-received
2018-03-13 20:13:53.219 DEBUG 20653 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name node--1.latency
2018-03-13 20:13:53.222 DEBUG 20653 --- [ad | producer-1] o.apache.kafka.common.network.Selector   : [Producer clientId=producer-1] Created socket with SO_RCVBUF = 33488, SO_SNDBUF = 131376, SO_TIMEOUT = 0 to node -1
2018-03-13 20:13:53.224 TRACE 20653 --- [ad | producer-1] o.a.k.common.network.SslTransportLayer   : SSLHandshake NEED_WRAP channelId -1, appReadBuffer pos 0, netReadBuffer pos 0, netWriteBuffer pos 0
2018-03-13 20:13:53.224 TRACE 20653 --- [ad | producer-1] o.a.k.common.network.SslTransportLayer   : SSLHandshake handshakeWrap -1
2018-03-13 20:13:53.225 TRACE 20653 --- [ad | producer-1] o.a.k.common.network.SslTransportLayer   : SSLHandshake NEED_WRAP channelId -1, handshakeResult Status = OK HandshakeStatus = NEED_UNWRAP
bytesConsumed = 0 bytesProduced = 326, appReadBuffer pos 0, netReadBuffer pos 0, netWriteBuffer pos 0
2018-03-13 20:13:53.226 TRACE 20653 --- [ad | producer-1] o.a.k.common.network.SslTransportLayer   : SSLHandshake NEED_UNWRAP channelId -1, appReadBuffer pos 0, netReadBuffer pos 0, netWriteBuffer pos 326
2018-03-13 20:13:53.226 TRACE 20653 --- [ad | producer-1] o.a.k.common.network.SslTransportLayer   : SSLHandshake handshakeUnwrap -1
2018-03-13 20:13:53.227 TRACE 20653 --- [ad | producer-1] o.a.k.common.network.SslTransportLayer   : SSLHandshake handshakeUnwrap: handshakeStatus NEED_UNWRAP status BUFFER_UNDERFLOW
2018-03-13 20:13:53.227 TRACE 20653 --- [ad | producer-1] o.a.k.common.network.SslTransportLayer   : SSLHandshake NEED_UNWRAP channelId -1, handshakeResult Status = BUFFER_UNDERFLOW HandshakeStatus = NEED_UNWRAP
bytesConsumed = 0 bytesProduced = 0, appReadBuffer pos 0, netReadBuffer pos 0, netWriteBuffer pos 326
2018-03-13 20:13:53.485 DEBUG 20653 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Completed connection to node -1. Fetching API versions.
2018-03-13 20:13:53.485 TRACE 20653 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Found least loaded node some-elb.elb.us-west-2.amazonaws.com:9094 (id: -1 rack: null)
2018-03-13 20:13:54.992 DEBUG 20653 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Exception occurred during message send:

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 2000 ms.

2018-03-13 20:13:54.992  INFO 20653 --- [           main] c.i.aggregate.precompute.kafka.Producer  : sent message in callback 
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 2000 ms.
    at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1124)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:823)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:760)
    at com.intuit.aggregate.precompute.kafka.Producer.send(Producer.java:76)
    at com.intuit.aggregate.precompute.Application.main(Application.java:58)
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 2000 ms.
Disconnected from the target VM, address: '127.0.0.1:53161', transport: 'socket'

Best answer

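The problem was caused by an incorrect certificate on the broker. Java has different defaults than Scala/Python when it comes to ciphers, which is why clients in other languages worked. Go ran into a similar problem, though; SSL logging was then enabled on the broker, and that revealed the issue.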
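Since the answer points at differing cipher defaults, it may help to see what the client JVM actually offers. The log shows `ssl.cipher.suites = null`, so the JVM's own defaults presumably apply. The sketch below only uses the standard `javax.net.ssl` API to print them; the class name `JvmTlsDefaults` is hypothetical, and the output varies by JDK version and security settings.

```java
import java.util.Arrays;
import java.util.List;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;

// Hypothetical helper: reports the TLS defaults of the running JVM.
final class JvmTlsDefaults {

    // Cipher suites the default SSLContext enables for new connections.
    static List<String> defaultCipherSuites() throws Exception {
        SSLParameters params = SSLContext.getDefault().getDefaultSSLParameters();
        return Arrays.asList(params.getCipherSuites());
    }

    // Protocol versions the default SSLContext enables for new connections.
    static List<String> defaultProtocols() throws Exception {
        SSLParameters params = SSLContext.getDefault().getDefaultSSLParameters();
        return Arrays.asList(params.getProtocols());
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Protocols: " + defaultProtocols());
        System.out.println("Cipher suites:");
        defaultCipherSuites().forEach(s -> System.out.println("  " + s));
    }
}
```

On the client side, starting the JVM with `-Djavax.net.debug=ssl,handshake` prints the JSSE handshake messages, which can complement the broker-side SSL logging mentioned in the answer.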

Regarding "ssl - Enabling SSL on Kafka", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/49269261/
