java - 在 kafka 流中使用 kafka connect json api 消费 JSON 值 : JAVA

标签 java json apache-kafka-streams

我正在尝试在 kafka 流中使用 kafka connect api 消费 json 消息。 我尝试在 google 中搜索,但找不到任何有关如何在 Streams api 中读取 json 消息的实质性信息。

因此,凭借有限的知识,我尝试了以下方法。

package com.kafka.api.serializers.json;

import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ConsumerUtilities {

    private static ObjectMapper om = new ObjectMapper();

    public static Properties getProperties() {

        Properties configs = new Properties();
        configs.put(StreamsConfig.APPLICATION_ID_CONFIG,
                "Kafka test application");
        configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configs.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        configs.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                "org.apache.kafka.connect.json.JsonDeserializer");
        return configs;
    }

    public static KStreamBuilder getStreamingConsumer() {
        KStreamBuilder builder = new KStreamBuilder();
        return builder;
    }

    public static void printStreamData() {
        KStreamBuilder builder = getStreamingConsumer();
        KStream<String, JsonNode> kStream = builder.stream("test");
        kStream.foreach(new ForeachAction<String, JsonNode>() {
            @Override
            public void apply(String key, JsonNode value) {
                try {
                    System.out.println(key + " : " + om.treeToValue(value, Person.class));
                } catch (JsonProcessingException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }

        });

        KafkaStreams kafkaStreams = new KafkaStreams(builder, getProperties());
        kafkaStreams.start();
    }

}

package com.kafka.api.serializers.json;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class ProducerUtilities {

    // Shared Jackson mapper; thread-safe, used to convert POJOs to JSON trees.
    private static ObjectMapper om = new ObjectMapper();


    /**
     * Builds a producer that sends String keys and Jackson {@code JsonNode} values.
     *
     * <p>Bug fix: the key serializer was {@code ByteArraySerializer} although the
     * producer is typed {@code <String, JsonNode>}; sending a record with a String
     * key would fail with a {@code ClassCastException} at send time.
     * {@code StringSerializer} matches the declared key type.
     */
    public static org.apache.kafka.clients.producer.Producer<String, JsonNode> getProducer() {
        Properties configProperties = new Properties();
        configProperties.put(ProducerConfig.CLIENT_ID_CONFIG,
                "kafka json producer");
        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092");
        configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.connect.json.JsonSerializer");

        org.apache.kafka.clients.producer.Producer<String, JsonNode> producer = new KafkaProducer<String, JsonNode>(
                configProperties);
        return producer;
    }

    /**
     * Converts a {@code Person} into a {@code JsonNode} and wraps it in a keyless
     * record for topic "test".
     *
     * @param person POJO to serialize; must be Jackson-serializable
     * @return record ready to be passed to {@code Producer#send}
     */
    public ProducerRecord<String,JsonNode> createRecord(Person person){
        JsonNode jsonNode = om.valueToTree(person);
        ProducerRecord<String,JsonNode> record = new ProducerRecord<String,JsonNode>("test",jsonNode);
        return record;
    }

}

当我执行代码时,我收到如下异常

[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener for group Kafka test application failed on partition assignment
org.apache.kafka.streams.errors.StreamsException: Failed to configure value serde class org.apache.kafka.connect.json.JsonDeserializer
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:770)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init>(AbstractProcessorContext.java:59)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init>(ProcessorContextImpl.java:40)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:138)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1078)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:255)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:245)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1147)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$800(StreamThread.java:68)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:184)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.connect.json.JsonDeserializer is not an instance of org.apache.kafka.common.serialization.Serde
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:764)
    ... 19 more
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] Shutting down
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] State transition from PARTITIONS_ASSIGNED to PENDING_SHUTDOWN.
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] Stream thread shutdown complete
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD.
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] WARN org.apache.kafka.streams.KafkaStreams - stream-client [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29] All stream threads have died. The Kafka Streams instance will be in an error state and should be closed.
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29] State transition from REBALANCING to ERROR.
Exception in thread "Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] Failed to rebalance.
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:543)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
Caused by: org.apache.kafka.streams.errors.StreamsException: Failed to configure value serde class org.apache.kafka.connect.json.JsonDeserializer
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:770)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init>(AbstractProcessorContext.java:59)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init>(ProcessorContextImpl.java:40)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:138)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1078)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:255)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:245)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1147)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$800(StreamThread.java:68)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:184)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536)
    ... 3 more
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.connect.json.JsonDeserializer is not an instance of org.apache.kafka.common.serialization.Serde
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:764)
    ... 19 more

