我想从两个不同的 JSON 文件中仅找出女性员工,并仅选择我们感兴趣的字段并将输出写入另一个 JSON。
此外,我正在尝试使用 Dataflow 在 Google 的云平台中实现它。有人可以提供任何可以实现以获得结果的示例Java代码。
员工 JSON
{"emp_id":"OrgEmp#1","emp_name":"Adam","emp_dept":"OrgDept#1","emp_country":"USA","emp_gender":"female","emp_birth_year":"1980","emp_salary":"$100000"}
{"emp_id":"OrgEmp#1","emp_name":"Scott","emp_dept":"OrgDept#3","emp_country":"USA","emp_gender":"male","emp_birth_year":"1985","emp_salary":"$105000"}
部门JSON
{"dept_id":"OrgDept#1","dept_name":"Account","dept_start_year":"1950"}
{"dept_id":"OrgDept#2","dept_name":"IT","dept_start_year":"1990"}
{"dept_id":"OrgDept#3","dept_name":"HR","dept_start_year":"1950"}
预期的输出 JSON 文件应该类似于
{"emp_id":"OrgEmp#1","emp_name":"Adam","dept_name":"Account","emp_salary":"$100000"}
最佳答案
您可以使用 CoGroupByKey
执行此操作(将使用 shuffle 的地方),或使用侧输入,如果您的部门集合明显较小。
我将为您提供 Python 代码,但您可以在 Java 中使用相同的管道。
通过侧面输入,您将:
dept_id 到部门 JSON 字典。
员工 PCollection 作为主要输入,您可以在其中使用 dept_id
获取部门 PCollection 中每个部门的 JSON。
像这样:
departments = (p | LoadDepts()
| 'key_dept' >> beam.Map(lambda dept: (dept['dept_id'], dept)))
deps_si = beam.pvalue.AsDict(departments)
employees = (p | LoadEmps())
def join_emp_dept(employee, dept_dict):
return employee.update(dept_dict[employee['dept_id']])
joined_dicts = employees | beam.Map(join_dicts, dept_dict=deps_si)
与
CoGroupByKey
,您可以使用 dept_id 作为键对两个集合进行分组。这将导致键值对的 PCollection,其中键是 dept_id,值是部门和该部门的员工的两个可迭代对象。departments = (p | LoadDepts()
| 'key_dept' >> beam.Map(lambda dept: (dept['dept_id'], dept)))
employees = (p | LoadEmps()
| 'key_emp' >> beam.Map(lambda emp: (emp['dept_id'], emp)))
def join_lists((k, v)):
itertools.product(v['employees'], v['departments'])
joined_dicts = (
{'employees': employees, 'departments': departments}
| beam.CoGroupByKey()
| beam.FlatMap(join_lists)
| 'mergedicts' >> beam.Map(lambda (emp_dict, dept_dict): emp_dict.update(dept_dict))
| 'filterfields'>> beam.Map(filter_fields)
)
关于python - 使用数据流在 Google Cloud Platform 中加入两个 json,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45287530/