java - Dataflow DoFn 中的数据存储查询在云中运行时会减慢管道速度

标签 java google-cloud-datastore google-cloud-dataflow

我正在尝试通过在 DoFn 步骤中查询数据存储来增强管道中的数据。 来自类 CustomClass 的对象的字段用于对数据存储区表执行查询,返回的值用于增强对象。

代码如下所示:

public class EnhanceWithDataStore extends DoFn<CustomClass, CustomClass> {

private static Datastore datastore = DatastoreOptions.defaultInstance().service();
private static KeyFactory articleKeyFactory = datastore.newKeyFactory().kind("article");

@Override
public void processElement(ProcessContext c) throws Exception {

    CustomClass event = c.element();

    Entity article = datastore.get(articleKeyFactory.newKey(event.getArticleId()));

    String articleName = "";
    try{
        articleName = article.getString("articleName");         
    } catch(Exception e) {}

    CustomClass enhanced = new CustomClass(event);
    enhanced.setArticleName(articleName);

    c.output(enhanced);
}

当它在本地运行时,速度很快,但当它在云端运行时,这一步会显着降低管道速度。这是什么原因造成的?有没有解决方法或更好的方法来做到这一点?

管道的图片可以在这里找到(最后一步是增强步骤): pipeline architecture

最佳答案

您在这里所做的是在您的输入 PCollection<CustomClass> 之间进行连接以及 Datastore 中的增强功能。

对于您的 PCollection 的每个分区对 Datastore 的调用将是单线程的,因此会产生大量延迟。我希望这在 DirectPipelineRunner 中会很慢和 InProcessPipelineRunner以及。通过自动缩放和动态工作重新平衡,您应该在数据流服务上运行时看到并行性,除非您的结构导致我们对其优化不佳,因此您可以尝试增加 --maxNumWorkers .但是您仍然无法从批量操作中受益。

最好在您的管道中使用 DatastoreIO.readFrom(...) 表达此连接其次是 CoGroupByKey转换。这样,Dataflow 将对所有增强功能进行批量并行读取,并使用高效的 GroupByKey。使他们与事件保持一致的机器。

// Here are the two collections you want to join
PCollection<CustomClass> events = ...;
PCollection<Entity> articles = DatastoreIO.readFrom(...);

// Key them both by the common id
PCollection<KV<Long, CustomClass>> keyedEvents =
    events.apply(WithKeys.of(event -> event.getArticleId()))

PCollection<KV<Long, Entity>> =
    articles.apply(WithKeys.of(article -> article.getKey().getId())

// Set up the join by giving tags to each collection
TupleTag<CustomClass> eventTag = new TupleTag<CustomClass>() {};
TupleTag<Entity> articleTag = new TupleTag<Entity>() {};
KeyedPCollectionTuple<Long> coGbkInput =
    KeyedPCollectionTuple
        .of(eventTag, keyedEvents)
        .and(articleTag, keyedArticles);

PCollection<CustomClass> enhancedEvents = coGbkInput
    .apply(CoGroupByKey.create())
    .apply(MapElements.via(CoGbkResult joinResult -> {
      for (CustomClass event : joinResult.getAll(eventTag)) {
        String articleName;
        try {
          articleName = joinResult.getOnly(articleTag).getString("articleName");
        } catch(Exception e) {
          articleName = "";
        }
        CustomClass enhanced = new CustomClass(event);
        enhanced.setArticleName(articleName);
        return enhanced;
      }
    });

另一种可能性,如果只有很少的文章足以在内存中存储查找,则使用 DatastoreIO.readFrom(...)然后通过 View.asMap() 将它们全部读取为 map 端输入并在本地表中查找它们。

// Here are the two collections you want to join
PCollection<CustomClass> events = ...;
PCollection<Entity> articles = DatastoreIO.readFrom(...);

// Key the articles and create a map view
PCollectionView<Map<Long, Entity>> = articleView
    .apply(WithKeys.of(article -> article.getKey().getId())
    .apply(View.asMap());

// Do a lookup join by side input to a ParDo
PCollection<CustomClass> enhanced = events
    .apply(ParDo.withSideInputs(articles).of(new DoFn<CustomClass, CustomClass>() {
      @Override
      public void processElement(ProcessContext c) {
        Map<Long, Entity> articleLookup = c.sideInput(articleView);
        String articleName;
        try {
          articleName =
              articleLookup.get(event.getArticleId()).getString("articleName");
        } catch(Exception e) {
          articleName = "";
        }
        CustomClass enhanced = new CustomClass(event);
        enhanced.setArticleName(articleName);
        return enhanced;
      }
    });

根据您的数据,这些中的任何一个都可能是更好的选择。

关于java - Dataflow DoFn 中的数据存储查询在云中运行时会减慢管道速度,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40049621/

相关文章:

java - 将时间字符串解析为日历时出错

java - @KafkaListener不接收记录

sql - 如何将数据从谷歌数据存储实体下载到本地?

google-cloud-platform - 为什么使用 Dataflow 写入 Bigquery 非常慢?

java - 无法设置无人机: NoClassDefFoundError: com/opera/core/systems/OperaDriver

具有 Cloud SQL 或数据存储的 Android App Engine

google-app-engine - Google App Engine 数据存储数据隐私

python - 将 PCollection 分配回全局窗口

google-cloud-dataflow - 如何解决 apache_beam.internal.clients.dataflow.dataflow_v1b3_messages.TypeValueValuesEnum 类上的酸洗错误?

java - 托管 bean 属性值未设置为 null