我需要对一些数据执行一个非常简单的转换(从 JSON 中提取一个字符串),然后将它写入 PubSub - 我正在尝试使用自定义 python Dataflow 作业来执行此操作。
我写了一个成功写回 Cloud Storage 的作业,但即使是最简单的写入 PubSub(无转换)的尝试也会导致错误:JOB_MESSAGE_ERROR: Workflow failed. Causes: Expected custom source to have non-zero number of splits.
有没有人通过 Dataflow 从 GCS 成功写入 PubSub?
任何人都可以阐明这里出了什么问题吗?
def run(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument('--input',
dest='input',
help='Input file to process.')
parser.add_argument('--output',
dest='output',
help='Output file to write results to.')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
lines = p | ReadFromText(known_args.input)
output = lines #Obviously not necessary but this is where my simple extract goes
output | beam.io.WriteToPubSub(known_args.output) # This doesn't
最佳答案
当前无法实现此方案,因为当您在 Dataflow 中使用流模式时,the only source you can use is PubSub .并且您无法切换到批处理模式,因为 apache 光束 PubSub sources and sinks仅可用于流式传输(用于像 Dataflow runner 这样的远程执行)。
这就是为什么您可以在没有 WriteToPubSub 和流标志的情况下执行管道的原因。
关于python - Beam/Dataflow 自定义 Python 作业 - Cloud Storage 到 PubSub,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56868887/