java - Kafka java消费者SSL握手错误: java. security.cert.CertificateException:不存在主题替代名称

标签 java ssl apache-kafka kafka-consumer-api apache-kafka-security

我正在运行 kafka 2.13-2.4.1 并在用 java 编写的 kafka 客户端(消费者)和 kafka 集群(3 个节点,每个节点有一个代理)之间配置 SSL 连接。 我通过Confluent's Documentation使用官方文档它具有单向身份验证(客户端没有证书),并且它不起作用,所以我不得不使用两种身份验证,然后消费者和生产者控制台都通过 SSL 进行了正常通信,但是当我使用我的 java 消费者应用程序时:

package kafkaconsumerssl;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class KafkaConsumerSSLTest {

    public static void main(String[] args) throws KafkaException {
        Properties props = new Properties();
                props.put("security.protocol", "SSL");
                props.put("ssl.endpoint.identification.algorithm=", "");
                props.put("ssl.truststore.location","/var/private/ssl/kafka.client.truststore.jks");
                props.put("ssl.truststore.password","*******");
                props.put("ssl.keystore.location", "/var/private/ssl/kafka.client.keystore.jks");
                props.put("ssl.keystore.password", "********"); 
                props.put("ssl.key.password", "*******");
                props.put("acks", "all");
                props.put("retries", "0");

                props.setProperty("zk.connnect", "172.31.32.219:2181,172.31.41.226:2181,172.31.33.133:2181");
                props.setProperty("group.id", "ConsumersTest");
                props.setProperty("auto.offset.reset","earliest");
                props.setProperty("enable.auto.commit", "true");
                    props.setProperty("auto.commit.interval.ms", "1000");
                props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                props.setProperty("bootstrap.servers","172.31.41.226:9093,172.31.33.133:9093,172.31.32.219:9093");

                KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
                consumer.subscribe(Arrays.asList("testKafka"));
                ConsumerRecords<String,String> messages = consumer.poll(Duration.ofMillis(4000));
                System.out.printf("reading_records...\n");
                for (ConsumerRecord<String, String> record : messages) {
                            System.out.printf("offset= %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
                consumer.close();
    }}

我的 kafka 代理配置 server.properties 文件是:

ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
ssl.truststore.password=*******
ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
ssl.keystore.password=******
ssl.key.password=******
listeners=PLAINTEXT://172.31.41.226:9092,SSL://172.31.41.226:9093
advertised.listeners=SSL://172.31.41.226:9093,PLAINTEXT://172.31.41.226:9092
ssl.client.auth=required
#security.inter.broker.protocol=SSL
ssl.endpoint.identification.algorithm=

当我运行 java -cp .:/opt/kafka/libs/* -Djavax.net.debug=ssl:handshake KafkaConsumerSSLTest2.java 我收到以下错误:

javax.net.ssl|ERROR|01|main|2020-03-27 22:55:31.527 UTC|TransportContext.java:312|Fatal (CERTIFICATE_UNKNOWN): No subject alternative names present (
"throwable" : {
  java.security.cert.CertificateException: No subject alternative names present
        at java.base/sun.security.util.HostnameChecker.matchIP(HostnameChecker.java:137)
        at java.base/sun.security.util.HostnameChecker.match(HostnameChecker.java:96)
        at java.base/sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:455)
        at java.base/sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:429)
        at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:283)
        at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:141)
        at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:623)
        at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.onCertificate(CertificateMessage.java:464)
        at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.consume(CertificateMessage.java:360)
        at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:392)
        at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:443)
        at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1061)
        at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1048)
        at java.base/java.security.AccessController.doPrivileged(AccessController.java:770)
        at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:995)
        at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:402)
        at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:484)
        at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:340)
        at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:265)
        at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:170)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:547)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:400)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
        at kafkaconsumerssl.KafkaConsumerSSLTest.main(KafkaConsumerSSLTest2.java:40)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at jdk.compiler/com.sun.tools.javac.launcher.Main.execute(Main.java:404)
        at jdk.compiler/com.sun.tools.javac.launcher.Main.run(Main.java:179)
        at jdk.compiler/com.sun.tools.javac.launcher.Main.main(Main.java:119)}

)
javax.net.ssl|WARNING|01|main|2020-03-27 22:55:31.528 UTC|SSLEngineOutputRecord.java:168|outbound has closed, ignore outbound application data
[2020-03-27 22:55:31,529] ERROR [Consumer clientId=consumer-ConsumersTest-1, groupId=ConsumersTest] Connection to node 2147483645 (/172.31.32.219:9093) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient:745)
Exception in thread "main" java.lang.IllegalArgumentException: 0 > -7
        at java.base/java.util.Arrays.copyOfRange(Arrays.java:4021)
        at java.base/java.util.Arrays.copyOfRange(Arrays.java:3981)
        at jdk.compiler/com.sun.tools.javac.launcher.Main.execute(Main.java:416)
        at jdk.compiler/com.sun.tools.javac.launcher.Main.run(Main.java:179)
        at jdk.compiler/com.sun.tools.javac.launcher.Main.main(Main.java:119)

最佳答案

您必须将 ssl.endpoint.identification.algorithm 设置为空字符串(省略 =):

props.put("ssl.endpoint.identification.algorithm", "");

请注意,如果以上解决了问题,则意味着证书与您用于运行消费者的机器的主机名不匹配。

关于java - Kafka java消费者SSL握手错误: java. security.cert.CertificateException:不存在主题替代名称,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60895431/

相关文章:

java - OkHttp - 获取失败的响应正文

java - mysql java : insert a row with some parametres don't work

java - Jenkins 无法连接到 ldaps ://server

用于从 https 排除 POST 端点的 Laravel .htaccess 配置

java - Spark Streaming 除了字数统计之外还能做其他事情吗?

java - Camel KafkaIdempotory是什么及其用途?

java - 关于CodeChef上的 "Uncle Johnny"问题_

java - 分别为多个war文件在tomcat中配置java代理

web-services - 验证服务器 - 什么是好的方法?

java - Apache Kafka 从代码创建主题