java - Kafka 生产者配置 metadata.broker.list 和 url

标签 java apache-kafka

我已经设置了一个有 3 个代理的 kafka 服务器。我想从我的计算机向这三个代理发送消息,但我已经为每个代理配置了一个 url,例如 abc.com/kafka1/abc.com/kafka2/abc.com/kafka3/ ngix.

如何在 metadata.broker.list 属性中使用这些 url?我的代码如下。

package com.xxx.x.kafka.producer;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Date;
import java.util.Properties;
import java.util.Random;


    class TestProducer {
        public static void main(String[] args) {
            long events = Long.parseLong(args[0]);
            Random rnd = new Random();

            Properties props = new Properties();
            props.put("metadata.broker.list", "abc.com/kafka1/:80,abc.com/kafka2/:80,abc.com/kafka3/:80");

            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("partitioner.class", "com.knx.adx.kafka.producer.SimplePartitioner");
            props.put("request.required.acks", "1");

            ProducerConfig config = new ProducerConfig(props);

            Producer<String, String> producer = new Producer<String, String>(config);

            for (long nEnvents = 0; nEnvents < events; nEnvents++) {
                long runtime = new Date().getTime();
                String ip = "192.168.2." + rnd.nextInt(255);
                String msg = runtime + ",www.example.com" + ip;
                KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
                producer.send(data);
            }
            producer.close();
        }
    }

这是我在运行代码时遇到的错误。

Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
    at kafka.producer.Producer.send(Producer.scala:77)
    at kafka.javaapi.producer.Producer.send(Producer.scala:33)
    at com.knx.adx.kafka.producer.TestProducer.main(TestProducer.java:35)

line error producer.send(data);

最佳答案

配置变量 metadata.broker.list 需要 host1:port1,host2:port2 而不是 URL。尝试为每个代理配置不同的子域名,例如 kafka1.abc.com:80,kafka2.abc.com:80,kafka3.abc.com:80 并将这些子域指向适当的主机。请参阅 Kafka Configuration 的生产者配置部分

This is for bootstrapping and the producer will only use it for getting metadata (topics, partitions and replicas). The socket connections for sending the actual data will be established based on the broker information returned in the metadata. The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.

关于java - Kafka 生产者配置 metadata.broker.list 和 url,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31670175/

相关文章:

java - 在单元测试中等待 Platform.RunLater

Java - 使用 SQLite 数据库填充 JCombobox

java - 有没有理由不使用 Java 8 的 parallelSort?

java - 给定一个包,我如何确定它的 artifactId 应该是什么?

python - 从 kafka 导入 KafkaClient ImportError : No module named kafka

java - 有没有一种简单的方法可以将复杂类的对象写入 java 文件?

apache-kafka - Kafka 如何向多个消费者组广播

java - 如何实例化 Kafka Connect Schema 数组

java - 在 Kafka 消费者配置中将 max.poll.interval.ms 设置为大于 request.timeout.ms 的负面影响是什么

mongodb - 带有 kafka 和 mongoDB 连接器的发件箱模式