java - 哪种方法是读取缓慢变化查找和丰富流输入集合的最佳方法?

标签 java parallel-processing apache-flink apache-beam flink-streaming

我正在使用 Apache Beam,其流媒体集合为 1.5GB。 我的查找表是 JDBCio mysql 响应。

当我在没有侧面输入的情况下运行管道时,我的工作将在大约 2 分钟内完成。当我使用侧面输入运行我的工作时,我的工作永远不会完成,卡住并死亡。

这是我用来存储查找的代码(约 1M 条记录)

  PCollectionView<Map<String,String>> sideData = pipeline.apply(JdbcIO.<KV<String, String>>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
       "com.mysql.jdbc.Driver", "jdbc:mysql://ip")
      .withUsername("username")
      .withPassword("password"))
      .withQuery("select a_number from cell")
      .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
      .withRowMapper(new JdbcIO.RowMapper<KV<String, String>>() {
      public KV<String, String> mapRow(ResultSet resultSet) throws Exception {
        return KV.of(resultSet.getString(1), resultSet.getString(1));
      }
})).apply(View.asMap());

这是我的流媒体 Collection 的代码

pipeline
.apply("ReadMyFile", TextIO.read().from("/home/data/**")
.watchForNewFiles(Duration.standardSeconds(60),  Watch.Growth.<String>never()))
.apply(Window.<String>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(30))))
.accumulatingFiredPanes()
.withAllowedLateness(ONE_DAY))

这是我的 parDo 的代码,用于迭代每个事件行(10M 记录)

  .apply(ParDo.of(new DoFn<KV<String,Integer>,KV<String,Integer>>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    KV<String,Integer> i = c.element();
    String sideInputData = c.sideInput(sideData).get(i.getKey());
    if (sideInputData == null) {
      c.output(i);
    } 
  }
 }).withSideInputs(sideData));

我正在使用 flink 集群,但使用直接运行器输出相同。

集群:

2个CPU 6核 24GB内存

我做错了什么? I've followed this

最佳答案

解决方案是创建一个“缓存”MAP。

sideInput 仅触发一次,然后我将其缓存到 map 等效结构中。

因此,我避免为每个 processElement 执行 sideInput。

.apply(ParDo.of(new DoFn<KV<String,Integer>,KV<String,Integer>>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
   if (isFirstTime) {
        myList = c.sideInput(sideData);
    }
    isFirstTime = false;
    boolean result = myList.containsKey(c.element().getKey());         
    if (result == false) {
      c.output(i);
    } 
  }
 }).withSideInputs(sideData));

关于java - 哪种方法是读取缓慢变化查找和丰富流输入集合的最佳方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59087483/

相关文章:

java - 奇怪的 LiveData 行为?

teamcity - 如何在 TeamCity 中重新组合构建?

python - 如何在 python 中加快这个图像蒙版创建过程?

apache-flink - 在 Apache flink 中的节点之间共享数据集的最佳方式是什么?

apache-flink - Flink 窗口和状态维护

twitter - Apache Flink与Twitter Heron?

java - 测试用例的生成

java - 使用 Java Config 时的 Spring Batch 表前缀

java - 在 Websphere App Server 和 Websphere MQ 之间配置 SSL

c++ - SIMD XOR 运算不如 Integer XOR 有效?