apache-beam - Scio: groupByKey doesn't work when using Pub/Sub as collection source

Tags: apache-beam spotify-scio

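I changed the source of the WindowedWordCount example program from a text file to Cloud Pub/Sub, as shown below. I published the contents of the Shakespeare file to Pub/Sub, and the data is read correctly, but none of the transformations after .groupByKey seem to run.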

sc.pubsubSubscription[String](psSubscription)
  .withFixedWindows(windowSize) // apply windowing logic
  .flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty))
  .countByValue
  .withWindow[IntervalWindow]
  .swap
  .groupByKey
  .map {
    s =>
      println("\n\n\n\n\n\n\n This never prints \n\n\n\n\n")
      println(s)
  }

Best answer

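Changing the input from a text file to Pub/Sub makes the PCollection "unbounded". Grouping an unbounded PCollection by key requires an aggregation trigger to be defined; otherwise the grouping step waits forever. The Dataflow documentation mentions this here: https://cloud.google.com/dataflow/model/group-by-key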

Note: Either non-global Windowing or an aggregation trigger is required in order to perform a GroupByKey on an unbounded PCollection. This is because a bounded GroupByKey must wait for all the data with a certain key to be collected; but with an unbounded collection, the data is unlimited. Windowing and/or Triggers allow grouping to operate on logical, finite bundles of data within the unbounded data stream.

If you apply GroupByKey to an unbounded PCollection without setting either a non-global windowing strategy, a trigger strategy, or both, Dataflow will generate an IllegalStateException error when your pipeline is constructed.
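To make the note above concrete, here is a minimal sketch in plain Scala (no Beam or Scio involved) of how fixed windows turn a stream into finite bundles that can be grouped. The `Event` type, the `FixedWindowGrouping` object, and the sample timestamps are hypothetical names and values chosen for illustration; real Beam windowing also tracks watermarks and late data, which this sketch ignores.

```scala
// Hypothetical event type: a word observed at an event-time timestamp (milliseconds).
final case class Event(word: String, timestampMs: Long)

object FixedWindowGrouping {
  // Start of the fixed window that contains the timestamp.
  // Assumes non-negative timestamps and a positive window size.
  def windowStart(timestampMs: Long, windowSizeMs: Long): Long =
    timestampMs - (timestampMs % windowSizeMs)

  // Group events by window first, then count words inside each window.
  // Each window is a finite bundle, so the grouping can complete.
  def countPerWindow(events: Seq[Event], windowSizeMs: Long): Map[Long, Map[String, Long]] =
    events
      .groupBy(e => windowStart(e.timestampMs, windowSizeMs))
      .map { case (start, inWindow) =>
        start -> inWindow.groupBy(_.word).map { case (w, ws) => w -> ws.size.toLong }
      }

  def main(args: Array[String]): Unit = {
    val events = Seq(
      Event("to", 1000L),
      Event("be", 2000L),
      Event("to", 59000L),
      Event("be", 61000L)
    )
    // One-minute windows: the last event falls into the second window.
    countPerWindow(events, 60000L).toSeq.sortBy(_._1).foreach { case (start, counts) =>
      println(s"window starting at $start ms: $counts")
    }
  }
}
```

The grouping is only possible here because each window has a known end. In an unbounded stream, deciding when a window is complete enough to emit is what the trigger does.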

Unfortunately, the Apache Beam Python SDK does not seem to support triggers yet, so I'm not sure what the solution would be in Python.

(See https://beam.apache.org/documentation/programming-guide/#triggers )
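For the Scio pipeline in the question, one way to set a trigger is to pass a `WindowOptions` value to `withFixedWindows`. The following is a sketch only, not a verified fix: it assumes a Scio version where `WindowOptions` lives in `com.spotify.scio.values` and takes `trigger`, `accumulationMode`, and `allowedLateness` as named parameters. The function name `windowedCounts`, the 30-second early firing, and the zero allowed lateness are illustrative choices that do not come from the question.

```scala
import com.spotify.scio.values.{SCollection, WindowOptions}
import org.apache.beam.sdk.transforms.windowing.{AfterProcessingTime, AfterWatermark, IntervalWindow}
import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode
import org.joda.time.Duration

object TriggeredWordCount {
  // Hypothetical helper: windowed word counts with an explicit trigger.
  def windowedCounts(
    lines: SCollection[String],
    windowSize: Duration
  ): SCollection[(IntervalWindow, Iterable[(String, Long)])] = {
    // Assumed settings: fire when the watermark passes the end of the window,
    // plus an early pane 30 seconds after the first element arrives in a pane.
    val options = WindowOptions(
      trigger = AfterWatermark
        .pastEndOfWindow()
        .withEarlyFirings(
          AfterProcessingTime
            .pastFirstElementInPane()
            .plusDelayOf(Duration.standardSeconds(30))
        ),
      // Each fired pane carries only the elements that arrived since the last firing.
      accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES,
      // Beam requires allowed lateness to be set when a trigger is specified.
      allowedLateness = Duration.ZERO
    )

    lines
      .withFixedWindows(windowSize, options = options) // windowing plus trigger
      .flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty))
      .countByValue
      .withWindow[IntervalWindow]
      .swap
      .groupByKey
  }
}
```

With the question's code this would be called as `TriggeredWordCount.windowedCounts(sc.pubsubSubscription[String](psSubscription), windowSize)`. Because early panes are discarded after firing, downstream steps may see several partial counts per window and would need to combine them.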

Source: a similar question on Stack Overflow: https://stackoverflow.com/questions/44628370/
