java - Apache 弗林克 : Wierd FlatMap behaviour

标签 java apache-flink

我正在将数据流摄取到 Flink 中。对于该数据的每个“实例”,我都有一个时间戳。我可以检测我从中获取数据的机器是否正在“生产”或“不生产”,这是通过位于其自己的静态类中的自定义平面 map 函数来完成的。

我想计算机器已经生产/不生产多久了。 我当前的方法是在两个简单列表中收集生产和非生产时间戳。对于数据的每个“实例”,我通过从最早的时间戳减去最新的时间戳来计算当前的生产/非生产持续时间。但这给了我不正确的结果。当生产状态从生产变为非生产时,我清除生产的时间戳列表,反之亦然,这样,如果生产再次开始,持续时间从零开始。

我查看了收集各自时间戳的两个列表,发现了一些我不明白的事情。我的假设是,只要机器“生产”,生产时间戳列表中的第一个时间戳就保持不变,而每个新数据实例都会将新时间戳添加到列表中。 显然,这个假设是错误的,因为我在列表中得到了看似随机的时间戳。不过,它们的顺序仍然正确。

这是我的平面 map 函数的代码:

public static class ImaginePaperDataConverterRich extends RichFlatMapFunction<ImaginePaperData, String> {
    private static final long serialVersionUID = 4736981447434827392L;
    private transient ValueState<ProductionState> stateOfProduction;
    SimpleDateFormat dateFormat = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss.SS");
    DateFormat timeDiffFormat = new SimpleDateFormat("dd HH:mm:ss.SS");
    String timeDiffString = "00 00:00:00.000";
    List<String> productionTimestamps = new ArrayList<>();
    List<String> nonProductionTimestamps = new ArrayList<>();

    public String calcProductionTime(List<String> timestamps) {
        if (!timestamps.isEmpty()) {
            try {
                Date firstDate = dateFormat.parse(timestamps.get(0));
                Date lastDate = dateFormat.parse(timestamps.get(timestamps.size()-1));
                long timeDiff = lastDate.getTime() - firstDate.getTime();

                if (timeDiff < 0) {
                    System.out.println("Something weird happened. Maybe EOF.");
                    return timeDiffString;
                }

                timeDiffString = String.format("%02d %02d:%02d:%02d.%02d",
                    TimeUnit.MILLISECONDS.toDays(timeDiff),
                    TimeUnit.MILLISECONDS.toHours(timeDiff)   % TimeUnit.HOURS.toHours(1),
                    TimeUnit.MILLISECONDS.toMinutes(timeDiff) % TimeUnit.HOURS.toMinutes(1),
                    TimeUnit.MILLISECONDS.toSeconds(timeDiff) % TimeUnit.MINUTES.toSeconds(1),
                    TimeUnit.MILLISECONDS.toMillis(timeDiff)  % TimeUnit.SECONDS.toMillis(1));

            } catch (ParseException e) {
                e.printStackTrace();
            }
            System.out.println("State duration: " + timeDiffString);
        }
        return timeDiffString;
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<ProductionState> descriptor = new ValueStateDescriptor<>(
            "stateOfProduction",
            TypeInformation.of(new TypeHint<ProductionState>() {}),
            ProductionState.NOT_PRODUCING);
            stateOfProduction = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(ImaginePaperData ImaginePaperData, Collector<String> output) throws Exception {
        List<String> warnings = new ArrayList<>();
        JSONObject jObject = new JSONObject();
        String productionTime = "0";
        String nonProductionTime = "0";

        // Data analysis
        if (stateOfProduction == null || stateOfProduction.value() == ProductionState.NOT_PRODUCING && ImaginePaperData.actSpeedCl > 60.0) {
            stateOfProduction.update(ProductionState.PRODUCING);
        } else if (stateOfProduction.value() == ProductionState.PRODUCING && ImaginePaperData.actSpeedCl < 60.0) {
            stateOfProduction.update(ProductionState.NOT_PRODUCING);
        }

        if(stateOfProduction.value() == ProductionState.PRODUCING) {
            if (!nonProductionTimestamps.isEmpty()) {
                System.out.println("Production has started again, non production timestamps cleared");
                nonProductionTimestamps.clear();
            }
            productionTimestamps.add(ImaginePaperData.timestamp);

            System.out.println(productionTimestamps);
            productionTime = calcProductionTime(productionTimestamps);
        } else {
            if(!productionTimestamps.isEmpty()) {
                System.out.println("Production has stopped, production timestamps cleared");
                productionTimestamps.clear();
            }
            nonProductionTimestamps.add(ImaginePaperData.timestamp);
            warnings.add("Production has stopped.");

            System.out.println(nonProductionTimestamps);
            //System.out.println("Production stopped");
            nonProductionTime = calcProductionTime(nonProductionTimestamps);
        }
// The rest is just JSON stuff

我是否必须将这两个时间戳列表保存在 ListState 中?

编辑:因为另一位用户询问,这是我得到的数据。

{'szenario': 'machine01', 'timestamp': '31.10.2018 09:18:39.432069', 'data': {1: 100.0, 2: 100.0, 101: 94.0, 102: 120.0, 103: 65.0}}

我期望的行为是我的 flink 程序收集 ProductionTimestamps 和 nonProductionTimestamps 两个列表中的时间戳。然后我希望我的 calcProductionTime 方法从第一个时间戳中减去列表中的最后一个时间戳,以获得我第一次检测到机器正在“生产”/“不生产”到它停止“生产”/“不生产”的时间之间的持续时间。

最佳答案

我发现“看似随机”的时间戳的原因是 Apache Flink 的并行执行。当并行度设置为 > 1 时,事件的顺序不再得到保证。

我的快速解决方法是将程序的并行度设置为 1,据我所知,这保证了事件的顺序。

关于java - Apache 弗林克 : Wierd FlatMap behaviour,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53067470/

相关文章:

java - 当两个文本字段为空时禁用按钮

java - 将字符串转换为 byte[] 返回错误值(编码?)

java - Java API 中单例类的示例

scala - 任务不可序列化 Flink

batch-processing - Flink 批处理接收器

java - Apache Beam Counter/Metrics 在 Flink WebUI 中不可用

java - EKS 上的 Flink 集群

java - 分配处理时通常有多少开销?

java - 我怎样才能让这个程序将用户输入添加到我的链接列表中?

java - 重新加载保留 session 的 JVM 进程