java - 如何在Flink Kafka Consumer中动态获取处理kafka主题名称?

标签 java apache-kafka apache-flink flink-streaming

目前,我有一个 Flink Cluster,想要通过一种 Pattern 来消费 Kafka Topic,通过这种方式,我们不需要维护一个硬编码的 Kafka Topic 列表。

import java.util.regex.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
...
private static final Pattern topicPattern = Pattern.compile("(DC_TEST_([A-Z0-9_]+)");
...
FlinkKafkaConsumer010<KafkaMessage> kafkaConsumer = new FlinkKafkaConsumer010<>(
          topicPattern, deserializerClazz.newInstance(), kafkaConsumerProps);
DataStream<KafkaMessage> input = env.addSource(kafkaConsumer);

我只是想知道,通过上述方式,在处理过程中如何才能知道真实的Kafka主题名称? 谢谢。

--更新-- 我之所以需要知道主题信息,是因为我们需要这个主题名称作为参数,在接下来的 Flink Sink 部分中使用。

最佳答案

您可以实现自己的自定义 KafkaDeserializationSchema,如下所示:

  public class CustomKafkaDeserializationSchema implements KafkaDeserializationSchema<Tuple2<String, String>> {
    @Override
    public boolean isEndOfStream(Tuple2<String, String> nextElement) {
        return false;
    }

    @Override
    public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        return new Tuple2<>(record.topic(), new String(record.value(), "UTF-8"));
    }

    @Override
    public TypeInformation<Tuple2<String, String>> getProducedType() {
        return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
    }
  }

使用自定义的KafkaDeserializationSchema,您可以创建元素包含主题信息的DataStream。在我的演示案例中,元素类型是 Tuple2<String, String> ,这样就可以通过 Tuple2#f0 访问主题名称.

FlinkKafkaConsumer010<Tuple2<String, String>> kafkaConsumer = new FlinkKafkaConsumer010<>(
          topicPattern, new CustomKafkaDeserializationSchema, kafkaConsumerProps);
DataStream<Tuple2<String, String>> input = env.addSource(kafkaConsumer);

input.process(new ProcessFunction<Tuple2<String,String>, String>() {
            @Override
            public void processElement(Tuple2<String, String> value, Context ctx, Collector<String> out) throws Exception {
                String topicName = value.f0;
                // your processing logic here.
                out.collect(value.f1);
            }
        });

关于java - 如何在Flink Kafka Consumer中动态获取处理kafka主题名称?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57266072/

相关文章:

hadoop - s3n/s3a如何管理文件?

scala - Scala 案例类中 init 方法的 java.lang.NoSuchMethodException

java - Windows 应用商店 - 使用 Java 验证收据

java - 卡夫卡再平衡。重复处理问题

java - 如何创建文件并将其放入与 Maven 的 war 中?

spring - 使用 spring-kafka-test 无法启动测试(NoClassDefFoundError)

apache-flink - 弗林克 : Flink Shell throws NullPointerException

apache-kafka - 如何在 Kafka 0.10.1.0 中使用 Flink?

Java Spring Data @EnabledJPARepositories 问题

java - 从 HashMap 获取方法 get() 返回