由于我们无法直接从 Json 文件读取,因此我使用 .txt 文件。 看起来更多的元素用“,”分隔。
[
{
"Item_Identifier": "FDW58",
"Outlet_Size": "Medium"
},
{
"Item_Identifier": "FDW14",
"Outlet_Size": "Small"
},
]
我想计算元素的数量,这里我会得到2。 问题是我无法将文本分成用逗号“,”分隔的元素。 即使我将其转换为 json 格式,我也会单独获取每一行。
lines = p | 'receive_data' >> beam.io.ReadFromText(
known_args.input)\
| 'jsondumps' >> beam.Map(lambda x: json.dumps(x))\
| 'jsonloads' >> beam.Map(lambda x: json.loads(x))\
| 'print' >> beam.ParDo(PrintFn()) \
最佳答案
我不认为这是一种安全的方法。我没有使用python sdk(我使用java),但是java端的io.TextIO非常清楚,它会发出一个PCollection,其中每个元素都是来自源文件的一行输入。分层数据格式(json、xml 等)不能修改为以这种方式拆分。
如果您的文件格式与您包含的 json 一样格式良好且非嵌套,您可以摆脱:
- 按行读取文件(正如我相信您正在做的那样)
- 仅过滤包含
}
的行 - 计算生成的 pcollection 大小
为了更广泛地与 json 集成,我们采取了不同的方法:
- 从字符串 PCollection 开始,其中每个值都是文件的路径
- 使用 native 库访问文件并以流式方式解析它(我们使用 scala,它有一些可用的流式 json 解析库)
- 或者,使用 Beam 的 API 从
MatchResult
获取ReadableFile
实例,并通过该实例访问文件
- 或者,使用 Beam 的 API 从
我的理解是,并非所有文件格式都适合分布式处理器。例如,Gzip 不能轻易“拆分”或分块。与 JSON 相同。 CSV 存在一个问题,即除非您也有方便的开头行,否则这些值都是无意义的。
关于python - 将 .txt 文件拆分为多个元素,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58138294/