apache-kafka - Apache Kafka 默认编码器不工作

标签 apache-kafka

我正在使用 Kafka 0.8 beta,我只是想发送不同的对象,使用我自己的编码器序列化它们,然后将它们发送到现有的代理配置。现在我试图让 DefaultEncoder 正常工作。

我有代理和所有设置并为 StringEncoder 工作,但我无法获得任何其他数据类型,包括由代理发送和接收的纯字节 []。

我的生产者代码是:

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Date;
import java.util.Properties;
import java.util.Random;


public class ProducerTest {
    public static void main(String[] args) {
        long events = 5;
        Random rnd = new Random();
        rnd.setSeed(new Date().getTime());
        Properties props = new Properties();
        props.setProperty("metadata.broker.list", "localhost:9093,localhost:9094");
        props.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
        props.setProperty("partitioner.class", "example.producer.SimplePartitioner");
        props.setProperty("request.required.acks", "1");
        props.setProperty("producer.type", "async");
        props.setProperty("batch.num.messages", "4");

        ProducerConfig config = new ProducerConfig(props);
        Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(config);
        for (long nEvents = 0; nEvents < events; nEvents++) {
            byte[] a = "Hello".getBytes();
            byte[] b = "There".getBytes();

            KeyedMessage<byte[], byte[]> data = new KeyedMessage<byte[], byte[]>("page_visits", a, b);
            producer.send(data);
        }
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        producer.close();
    }
}

我使用了与示例中相同的 SimplePartitioner here ,并用字符串替换所有字节数组并将序列化器更改为 kafka.serializer.StringEncoder 完美工作。

作为引用,SimplePartitioner:
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner<String> {
    public SimplePartitioner (VerifiableProperties props) {

    }

    public int partition(String key, int a_numPartitions) {
        int partition = 0;
        int offset = key.lastIndexOf('.');
        if (offset > 0) {
           partition = Integer.parseInt( key.substring(offset+1)) % a_numPartitions;
        }
       return partition;
  }

}

我究竟做错了什么?

最佳答案

答案是分区类SimplePartitioner仅适用于字符串。当我尝试异步运行 Producer 时,它会创建一个单独的线程,在发送到代理之前处理编码和分区。该线程在意识到 SimplePartitioner 仅适用于字符串时遇到了障碍,但由于它是一个单独的线程,因此不会抛出异常,因此该线程只是退出而没有任何不当行为的迹象。

如果我们将 SimplePartitioner 更改为接受 byte[],例如:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner<byte[]> {
    public SimplePartitioner (VerifiableProperties props) {

    }

    public int partition(byte[] key, int a_numPartitions) {
        int partition = 0;
        return partition;
    }

}

这现在工作得很好。

关于apache-kafka - Apache Kafka 默认编码器不工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19017422/

相关文章:

docker - Confluent Schema Registry Docker 镜像不在容器外暴露端口 8081

apache-kafka - 如何有效地获取 Kafka 日志大小?

java - Kafka Broker 问题 - UnknownServerException

apache-kafka - debezium 生成事件的 Kafka Connect 日期处理

java - 是否可以使用 java 更改现有 kafka 主题的复制因子?

logging - 如何为 Kafka 生产者配置日志记录?

python - Kafka-python 检索主题列表

apache-kafka - 适用于 RDS Aurora 的 Debezium 连接器

apache-kafka - Kafka 消费者组 - 分区数 - 复制数

django - Kafka Python 生产者与 django Web 应用程序集成