apache-kafka - 弗林克 : join file with kafka stream

标签 apache-kafka apache-flink

我有一个我真的无法解决的问题。 所以我有一个 kafka 流,其中包含一些这样的数据:

{"adId":"9001", "eventAction":"start", "eventType":"track", "eventValue":"", "timestamp":"1498118549550"}

我想用另一个值“bookingId”替换“adId”。 这个值位于一个 csv 文件中,但我真的不知道如何让它工作。

这是我的映射 csv 文件:

9001;8
9002;10

所以我的输出理想情况下应该是这样的

{"bookingId":"8", "eventAction":"start", "eventType":"track", "eventValue":"", "timestamp":"1498118549550"}

此文件至少每小时刷新一次,因此它应该获取对其的更改。

我目前有这段代码对我不起作用:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30000); // create a checkpoint every 30 seconds
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

DataStream<String> adToBookingMapping = env.readTextFile(parameters.get("adToBookingMapping"));

DataStream<Tuple2<Integer,Integer>> input = adToBookingMapping.flatMap(new Tokenizer());

//Kafka Consumer
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", parameters.get("bootstrap.servers"));
properties.setProperty("group.id", parameters.get("group.id"));

FlinkKafkaConsumer010<ObjectNode> consumer = new FlinkKafkaConsumer010<>(parameters.get("inbound_topic"), new JSONDeserializationSchema(), properties);

consumer.setStartFromGroupOffsets();

consumer.setCommitOffsetsOnCheckpoints(true);

DataStream<ObjectNode> logs = env.addSource(consumer);

DataStream<Tuple4<Integer,String,Integer,Float>> parsed = logs.flatMap(new Parser());

// output -> bookingId, action, impressions, sum
DataStream<Tuple4<Integer, String,Integer,Float>> joined = runWindowJoin(parsed, input, 3);


public static DataStream<Tuple4<Integer, String, Integer, Float>> runWindowJoin(DataStream<Tuple4<Integer, String, Integer, Float>> parsed,
      DataStream<Tuple2<Integer, Integer>> input,long windowSize) {

  return parsed.join(input)
          .where(new ParsedKey())
          .equalTo(new InputKey())
          .window(TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)))
          //.window(TumblingEventTimeWindows.of(Time.milliseconds(30000)))
          .apply(new JoinFunction<Tuple4<Integer, String, Integer, Float>, Tuple2<Integer, Integer>, Tuple4<Integer, String, Integer, Float>>() {

              private static final long serialVersionUID = 4874139139788915879L;

              @Override
              public Tuple4<Integer, String, Integer, Float> join(
                              Tuple4<Integer, String, Integer, Float> first,
                              Tuple2<Integer, Integer> second) {
                  return new Tuple4<Integer, String, Integer, Float>(second.f1, first.f1, first.f2, first.f3);
              }
          });
}

代码只运行一次然后停止,所以它不会使用 csv 文件转换 kafka 中的新条目。关于如何使用我的 csv 文件中的最新值处理来自 Kafka 的流有什么想法吗?

亲切的问候,

黑暗知识

最佳答案

您的目标似乎是将动态数据与缓慢变化的目录(即边输入)结合起来。我认为 join 操作在这里没有用,因为它不会跨窗口存储目录条目。此外,文本文件是一个有界输入,其行被读取一次。

考虑使用 connect 创建连接流,并将目录数据存储为托管状态以执行查找。运算符的并行度需要为 1。

您可能会通过研究“辅助输入”找到更好的解决方案,看看人们今天使用的解决方案。参见 FLIP-17Dean Wampler's talk at Flink Forward .

关于apache-kafka - 弗林克 : join file with kafka stream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45210753/

相关文章:

docker - 如何在Mesos群集中运行flink jar文件

kubernetes - K8S 上的 Flink : how do I provide Flink configuration to the cluster?

apache-flink - apache flink 窗口顺序

apache-kafka - 如何确保日志在 Kafka 中永久保留?

java - Kafka设置从主题读取的最大消息数

apache-kafka - Apache Kafka 中的分区领导者是什么?

java - 如何在模式级别监听 kafka 事件

elasticsearch - 从kafka导入数据到Elasticsearch时如何获取导入进度和错误日志?

apache-flink - 弗林克 : Shuffle with Parallelism = 1

java - 将 Flink scala 翻译成 java