TypeError: 'PCollection' object does not support indexing
上述错误是由于尝试将 Pcollection 转换为列表而导致的:
filesList = (files | beam.combiners.ToList())
lines = (p | 'read' >> beam.Create(ReadSHP().ReadSHP(filesList))
| 'map' >> beam.Map(_to_dictionary))
并且:
def ReadSHP(self, filesList):
"""
"""
sf = shp.Reader(shp=filesList[1], dbf=filesList[2])
如何解决这个问题?感谢您的帮助。
最佳答案
一般来说,您不能将 PCollection
转换为列表。
PCollection
是一个项目的集合,可能是无界且无序的。 Beam 允许您将转换应用于 PCollection
。将 PTransform
应用于 PCollection
会产生另一个 PCollection
。并且转换的应用过程可能分布在一组机器上。所以在一般情况下不可能将这样的东西转换成本地内存中的元素集合。
Combiners 只是 PTransforms
的一个特殊类。他们所做的是将他们看到的所有元素累加起来,对这些元素应用一些组合逻辑,然后输出组合的结果。例如,组合器可以查看传入的元素,将它们相加,然后将总和作为结果输出。这种组合器将元素的 PCollection
转换为这些元素的总和的 PCollection
。
beam.combiners.ToList
只是应用于 PCollection
的另一个转换,可能在一组工作机器上,并产生另一个 PCollection
。但它在产生输出元素之前并没有真正进行任何复杂的组合,它只是将所有可见元素累积到一个列表中,然后输出可见元素列表。因此,它获取作为键值对的元素(在多台机器上),将它们放入列表中,然后输出这些列表。
缺少的是从可能的多台机器中获取这些列表并在需要时将它们加载到本地程序中的逻辑。该问题无法(如果有的话)以通用方式(在所有运行器、所有可能的 IO 和管道结构之间)轻松解决。
解决方法之一是向管道添加另一个步骤,将组合输出(例如总和或列表)写入公共(public)存储,例如某个数据库中的表或文件。然后当管道完成时,您的程序可以从该位置加载管道执行的结果。
有关详细信息,请参阅文档:
关于python - 无法将 beam python pcollection 转换为列表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53499640/