python - 如何在 Apache Beam 中计算标准偏差

标签 python apache-beam

我是 Apache Beam 的新手,我想计算大型数据集的均值和标准差。

给定一个“A,B”形式的 .csv 文件,其中 A、B 是整数,这基本上就是我所拥有的。

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.textio import ReadFromText

class Split(beam.DoFn):
    def process(self, element):
        A, B = element.split(',')
        return [('A', A), ('B', B)]

with beam.Pipeline(options=PipelineOptions()) as p:
     # parse the rows
     rows = (p
             | ReadFromText('data.csv')
             | beam.ParDo(Split()))

     # calculate the mean
     avgs = (rows
             | beam.CombinePerKey(
                 beam.combiners.MeanCombineFn()))

     # calculate the stdv per key
     # ???

     std >> beam.io.WriteToText('std.out')

我想做这样的事情:

class SquaredDiff(beam.DoFn):
    def process(self, element):
        A = element[0][1]
        B = element[1][1]
        return [('A', A - avgs[0]), ('B', B - avgs[1])]

stdv = (rows
        | beam.ParDo(SquaredDiff())
        | beam.CombinePerKey(
            beam.combiners.MeanCombineFn()))

之类的,但我不知道怎么做。

最佳答案

编写自己的组合器。这将起作用:

class MeanStddev(beam.CombineFn):
  def create_accumulator(self):
    return (0.0, 0.0, 0) # x, x^2, count

  def add_input(self, sum_count, input):
    (sum, sumsq, count) = sum_count
    return sum + input, sumsq + input*input, count + 1

  def merge_accumulators(self, accumulators):
    sums, sumsqs, counts = zip(*accumulators)
    return sum(sums), sum(sumsqs), sum(counts)

  def extract_output(self, sum_count):
    (sum, sumsq, count) = sum_count
    if count:
      mean = sum / count
      variance = (sumsq / count) - mean*mean
      # -ve value could happen due to rounding
      stddev = np.sqrt(variance) if variance > 0 else 0
      return {
        'mean': mean,
        'variance': variance,
        'stddev': stddev,
        'count': count
      }
    else:
      return {
        'mean': float('NaN'),
        'variance': float('NaN'),
        'stddev': float('NaN'),
        'count': 0
      }

这会将方差计算为 E(x^2) - E(x)*E(x),因此您只需传递一次数据。这就是您使用上述组合器的方式:

[1.3, 3.0, 4.2] | beam.CombineGlobally(MeanStddev())

关于python - 如何在 Apache Beam 中计算标准偏差,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51830947/

相关文章:

python - 在谷歌云数据流中刷新访问 token 时,GRPC 状态运行时异常

google-cloud-dataflow - 谷歌数据流 : attaching filename to the message

python - 当我使用 toastnotification 执行 python exe 时,显示未找到 win10toast 分发

python - 错误 : Key already exists

python - 如何使用 'add_value_provider_argument'初始化运行时参数?

python - 应用 TensorFlow Transform 来转换/缩放生产中的特征

python - 为什么使用 "--requirements_file"将依赖项上传到 GCS?

Python模块 "twill"——变量赋值期间的HTML泛滥

python - 使用 pandas dataframe 绘制多列图表

python - 使用curl将文件上传到django服务器