java - Unable to publish messages to Message Hub

Tags: java apache-kafka message message-hub

// Asynchronous response from Message Hub / Kafka.
kafkaProducer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata m, Exception e) {
        if (e != null) {
            e.printStackTrace();
        } else {
            log.debug(" **** Message sent, offset: " + m.offset()
                    + " @ partition " + m.partition());
            log.debug(" <<<< " + " document_id " + key
                    + " @ " + account.getActivityId());
        }
    }
});

When we try to publish a message to Message Hub with the code above, we always get the following error.

2016-06-21 18:38:22-[INFO] com.ibm.cloudant.streaming.messageHub.Client.send(476): >>>> sending document_id julia30 @ my_database
2016-06-21 18:38:22-[DEBUG] org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(623): Initialize connection to node -1 for sending metadata request
2016-06-21 18:38:22-[DEBUG] org.apache.kafka.clients.NetworkClient.initiateConnect(487): Initiating connection to node -1 at kafka01-prod01.messagehub.services.us-south.bluemix.net:9093.
2016-06-21 18:38:22-[DEBUG] org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$1.run(105): Creating SaslClient: client=multiuser-adapter@multiuser.messagehub.ibm.com;service=kafka;serviceHostname=kafka01-prod01.messagehub.services.us-south.bluemix.net;mechs=[GSSAPI]
2016-06-21 18:38:22-[DEBUG] com.ibm.cloudant.streaming.messageHub.AccountManager.<init>(53): process id: 15825
2016-06-21 18:38:22-[INFO] org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(92): Failed to create channel due to
org.apache.kafka.common.KafkaException: Failed to configure SaslClientAuthenticator
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.configure(SaslClientAuthenticator.java:96)
    at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:89)
    at org.apache.kafka.common.network.Selector.connect(Selector.java:162)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:489)
    at org.apache.kafka.clients.NetworkClient.access$400(NetworkClient.java:47)
    at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:624)
    at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:543)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:254)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.KafkaException: Failed to create SaslClient
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:112)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.configure(SaslClientAuthenticator.java:94)
    ... 10 more
Caused by: javax.security.sasl.SaslException: PLAIN: authorization ID and password must be specified
    at com.sun.security.sasl.PlainClient.<init>(PlainClient.java:58)
    at com.sun.security.sasl.ClientFactoryImpl.createSaslClient(ClientFactoryImpl.java:97)
    at javax.security.sasl.Sasl.createSaslClient(Sasl.java:384)
    at com.ibm.messagehub.login.MessageHubSaslClientFactory.createSaslClient(MessageHubSaslClientFactory.java:77)
    at javax.security.sasl.Sasl.createSaslClient(Sasl.java:384)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$1.run(SaslClientAuthenticator.java:107)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$1.run(SaslClientAuthenticator.java:102)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:102)
    ... 11 more
2016-06-21 18:38:22-[ERROR] org.apache.kafka.clients.producer.internals.Sender.run(130): Uncaught error in kafka producer I/O thread:
org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to configure SaslClientAuthenticator
    at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:93)
    at org.apache.kafka.common.network.Selector.connect(Selector.java:162)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:489)
    at org.apache.kafka.clients.NetworkClient.access$400(NetworkClient.java:47)
    at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:624)
    at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:543)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:254)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.KafkaException: Failed to configure SaslClientAuthenticator
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.configure(SaslClientAuthenticator.java:96)
    at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:89)
    ... 9 more
Caused by: org.apache.kafka.common.KafkaException: Failed to create SaslClient
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:112)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.configure(SaslClientAuthenticator.java:94)
    ... 10 more
Caused by: javax.security.sasl.SaslException: PLAIN: authorization ID and password must be specified
    at com.sun.security.sasl.PlainClient.<init>(PlainClient.java:58)
    at com.sun.security.sasl.ClientFactoryImpl.createSaslClient(ClientFactoryImpl.java:97)
    at javax.security.sasl.Sasl.createSaslClient(Sasl.java:384)
    at com.ibm.messagehub.login.MessageHubSaslClientFactory.createSaslClient(MessageHubSaslClientFactory.java:77)
    at javax.security.sasl.Sasl.createSaslClient(Sasl.java:384)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$1.run(SaslClientAuthenticator.java:107)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$1.run(SaslClientAuthenticator.java:102)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:102)
    ... 11 more
2016-06-21 18:38:22-[ERROR] org.apache.kafka.clients.producer.internals.Sender.run(130): Uncaught error in kafka producer I/O thread:
java.lang.NullPointerException
    at org.apache.kafka.common.network.Selector.poll(Selector.java:268)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
    at java.lang.Thread.run(Thread.java:745)

The producer settings are as follows:

```

 compression.type = none
 metric.reporters = []
 metadata.max.age.ms = 300000
 metadata.fetch.timeout.ms = 60000
 reconnect.backoff.ms = 50
 sasl.kerberos.ticket.renew.window.factor = 0.8
 bootstrap.servers = [kafka01-prod01.messagehub.services.us-south.bluemix.net:9093]
 retry.backoff.ms = 100
 sasl.kerberos.kinit.cmd = /usr/bin/kinit
 buffer.memory = 33554432
 timeout.ms = 30000
 key.serializer = class org.apache.kafka.common.serialization.StringSerializer
 sasl.kerberos.service.name = null
 sasl.kerberos.ticket.renew.jitter = 0.05
 ssl.keystore.type = JKS
 ssl.trustmanager.algorithm = PKIX
 block.on.buffer.full = false
 ssl.key.password = null
 max.block.ms = 60000
 sasl.kerberos.min.time.before.relogin = 60000
 connections.max.idle.ms = 540000
 ssl.truststore.password = [hidden]
 max.in.flight.requests.per.connection = 5
 metrics.num.samples = 2
 client.id = kafka01-prod01.messagehub.services.us-south.bluemix.net%3A9093_8qp87X32V6PK5epv.1
 ssl.endpoint.identification.algorithm = HTTPS
 ssl.protocol = TLSv1.2
 request.timeout.ms = 30000
 ssl.provider = null
 ssl.enabled.protocols = [TLSv1.2]
 acks = -1
 batch.size = 16384
 ssl.keystore.location = null
 receive.buffer.bytes = 32768
 ssl.cipher.suites = null
 ssl.truststore.type = JKS
 security.protocol = SASL_SSL
 retries = 1
 max.request.size = 1048576
 value.serializer = class org.apache.kafka.common.serialization.StringSerializer
 ssl.truststore.location = /Users/jiangph/tools/liberty/usr/shared/resources/keystore.jks
 ssl.keystore.password = null
 ssl.keymanager.algorithm = SunX509
 metrics.sample.window.ms = 30000
 send.buffer.bytes = 131072
 linger.ms = 0

```

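With the settings above, creating topics through the Message Hub REST API works without problems. The problem only appears when trying to publish messages.

Any ideas would be highly appreciated.

Best answer

The Message Hub REST API authenticates differently from the Java Kafka client.

I see an authentication error in your log: javax.security.sasl.SaslException: PLAIN: authorization ID and password must be specified at com.sun.security.sasl.PlainClient.

To configure the Java client for Message Hub SASL authentication, see the following Java sample: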

https://github.com/ibm-messaging/message-hub-samples/tree/master/java/message-hub-kafka-ssl

Note that your producer properties should include the following:

https://github.com/ibm-messaging/message-hub-samples/blob/master/java/message-hub-kafka-ssl/resources/producer.properties

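Your jaas.conf file should look like this:

    KafkaClient {
      com.ibm.messagehub.login.MessageHubLoginModule required
      serviceName="kafka"
      username="<your username>"
      password="<your password>";
    };

You must also have the Message Hub login jar on your classpath.

HTH, Edo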
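The error in the log says the PLAIN mechanism received no authorization ID or password, so one thing worth checking is whether the JAAS file the JVM sees really defines a `KafkaClient` entry with both values. The Kafka Java client of that era locates the file through the JVM system property `java.security.auth.login.config` (for example `-Djava.security.auth.login.config=/path/to/jaas.conf`). The following is a minimal diagnostic sketch that uses only the JDK, not code from the answer: the class `JaasCheck` and its methods are hypothetical names, and the credentials in `main` are placeholders. It parses the file only and does not load the login module class or contact Message Hub.

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.NoSuchAlgorithmException;
import java.security.URIParameter;
import java.util.Collections;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

/** Hypothetical helper: checks that a JAAS file defines a usable KafkaClient entry. */
final class JaasCheck {

    /** Returns the options of the first KafkaClient login module, or an empty map if the entry is missing. */
    static Map<String, ?> kafkaClientOptions(Path jaasFile) throws NoSuchAlgorithmException {
        // "JavaLoginConfig" is the JDK's standard file-based JAAS configuration type.
        Configuration config =
                Configuration.getInstance("JavaLoginConfig", new URIParameter(jaasFile.toUri()));
        AppConfigurationEntry[] entries = config.getAppConfigurationEntry("KafkaClient");
        if (entries == null || entries.length == 0) {
            return Collections.emptyMap();
        }
        return entries[0].getOptions();
    }

    /** True when both values the PLAIN mechanism needs are present and non-empty. */
    static boolean hasCredentials(Map<String, ?> options) {
        return isSet(options.get("username")) && isSet(options.get("password"));
    }

    private static boolean isSet(Object value) {
        return value != null && !value.toString().isEmpty();
    }

    public static void main(String[] args) throws Exception {
        // Placeholder credentials, written to a temporary file purely for the demo.
        Path jaasFile = Files.createTempFile("jaas", ".conf");
        String content = "KafkaClient {\n"
                + "  com.ibm.messagehub.login.MessageHubLoginModule required\n"
                + "  serviceName=\"kafka\"\n"
                + "  username=\"placeholder-user\"\n"
                + "  password=\"placeholder-password\";\n"
                + "};\n";
        Files.write(jaasFile, content.getBytes(StandardCharsets.UTF_8));

        Map<String, ?> options = kafkaClientOptions(jaasFile);
        System.out.println("KafkaClient credentials present: " + hasCredentials(options));
        Files.delete(jaasFile);
    }
}
```

To check a real setup, pass the path of the actual jaas.conf to `kafkaClientOptions` instead of the temporary file. An empty map would suggest the entry name is wrong or the file is not the one expected; a missing `username` or `password` would be consistent with the `SaslException` shown in the question.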

Regarding "java - Unable to publish messages to Message Hub", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/37943120/