我正在寻找一些指导来解决该问题。

根据 Matthias 建议创建自定义序列化器和反序列化器

package com.kafka.api.utilities;

import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import com.kafka.api.models.Person;
import com.kafka.api.serdes.JsonDeserializer;
import com.kafka.api.serdes.JsonSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serde;

public class ConsumerUtilities {

    /**
     * Builds the Streams configuration. Default serdes are intentionally not set:
     * the explicit {@code Person} serde is supplied per-source in
     * {@link #printStreamData()} via {@code builder.stream(keySerde, valueSerde, topic)}.
     */
    public static Properties getProperties() {

        Properties configs = new Properties();
        // NOTE(review): application.id is used as a prefix for internal topic names,
        // which disallow spaces — TODO confirm this id is safe or rename it.
        configs.put(StreamsConfig.APPLICATION_ID_CONFIG,
                "Kafka test application");
        configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return configs;
    }

    /** Creates a topology builder (old, pre-1.0 {@code KStreamBuilder} API). */
    public static KStreamBuilder getStreamingConsumer() {
        KStreamBuilder builder = new KStreamBuilder();
        return builder;
    }

    /**
     * Consumes JSON-encoded {@code Person} records from topic "test" and prints them.
     * The custom serializer/deserializer pair is wrapped into a {@code Serde} and
     * passed explicitly to the source, so no default serde configuration is needed.
     */
    public static void printStreamData() {
        JsonSerializer<Person> personJsonSerializer = new JsonSerializer<>();
        JsonDeserializer<Person> personJsonDeserializer = new JsonDeserializer<>(Person.class);
        Serde<Person> personSerde = Serdes.serdeFrom(personJsonSerializer, personJsonDeserializer);

        KStreamBuilder builder = getStreamingConsumer();
        KStream<String, Person> kStream = builder.stream(Serdes.String(), personSerde, "test");
        kStream.foreach(new ForeachAction<String, Person>() {
            @Override
            public void apply(String key, Person value) {
                // Bug fix: guard against tombstone records — a null value would
                // otherwise throw a NullPointerException on value.toString().
                System.out.println(key + " : " + String.valueOf(value));
            }

        });

        KafkaStreams kafkaStreams = new KafkaStreams(builder, getProperties());
        kafkaStreams.start();
    }

}

package com.kafka.api.serdes;

import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonDeserializer<T> implements Deserializer<T>{

    // Jackson mapper; thread-safe once configured, reused for all records.
    private ObjectMapper om = new ObjectMapper();
    // Target POJO type; set via the constructor or via configure(...).
    private Class<T> type;

    /**
     * No-arg constructor required when Kafka instantiates this deserializer
     * reflectively from configuration. The target type must then be supplied
     * through {@link #configure(Map, boolean)} under the "type" key.
     *
     * <p>Bug fix: the original class documented this constructor but never
     * declared it, so reflective instantiation by Kafka would have failed.
     */
    public JsonDeserializer() {
    }

    /**
     * Preferred constructor when wiring the deserializer programmatically.
     *
     * @param type concrete class the JSON payload is bound to
     */
    public JsonDeserializer(Class<T> type) {
        this.type = type;
    }

    @Override
    public void close() {
        // Nothing to release: ObjectMapper holds no external resources.
    }

    @SuppressWarnings("unchecked")
    @Override
    public void configure(Map<String, ?> map, boolean isKey) {
        // Only fall back to the config map when no type was set via constructor.
        if (type == null) {
            type = (Class<T>) map.get("type");
        }
    }

    /**
     * Deserializes a JSON byte payload into an instance of {@code type}.
     * Returns null for null/empty payloads (Kafka tombstone records).
     *
     * @throws SerializationException if the bytes are not valid JSON for the type
     */
    @Override
    public T deserialize(String topic, byte[] bytes) {
        if (bytes == null || bytes.length == 0) {
            return null;
        }

        try {
            return om.readValue(bytes, type);
        } catch (Exception e) {
            // Wrap with cause preserved so Streams reports the real parse failure.
            throw new SerializationException(e);
        }
    }

    protected Class<T> getType() {
        return type;
    }

}

