python - 使用数据流在 Google Cloud Platform 中加入两个 json

标签 python json google-cloud-dataflow apache-beam

我想从两个不同的 JSON 文件中仅找出女性员工,并仅选择我们感兴趣的字段并将输出写入另一个 JSON。

此外,我正在尝试使用 Dataflow 在 Google 的云平台中实现它。有人可以提供任何可以实现以获得结果的示例Java代码。

员工 JSON

{"emp_id":"OrgEmp#1","emp_name":"Adam","emp_dept":"OrgDept#1","emp_country":"USA","emp_gender":"female","emp_birth_year":"1980","emp_salary":"$100000"}
{"emp_id":"OrgEmp#1","emp_name":"Scott","emp_dept":"OrgDept#3","emp_country":"USA","emp_gender":"male","emp_birth_year":"1985","emp_salary":"$105000"}

部门JSON
{"dept_id":"OrgDept#1","dept_name":"Account","dept_start_year":"1950"}
{"dept_id":"OrgDept#2","dept_name":"IT","dept_start_year":"1990"}
{"dept_id":"OrgDept#3","dept_name":"HR","dept_start_year":"1950"}

预期的输出 JSON 文件应该类似于
{"emp_id":"OrgEmp#1","emp_name":"Adam","dept_name":"Account","emp_salary":"$100000"}

最佳答案

您可以使用 CoGroupByKey 执行此操作(将使用 shuffle 的地方),或使用侧输入,如果您的部门集合明显较小。

我将为您提供 Python 代码,但您可以在 Java 中使用相同的管道。

通过侧面输入,您将:

  • 将您的部门 PCollection 转换为可映射的字典
    dept_id 到部门 JSON 字典。
  • 然后你拿
    员工 PCollection 作为主要输入,您可以在其中使用 dept_id
    获取部门 PCollection 中每个部门的 JSON。

  • 像这样:
    departments = (p | LoadDepts()
                     | 'key_dept' >> beam.Map(lambda dept: (dept['dept_id'], dept)))
    
    deps_si = beam.pvalue.AsDict(departments)
    
    employees = (p | LoadEmps())
    
    def join_emp_dept(employee, dept_dict):
      return employee.update(dept_dict[employee['dept_id']])
    
    joined_dicts = employees | beam.Map(join_dicts, dept_dict=deps_si)
    

    CoGroupByKey ,您可以使用 dept_id 作为键对两个集合进行分组。这将导致键值对的 PCollection,其中键是 dept_id,值是部门和该部门的员工的两个可迭代对象。
    departments = (p | LoadDepts()
                   | 'key_dept' >> beam.Map(lambda dept: (dept['dept_id'], dept)))
    
    employees = (p | LoadEmps()
                   | 'key_emp' >> beam.Map(lambda emp: (emp['dept_id'], emp)))
    
    def join_lists((k, v)):
      itertools.product(v['employees'], v['departments'])
    
    joined_dicts = (
        {'employees': employees, 'departments': departments} 
        | beam.CoGroupByKey()    
        | beam.FlatMap(join_lists)
        | 'mergedicts' >> beam.Map(lambda (emp_dict, dept_dict): emp_dict.update(dept_dict))
        | 'filterfields'>> beam.Map(filter_fields)
    )
    

    关于python - 使用数据流在 Google Cloud Platform 中加入两个 json,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45287530/

    相关文章:

    javascript - Google map 上的 JSON feed 没有显示任何内容

    python - 来自数据流的 BigQuery 流式传输无提示地失败

    java - 使用前缀键删除行范围

    python - 如何使用 Python 对 SQL IN 子句进行字符串格式化

    Python 继承 : TypeError: object. __init__() 不带参数

    php - 自动完成不返回搜索到的值,但它返回所有值(在 PHP、jQuery 中)

    java - Android 上的 Facebook OAuth

    google-bigquery - Apache 光束 : Transform an objects having a list of objects to multiple TableRows to write to BigQuery

    python - 如何在不更改内置 sys 模块的情况下执行脚本?

    python - 类型错误 : insert_or_replace_entity() takes at most 4 arguments (5 given)