java - Apache Flink 的吞吐量和延迟

标签 java apache-flink latency flink-streaming throughput

我为 Apache Flink 编写了一个非常简单的 java 程序,现在我对测量统计数据感兴趣,例如吞吐量(每秒处理的元组数量)和延迟(程序处理每个输入元组所需的时间)。

 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.readTextFile("/home/LizardKing/Documents/Power/Prova.csv")
        .map(new MyMapper().writeAsCsv("/home/LizardKing/Results.csv");

JobExecutionResult res = env.execute();

我知道 Flink 公开了一些指标:

https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/metrics.html

但我不知道如何使用它们来获得我想要的东西。从链接中我读到“仪表”可用于测量平均吞吐量,但是在定义它之后,我应该如何使用它?

最佳答案

我们正在 yarn 上运行的生产流作业中运行自定义指标,例如仪表、仪表。

步骤如下:

对 pom.xml 的额外依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-metrics-dropwizard</artifactId>
    <version>${flink.version}</version>
</dependency>

我们使用的是1.2.1版本

然后将meter添加到MyMapper类中。

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Meter;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class Test {


    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env
                .readTextFile("/home/LizardKing/Documents/Power/Prova.csv")
                .map(new MyMapper())
                .writeAsCsv("/home/LizardKing/Results.csv");

        JobExecutionResult res = env.execute();
    }


    private static class MyMapper extends RichMapFunction<String, Object> {

        private transient Meter meter;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            this.meter = getRuntimeContext()
                    .getMetricGroup()
                    .meter("myMeter", new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
        }

        @Override
        public Object map(String value) throws Exception {    
            this.meter.markEvent();
            return value;
        }
    }
}

希望这有帮助。

关于java - Apache Flink 的吞吐量和延迟,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44587645/

相关文章:

java - 运行 java 时,Gradle 任务运行已完成,退出值非零 100

kubernetes - Flink HA JobManager 集群无法选举领导者

python - 在 Kubernetes 中运行 Apache Beam python 管道

performance - TCP 接收窗口

java - 通过类实例化模拟 jdbcTemplate

java - 二维数组的快速散列

java - 虚拟键盘上方的 PopupWindow

java - Flink 和 RocksDB - 列表状态大于主内存?

iOS Safari WebSockets : huge latency when sending messages at short intervals

c++ - 为什么我在时间测量中会出现如此巨大的抖动?