scala - 使用 Dataflow 将 PubSub 流写入 Cloud Storage 时出错

标签 scala google-cloud-dataflow google-cloud-pubsub apache-beam spotify-scio

使用 SCIO来自 spotifyDataflow 写一份工作, 以下 2 个例子 e.g1e.g2写一个 PubSub流到 GCS ,但以下代码出现以下错误

错误

Exception in thread "main" java.lang.IllegalArgumentException: Write can only be applied to a Bounded PCollection 

代码
object StreamingPubSub {
  def main(cmdlineArgs: Array[String]): Unit = {
// set up example wiring
val (opts, args) = ScioContext.parseArguments[ExampleOptions](cmdlineArgs)
val dataflowUtils = new DataflowExampleUtils(opts)
dataflowUtils.setup()

val sc = ScioContext(opts)


sc.pubsubTopic(opts.getPubsubTopic)
.timestampBy {
    _ => new Instant(System.currentTimeMillis() - (scala.math.random * RAND_RANGE).toLong)
  }
.withFixedWindows((Duration.standardHours(1)))
.groupBy(_ => Unit)
.toWindowed
.toSCollection
.saveAsTextFile(args("output"))


val result = sc.close()

// CTRL-C to cancel the streaming pipeline
    dataflowUtils.waitToFinish(result.internal)
  }
}

我可能将窗口概念与 Bounded PCollection 混淆,有没有办法实现这一点,或者我是否需要应用一些转换来实现这一点,任何人都可以提供帮助

最佳答案

我相信SCIO的saveAsTextFile下面使用 Dataflow 的 Write转换,仅支持有界 PCollections。 Dataflow 尚未提供直接 API 来将无界 PCollection 写入 Google Cloud Storage,尽管这是我们正在研究的问题。

要将无界 PCollection 保留在某处,请考虑例如 BigQuery、Datastore 或 Bigtable。例如,在 SCIO 的 API 中,您可以使用 saveAsBigQuery .

关于scala - 使用 Dataflow 将 PubSub 流写入 Cloud Storage 时出错,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39865774/

相关文章:

scala - Thunk 与函数重载

python - 在一项 Dataflow 作业中写入和读取 BigQuery

java - 分析在 Google Dataflow 上运行的 Java 应用程序

apache-spark - 数据流 SparkPipelineRunner - 有可用的示例吗?

google-cloud-platform - 将推送通知从 Gmail API 发送到本地主机端点

azure - 如何将Azure事件中心(kafka接口(interface))的事件集成到google cloud pub/sub

scala - 在仅支持 Spark 1.6 的集群上使用 Spark 2.0 运行 Fat Jar

scala - 如何解释 SBT 错误消息

java - 使用java工具/scala main函数运行scala程序

google-app-engine - Google PubSub getMessageID = null 因为标签错误?