python - apache_beam.transforms.util.Reshuffle() 不适用于 GCP 数据流

标签 python google-cloud-platform google-cloud-dataflow apache-beam

我已经通过 pip install --upgrade apache_beam[gcp] 升级到最新的 apache_beam[gcp] 包。但是,我注意到 Reshuffle()没有出现在 [gcp] 发行版中。这是否意味着我将无法在任何数据流管道中使用 Reshuffle()?有没有办法解决?还是 pip 包可能不是最新的,如果 Reshuffle() 在 github 上的 master 中,那么它将在数据流上可用?

基于对此 question 的回复我正在尝试从 BigQuery 读取数据,然后在将数据写入 GCP 存储桶中的 CSV 之前随机化数据。我注意到我用来训练 GCMLE 模型的分片 .csv 并不是真正随机的。在 tensorflow 中,我可以随机化批处理,但这只会随机化队列中构建的每个文件中的行,我的问题是当前正在生成的文件在某种程度上存在偏差。如果在数据流中写入 CSV 之前对其他随机播放方法有任何建议,我们将不胜感激。

最佳答案

一种方法是自己重新创建随机播放。

import random

shuffled_data = (unshuffled_pcoll
        | 'AddRandomKeys' >> Map(lambda t: (random.getrandbits(32), t))
        | 'GroupByKey' >> GroupByKey()
        | 'RemoveRandomKeys' >> FlatMap(lambda t: t[1]))

我剩下的问题是我是否需要担心 code 中的窗口或 ExpandIterable 部分

关于python - apache_beam.transforms.util.Reshuffle() 不适用于 GCP 数据流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48585959/

相关文章:

python - 追加列对 Pandas 来说很麻烦

python - 如何在Python中使用OpenCV在检测部分绘制矩形?

sql - Google BigQuery - 使用 WITH 和 RAND() 的错误

java - Google Cloud Dataflow TextIO 到 BigQueryIO 作业无法启动

python - 用于侧面输入的高效 ParDo 设置或 start_bundle

c# - IronPython 中的属性 : eternal loop

python - 有什么方法可以更有效地检查单词中的所有字母是否都在列表中?

kubernetes - 使用 Autoscaler 在 GCP 上进行不可调度的 Kubernetes pod

google-cloud-platform - AWS Client VPN Endpoint 的 GCP 等价物是什么

java - 是否可以使用 Java 8 运行我的数据流管道代码?