apache-kafka-streams - Failed to flush state store

Tags: apache-kafka-streams

I am trying to create a leftJoin in Kafka Streams. It works for roughly 10 records and then crashes with an exception caused by a NullPointerException. The code looks like this:

private static KafkaStreams getKafkaStreams() {
    StreamsConfig streamsConfig = new StreamsConfig(getProperties());
    KStreamBuilder builder = new KStreamBuilder();

    KTable<String, Verkaeufer> umsatzTable = builder.table(Serdes.String(), EventstreamSerde.Verkaeufer(), CommonUtilsConstants.TOPIC_VERKAEUFER_STAMMDATEN);
    KStream<String, String> verkaeuferStream = builder.stream(CommonUtilsConstants.TOPIC_ANZAHL_UMSATZ_PER_VERKAEUFER);

    KStream<String, String> tuttiStream = verkaeuferStream.leftJoin(umsatzTable,
            (tutti, verkaeufer) -> ("Vorname=" + verkaeufer.getVorname().toString() +",Nachname=" +verkaeufer.getNachname().toString() +"," +tutti.toString()), Serdes.String(), Serdes.String());

    tuttiStream.to(Serdes.String(), Serdes.String(), CommonUtilsConstants.TOPIC_TUTTI);

    return new KafkaStreams(builder, streamsConfig);
}

The StreamsConfig looks like this:

private static Properties getProperties() {
    Properties props = new Properties();
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CommonUtilsConstants.BOOTSTRAP_SERVER_CONFIGURATION);
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, CommonUtilsConstants.GID_TUTTI);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "1000");

    return props;
}

Full stack trace:

22:19:36.550 [gid-tutti-8fe6be58-d5c5-41ce-982d-88081b98004e-StreamThread-1] ERROR o.a.k.s.p.internals.StreamThread - stream-thread [gid-tutti-8fe6be58-d5c5-41ce-982d-88081b98004e-StreamThread-1] Failed to commit StreamTask 0_0 state: org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store KTABLE-SOURCE-STATE-STORE-0000000000
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:262)
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:190)
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:282)
at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:264)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:253)
at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:815)
at org.apache.kafka.streams.processor.internals.StreamThread.access$2800(StreamThread.java:73)
at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:797)
at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448)
at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:789)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:778)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:567)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527) Caused by: java.lang.NullPointerException: null
at java.lang.String.<init>(String.java:143)
at ch.wesr.eventstream.commonutils.serde.GsonDeserializer.deserialize(GsonDeserializer.java:38)
at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:163)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:90)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:34)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:78)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:145)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:103)
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:97)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:107)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:260)
... 14 common frames omitted

Update:

This is what the GsonDeserializer looks like:

public class GsonDeserializer<T> implements Deserializer<T>{

    public static final String CONFIG_VALUE_CLASS = "default.value.deserializer.class";
    public static final String CONFIG_KEY_CLASS = "default.key.deserializer.class";
    private Class<T> deserializedClass;
    private Gson gson = new GsonBuilder().create();

    public GsonDeserializer() {}

    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        String configKey = isKey ? CONFIG_KEY_CLASS : CONFIG_VALUE_CLASS;
        String clsName = String.valueOf(config.get(configKey));
        try {
            if (deserializedClass == null) {
                deserializedClass = (Class<T>) Class.forName(clsName);
            }
        } catch (ClassNotFoundException e) {
            System.err.printf("Failed to configure GsonDeserializer. " +
                            "Did you forget to specify the '%s' property ?%n",
                    configKey);
            System.out.println(e.getMessage());
        }
    }

    @Override
    public T deserialize(String s, byte[] bytes) {
        return gson.fromJson(new String(bytes), deserializedClass);
    }

    @Override
    public void close() {}
}

Best answer

As long as the cache is not flushed, your deserializer is never called. That is why it does not fail right away; you can stretch out the time until it fails via the cache size parameter and the commit interval (the cache is flushed on commit).
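For illustration only, these are the two settings referred to above (the property names come from the Kafka Streams StreamsConfig API; the values are arbitrary examples). Tuning them only changes when the cache is flushed, and therefore when the failing deserialization happens; it does not fix the underlying NullPointerException:

props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L); // larger cache -> later flush
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);                    // commit (and flush) every 30 seconds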

Looking at your GsonDeserializer code, it seems that new String(bytes) fails with the NPE -- the String constructor cannot take null as an argument. Your deserializer code must guard against bytes == null and should simply return null in that case.
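A minimal sketch of that guard, using the fields already present in the GsonDeserializer above (assuming a null return value is acceptable for the downstream code):

@Override
public T deserialize(String topic, byte[] bytes) {
    // Null payloads (e.g. tombstone records) arrive as null; the String
    // constructor throws a NullPointerException for a null byte array,
    // so return null instead of attempting to deserialize.
    if (bytes == null) {
        return null;
    }
    return gson.fromJson(new String(bytes), deserializedClass);
}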

This answer is based on a similar question found on Stack Overflow: https://stackoverflow.com/questions/46749155/
