我正在使用Flink v1.4.0
.
我正在利用批处理 API 进行一些 ETL,其中我有 DataSet<Employee >
其中Employee
其形式如下:
public class Employee implements Serializable {
private String name;
private double baseSalary;
private double bonus;
private double totalComp;
...
}
假设所有变量都有一个构造函数、setter 和 getter。
现在,我有许多操作被应用,我认为是按顺序的方式,根据这些方式我试图丰富 DataSet<Employee> employees
如下:
...
DataSet<String> employees = env.fromCollection(employeesList);
DataSet<Employee> initEmployees = employees.map(new InitMapFunction());
DataSet<Employee> employeesEnrichedWithSalaryData = initEmployees.map(new SalaryMapFunction(salaryEnrichmentData));
DataSet<Employee> employeesEnrichedWithBonusData = employeesEnrichedWithSalaryData.map(new BonusMapFunction(bonusEnrichmentData));
DataSet<Employee> finalEmployeesData = employeesEnrichedWithSalaryData.map(new TotalCompMapFunction());
...
假设我的包裹中的某个地方有以下 MapFunction
实现:
final class InitMapFunction implements MapFunction<String, Employee>, Serializable {
@Override
public Employee map(String name) {
Employee employee = Employee.newBuilder().build();
employee.setName(name)
return employee;
}
}
final class SalaryMapFunction implements MapFunction<Employee, Employee>, Serializable {
private Map<String, double> mapOfEmployeeVsSalary;
SalaryMapFunction(Map<String, double> mapOfEmployeeVsSalary) {
this.mapOfEmployeeVsSalary = mapOfEmployeeVsSalary;
}
@Override
public Employee map(Employee employee) {
if(mapOfEmployeeVsSalary.containsKey(employee.getName())) {
employee.setSalary(mapOfEmployeeVsSalary.get(employee.getName()))
}
return employee;
}
}
final class BonusMapFunction implements MapFunction<Employee, Employee>, Serializable {
private Map<String, double> mapOfEmployeeVsBonus;
SalaryMapFunction(Map<String, double> mapOfEmployeeVsBonus) {
this.mapOfEmployeeVsBonus = mapOfEmployeeVsBonus;
}
@Override
public Employee map(Employee employee) {
if(mapOfEmployeeVsBonus.containsKey(employee.getName())) {
employee.setBonus(mapOfEmployeeVsBonus.get(employee.getName()))
}
return employee;
}
}
final class TotalCompMapFunction implements MapFunction<Employee, Employee>, Serializable {
@Override
public Employee map(Employee employee) {
employee.setTotalComp(employee.getSalary + employee.getBonus);
return employee;
}
}
问题是:最终会DataSet
(finalEmployeesData) 包含正确的值吗?我知道我可以一次性完成这一切,但这不是这个问题的重点。我所实现的代码的功能要求以不同的步骤进行丰富。我已经发现了在处理上述数据集时特定字段未使用正确值进行丰富的情况。我理解/怀疑这是由于惰性评估造成的,并且依赖于 Flink
的优化遍历以计算最佳执行顺序(识别独立操作等)。这是正确的吗?
最后,如何保证某个操作优先于另一个操作?如果将这些操作按如下方式链接在一起,输出会发生变化吗?
DataSet<Employee> finalEmployessData = env.fromCollection(employeesList)
.map(new InitMapFunction())
.map(new SalaryMapFunction(salaryEnrichmentData))
.map(new BonusMapFunction(bonusEnrichmentData))
.map(new TotalCompMapFunction());
最佳答案
Flink 不会改变操作的顺序。如果您将程序定义为
DataSet<Y> result = input
.map(new Map1())
.map(new Map2())
那么 Map2()
将始终应用于 Map1()
的结果。
此外,无论您是在不同的对象上逐一应用这些函数,还是像上一个代码片段那样以流畅的方式应用这些函数,都没有什么区别。
您说过,您观察到某些值无法正确设置的情况。假设您运行的代码与此处所示的不完全相同,原因之一可能是 Flink 如何连接运算符以及如何在运算符之间传送记录。在某些情况下(例如映射函数的序列),Flink 通过方法调用传递记录以避免序列化成本。我们称之为函数链。链式函数被融合到单个运算符中(例如,您可以在 Web UI 中看到这一点)。显然,这些函数必须小心它们如何与它们接收和发出的对象交互。否则同一条记录可能会同时被两个函数修改。我建议仔细查看有关 object reusage 的部分在 Flink 的文档中。
关于java - 将某些 Flink 操作优先于其他操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50349382/