string - Kafka 消息键 - 同时使用 byte[] 和 String

标签 string casting byte apache-kafka

我在使用 Kafka 时遇到了一个非常令人困惑的问题 - 特别是试图获取消息的 key 。

关键好像认为它既是一个String又是一个byte[]

以下代码产生以下异常:

    Map<String, Integer> topicCount = new HashMap<>();
    topicCount.put(myConsumer.getTopic(), 1);

    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = myConsumer.getConsumer().createMessageStreams(topicCount);
    List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(myConsumer.getTopic());
    System.out.println("Listening to topic " + myConsumer.getTopic());
    for (final KafkaStream stream : streams) {
        ConsumerIterator<String, byte[]> it = stream.iterator();
        while (it.hasNext()) {

            System.out.println("Message received from topic");

            MessageAndMetadata<String, byte[]> o = it.next();

            Object messageKey = o.key();
            System.out.println("messageKey is type: " + messageKey.getClass().getName());
            System.out.println("messageKey is type: " + messageKey.getClass().getCanonicalName());
            System.out.println("o keyDecoder: " + o.keyDecoder());

            System.out.println("Key from message: " + o.key());  //This throws exception - [B cannot be cast to java.lang.String
            //System.out.println("Key as String: " + new String(o.key(), StandardCharsets.UTF_8));    //uncomment this compile Exception - no suitable constructor found for String(java.lang.String,java.nio.charset.Charset)

            byte[] bytesIn = o.message();       //getting the bytes is fine

            System.out.println("MessageAndMetadata: " + o);

            ///other code cut
        }
    }

异常:

Listening to topic MyKafkaTopic
Message received from topic
messageKey is type: [B
messageKey is type: byte[]
o decoder: kafka.serializer.DefaultDecoder@2e0d0acd
[WARNING] 
java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.String
    at com.foo.bar.KafkaCFS.process(KafkaCFS.java:153)
    at com.foo.bar.KafkaCFS.run(KafkaCFS.java:63)
    at com.foo.bar.App.main(App.java:90)
    ... 6 more      

Maven:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>0.9.0.1</version>
</dependency>

如果我取消注释 System.out 行,那么我什至无法编译:

[ERROR] COMPILATION ERROR :
[INFO] -------------------------------------------------------------
[ERROR] /C:/Dev/main/java/com/foo/bar/KafkaCFS.java:[152,56] no suitable constructor found for String(java.lang.String,java.nio.charset.Charset)
    constructor java.lang.String.String(byte[],int) is not applicable
    (argument mismatch; java.lang.String cannot be converted to byte[])

为什么编译器认为 Key 是一个字符串(这正是我所期望的),但运行时它是一个字节数组?

如何获取字符串形式的 key ?

谢谢

KA。

最佳答案

这不匹配!您将流声明为 KafkaStream<byte[], byte[]>然后你期望 ConsumerIterator<String, byte[]> it = stream.iterator();应该是ConsumerIterator<byte[], byte[]> it = stream.iterator();来匹配泛型。然后就可以得到o.key()并通过 new String(o.key()); 创建一个字符串

关于string - Kafka 消息键 - 同时使用 byte[] 和 String,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40375820/

相关文章:

c - 不同程序中的相同阵列

Java泛型类型方法返回错误的类型

c - C 中的安全类型转换

c - 查看 C 中变量的字节/位

java - C# 中的 Byte[]String 到 String

java - Java中字符串到时间戳转换差异错误

mysql - mysql 查询的正则表达式?

c++ - 从字符串中标记/提取信息的最佳方法

c# - 将字符串(从文本框)转换/转换为 IntPtr C#

go - 如何从标准输入中按字节读取?