目前,我有一个 Flink Cluster,想要通过一种 Pattern 来消费 Kafka Topic,通过这种方式,我们不需要维护一个硬编码的 Kafka Topic 列表。
import java.util.regex.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
...
private static final Pattern topicPattern = Pattern.compile("(DC_TEST_([A-Z0-9_]+)");
...
FlinkKafkaConsumer010<KafkaMessage> kafkaConsumer = new FlinkKafkaConsumer010<>(
topicPattern, deserializerClazz.newInstance(), kafkaConsumerProps);
DataStream<KafkaMessage> input = env.addSource(kafkaConsumer);
我只是想知道,通过上述方式,在处理过程中如何才能知道真实的Kafka主题名称? 谢谢。
--更新-- 我之所以需要知道主题信息,是因为我们需要这个主题名称作为参数,在接下来的 Flink Sink 部分中使用。
最佳答案
您可以实现自己的自定义 KafkaDeserializationSchema,如下所示:
public class CustomKafkaDeserializationSchema implements KafkaDeserializationSchema<Tuple2<String, String>> {
@Override
public boolean isEndOfStream(Tuple2<String, String> nextElement) {
return false;
}
@Override
public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
return new Tuple2<>(record.topic(), new String(record.value(), "UTF-8"));
}
@Override
public TypeInformation<Tuple2<String, String>> getProducedType() {
return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
}
}
使用自定义的KafkaDeserializationSchema,您可以创建元素包含主题信息的DataStream。在我的演示案例中,元素类型是 Tuple2<String, String>
,这样就可以通过 Tuple2#f0
访问主题名称.
FlinkKafkaConsumer010<Tuple2<String, String>> kafkaConsumer = new FlinkKafkaConsumer010<>(
topicPattern, new CustomKafkaDeserializationSchema, kafkaConsumerProps);
DataStream<Tuple2<String, String>> input = env.addSource(kafkaConsumer);
input.process(new ProcessFunction<Tuple2<String,String>, String>() {
@Override
public void processElement(Tuple2<String, String> value, Context ctx, Collector<String> out) throws Exception {
String topicName = value.f0;
// your processing logic here.
out.collect(value.f1);
}
});
关于java - 如何在Flink Kafka Consumer中动态获取处理kafka主题名称?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57266072/