我有一个管道,可将 JSON 消息从 PubSub(未绑定(bind) PCollection)流式传输到 Google Cloud Storage。每个文件应包含多个 JSON 对象,每行一个。
我想创建另一个管道,该管道应从此 GCS 存储桶读取所有 JSON 对象以进行进一步流处理。最重要的是,第二个管道应该作为流而不是批处理工作。意味着我希望它“监听”存储桶并处理写入其中的每个 JSON 对象。 未绑定(bind) PCollection。
有什么办法可以实现这种行为吗?
谢谢
最佳答案
流处理仅适用于 PubSub 数据源。但别担心,您可以实现您的管道。
- 创建a notification subscription on the bucket to publish event on PubSub
- 创建一个监听 PubSub 消息的管道。
- 当消息到达时,读取它并获取文件 URI
- 使用存储 API 读取文件并将每一行注入(inject)管道中
- 继续处理每个解码行的管道。
关于java - 谷歌数据流: Read unbound PCollection from Google Cloud Storage,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60432557/