package com.kafka.api.serdes;

import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonSerializer<T> implements Serializer<T> {

    // Jackson mapper; thread-safe, reused across serialize calls.
    private ObjectMapper om = new ObjectMapper();

    @Override
    public void close() {
        // Nothing to release: ObjectMapper holds no external resources.
    }

    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        // No configuration required.
    }

    /**
     * Serializes a value to UTF-8 JSON bytes.
     *
     * @throws SerializationException if Jackson cannot serialize the value;
     *         the underlying cause is preserved (bug fix: it was dropped,
     *         making failures undiagnosable).
     */
    @Override
    public byte[] serialize(String topic, T data) {
        try {
            return om.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Failed to serialize value as JSON", e);
        }
    }

}

异常:执行流应用程序后,我收到以下异常。我很困惑。

[Kafka test application-cee84455-78ca-4a2f-881a-89b3c3a00e4b-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [Kafka test application-cee84455-78ca-4a2f-881a-89b3c3a00e4b] State transition from RUNNING to ERROR.
Exception in thread "Kafka test application-cee84455-78ca-4a2f-881a-89b3c3a00e4b-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to deserialize value for record. topic=test, partition=0, offset=0
    at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:46)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:483)
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:604)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:512)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null')
 at [Source: [B@5ee179dc; line: 1, column: 11]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null')
 at [Source: [B@5ee179dc; line: 1, column: 11]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3524)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2686)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:878)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:772)
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3834)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3783)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2929)
    at com.kafka.api.serdes.JsonDeserializer.deserialize(JsonDeserializer.java:43)
    at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
    at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:56)
    at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:44)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:483)
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:604)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:512)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)

最佳答案

Streams API 需要读写数据,因此它使用了 Serde 的抽象。它同时是序列化器和反序列化器的包装器。这就是异常的基本含义。

Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.connect.json.JsonDeserializer is not an instance of org.apache.kafka.common.serialization.Serde

因此,您需要将 JsonSerializer 和 JsonDeserializer 包装成一个 JsonSerde,并在 StreamsConfig 中使用这个 JsonSerde。

最简单的方法是使用 Serdes.serdeFrom(...) 方法(注意:Serdes,复数)。作为替代方案,您也可以直接实现 Serde 接口(注意:Serde,单数)。您可以在 Serdes 类中找到如何实现 Serde 接口的示例。

关于java - 在 kafka 流中使用 kafka connect json api 消费 JSON 值 : JAVA,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46565829/

相关文章:

apache-kafka - 从 Kafka 状态存储中删除记录是否也会从变更日志主题中删除该记录?

java - Spring-Cloud-Stream 仅在应用程序启动时实现的状态存储已完全填充并准备就绪后才处理 kafka 消息

java - 在 Java 中自动将自签名证书合并到我的信任库中的好策略是什么?

json - 如何在Azure RM中部署从资源组导出的模板?

java - 使用Kafka Stream中的状态存储(RocksDB)将一条记录转换为多条记录

javascript - 仅获取 json 数组中的第 4 个元素

json - Swift 3,从 JSON 获取一个值

包含内容的文件夹上的 Java renameTo

java - 检查字符串是否包含子字符串并从该点保存字符串

java - 方法返回类型前的 <T> 是什么意思?