String prefix = "B";
Properties properties = new Properties();
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-word-pattern");
StreamsBuilder streamsBuilder = new StreamsBuilder();
//source
KStream<String, String> stream = streamsBuilder.stream(SOURCE_TEST_TOPIC);
//word processor
KStream<String, String> wordProcessor = stream.flatMapValues(s -> Arrays.asList(s.split(",")));
//match
KStream<String, String> matchProcessor =
wordProcessor.filter((key, value) -> value.toUpperCase().startsWith(prefix));
matchProcessor.to(WORD_TOPIC);
Topology topology = streamsBuilder.build();
try (KafkaStreams kafkaStreams = new KafkaStreams(topology, properties)) {
System.err.println("Stream is starting...");
kafkaStreams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.err.println("Stream is closing...");
kafkaStreams.close();
}));
}
当我运行此流时,引发了此异常:
Exception in thread "main" java.lang.NoSuchFieldError: TRACE at org.apache.kafka.streams.StreamsConfig.(StreamsConfig.java:766) at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:693) at kafkacustom.streams.KafkaStreamsExample.main(KafkaStreamsExample.java:42)
如何解决这个问题
最佳答案
这是由于库不兼容造成的,请检查您的 org.apache.kafka:kafka-clients
和 org.apache.kafka:kafka-streams
版本。
在我的具体情况下org.apache.kafka:kafka-clients:6.0.0-ccs
(汇合库)已从中删除了此
传感器内部类,我使用开源版本的 TRACE
枚举值RecordingLevelorg.apache.kafka:kafka-streams:3.1.1
。所以我刚刚更新到汇合版本 6.0.0-css
并且可以工作
要进一步调查,请单击类 StreamsConfig
的控制台日志跟踪,应位于第 794 行,然后检查 Sensor
类是否有其内部类RecordingLevel
具有 TRACE
枚举值
对于“官方”repo ,它仍然重新获取这个枚举值
关于apache-kafka - kafka : Exception in thread "main" java. lang.NoSuchFieldError:跟踪,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70681056/