java - How do I set up a Spring Cloud Kafka project to connect to Apache Kafka with SASL_SSL?

Tags: java spring-boot apache-kafka kafka-consumer-api spring-cloud-stream

I am trying to configure Spring Cloud Kafka with SASL_SSL, but I cannot get it to work. I believe my application.yml is not configured correctly, so please advise.

Here is my application.yml configuration:

spring:
  cloud:
    stream:
      default-binder: kafka
      kafka:
        binder:
          brokers: localhost:9090
          consumerProperties:
            security.protocol: SASL_SSL
            sasl.mechanism: SCRAM-SHA-512
            sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="userkk" password="admin-secret";
          producerProperties:
            security.protocol: SASL_SSL
            sasl.mechanism: SCRAM-SHA-512
            sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="userkk" password="admin-secret";
      bindings:
        SINGAPOR_RECEIVER:
          binder: kafka
          destination: SINGAPOR_RECEIVER
          group: output-group-2
          content-type: text/plain
        SINGAPOR_RESPOND:
          binder: kafka
          destination: SINGAPOR_RESPOND
          group: output-group-1
          content-type: text/plain
        RESULT_RESPOND:
          binder: kafka
          destination: RESULT_RESPOND
          group: output-group-3
          content-type: text/plain

Here is what I get in the Spring console:

2020-02-04 16:58:20.687  INFO 35715 --- [           main] com.gdce.doca.ApplicationKt              : Starting ApplicationKt on yourpc with PID 35715 (/home/yourpc/data/work-project/java/kafka-doc-a/build/classes/kotlin/main started by yourpc in /home/yourpc/data/work-project/java/kafka-doc-a)
2020-02-04 16:58:20.689  INFO 35715 --- [           main] com.gdce.doca.ApplicationKt              : No active profile set, falling back to default profiles: default
2020-02-04 16:58:21.580  INFO 35715 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2020-02-04 16:58:21.586  INFO 35715 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2020-02-04 16:58:21.589  INFO 35715 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2020-02-04 16:58:21.639  INFO 35715 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-02-04 16:58:21.642  INFO 35715 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-02-04 16:58:21.658  INFO 35715 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-02-04 16:58:21.980  INFO 35715 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 9088 (http)
2020-02-04 16:58:21.993  INFO 35715 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2020-02-04 16:58:21.993  INFO 35715 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.30]
2020-02-04 16:58:22.082  INFO 35715 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2020-02-04 16:58:22.083  INFO 35715 --- [           main] o.s.web.context.ContextLoader            : Root WebApplicationContext: initialization completed in 1324 ms
2020-02-04 16:58:22.473  INFO 35715 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2020-02-04 16:58:22.722  INFO 35715 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService 'taskScheduler'
2020-02-04 16:58:22.878  INFO 35715 --- [           main] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'application.SINGAPORE_RESPOND' has 1 subscriber(s).
2020-02-04 16:58:22.880  INFO 35715 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2020-02-04 16:58:22.881  INFO 35715 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2020-02-04 16:58:22.881  INFO 35715 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2020-02-04 16:58:23.108  INFO 35715 --- [           main] o.s.c.s.b.k.p.KafkaTopicProvisioner      : Using kafka topic for outbound: SINGAPORE_RECEIVER
2020-02-04 16:58:23.110  INFO 35715 --- [           main] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
        bootstrap.servers = [SASL_SSL://localhost:9090]
        client.dns.lookup = default
        client.id = 
        connections.max.idle.ms = 300000
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 120000
        retries = 5
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS

2020-02-04 16:58:23.213  INFO 35715 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.1
2020-02-04 16:58:23.214  INFO 35715 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 18a913733fb71c01
2020-02-04 16:58:23.214  INFO 35715 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1580810303212

Here is the error I get from the Kafka console:

[2020-02-04 16:12:27,471] INFO [SocketServer brokerId=0] Failed authentication with test.local/127.0.0.1 (SSL handshake failed) (org.apache.kafka.common.network.Selector)

I tried producing and consuming messages with kafka-console-producer.sh, and it works with the following configuration and commands:

kafka_client_jaas.conf:

KafkaClient {
   org.apache.kafka.common.security.scram.ScramLoginModule required
   username="userkk"
   password="admin-secret";
};
export KAFKA_OPTS="-Djava.security.auth.login.config=$KAFKA/config/kafka_client_jaas.conf"
$KAFKA/bin/./kafka-console-producer.sh --broker-list localhost:9090 --topic test --producer.config $KAFKA/config/producer.properties
$KAFKA/bin/kafka-console-consumer.sh --bootstrap-server localhost:9090 --topic test --from-beginning --consumer.config $KAFKA/config/consumer.properties
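
For reference, the producer.properties / consumer.properties files passed to the console tools above are not shown in the question, but for a SASL_SSL + SCRAM setup like this they would typically look roughly like the sketch below. The truststore path and password are placeholders, not values from the question:

```properties
# hypothetical contents of $KAFKA/config/producer.properties and consumer.properties
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
# truststore containing the broker's certificate (placeholder path/password)
ssl.truststore.location=/path/to/client-truststore.jks
ssl.truststore.password=changeit
# blank disables hostname verification, matching the broker config shown below
ssl.endpoint.identification.algorithm=
```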

EDIT

Here is the server.properties configuration of the Kafka broker I am using:

# SASL_PLAINTEXT (previous setup)
#listeners=SASL_PLAINTEXT://localhost:9090
#security.inter.broker.protocol=SASL_PLAINTEXT
#sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
#sasl.enabled.mechanisms=SCRAM-SHA-512


# SSL + SASL/SCRAM
listeners=SASL_SSL://localhost:9090
advertised.listeners=SASL_SSL://localhost:9090
advertised.host.name=localhost
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-512


ssl.keystore.location=/home/yourpc/ssl-generator-tmp/ssl/server/server.p12
ssl.keystore.password=changeit
ssl.key.password=changeit
# ssl.truststore.location=/home/yourpc/server-truststore.jks
# ssl.truststore.password=123123


# If any of the SASL authentication mechanisms are enabled for a given listener, then SSL client authentication is disabled even if ssl.client.auth=required is configured, and the broker will authenticate clients only via SASL on that listener
ssl.client.auth=required


# topic control
auto.create.topics.enable=true
# delete.topic.enable=true


# zookeeper communication
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000


# NETWORK MANAGEMENT
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=10485760


num.partitions=1
num.recovery.threads.per.data.dir=1
log.flush.interval.messages=30000000
log.flush.interval.ms=1800000
log.retention.minutes=30
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000


# ENABLE ACL
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin;User:user-a;User:Bob;User:Alice


ssl.endpoint.identification.algorithm=

Best answer

Try setting these properties at a different level. In the AdminClientConfig dump above, security.protocol is still PLAINTEXT and sasl.jaas.config is null: the binder's consumerProperties/producerProperties never reached the admin client used for topic provisioning, so it opens a plaintext connection to an SSL listener and the broker logs "SSL handshake failed". Properties under spring.kafka.properties are applied to all Kafka clients, including the admin client:

spring.kafka.bootstrap-servers=${BROKERS}
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.mechanism=SCRAM-SHA-512
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${USERNAME}" password="${PASSWORD}";

Remember to replace the ${...} placeholders, and make sure sasl.mechanism matches the broker's sasl.enabled.mechanisms (the broker above enables SCRAM-SHA-512).
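
The same idea expressed in application.yml form, for this question's broker and credentials, might look like the sketch below. The truststore entries are assumptions (the question never shows a client truststore, but the broker presents a certificate from server.p12 that the client must trust):

```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9090
    properties:
      security.protocol: SASL_SSL
      sasl.mechanism: SCRAM-SHA-512
      sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="userkk" password="admin-secret";
      # assumed: a truststore containing the broker's certificate (placeholder path/password)
      ssl.truststore.location: /path/to/client-truststore.jks
      ssl.truststore.password: changeit
      # blank disables hostname verification, matching the broker's
      # ssl.endpoint.identification.algorithm= setting
      ssl.endpoint.identification.algorithm: ""
```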

Regarding "java - How do I set up a Spring Cloud Kafka project to connect to Apache Kafka with SASL_SSL?", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/60054913/
