java - 如何在 Flink 中增加 SinkFunction 的 numRecordsOutPut 指标?

标签 java redis apache-flink

我正在使用 flink 消费 kafka 并写入 redis。

这是我的 redis 接收函数:

            .addSink(new RichSinkFunction<MobilePageEvent>() {

                @Override
                public void invoke(MobilePageEvent event, Context context) {

                    JEDIS_CLUSTER.zadd(..);
                }
            })
            .name("redis sink");

虽然我可以从 redis 命令行获取数据,但指标显示 sink 函数的输出为零:

enter image description here

我怎样才能增加这个指标?

最佳答案

numRecordsIn 和 numRecordsOut 指标仅计算在 Flink 作业本身内流动的流记录,不包括与外部系统的通信。所以换句话说,sources 不报告任何传入的记录,而 sinks 不报告任何传出的记录。

在我看来,您有几个选择:

  1. 使用接收器上的 numRecordsIn 指标作为你想知道的近似值
  2. 派生或扩展 RedisSink 并添加您想要的指标

显示了添加计数器指标的模式 here .

对于 redis 接收器,您可以在 open() 方法中初始化一个 Counter,然后在 invoke() 中递增它。但这似乎毫无意义,因为这只会反射(reflect) numRecordsIn 指标。如果您的 Redis 接收器正在执行缓冲的批量写入,那么等待递增指标直到数据实际发送到 Redis 可能更有意义——在这种情况下,您可能宁愿使用 Meter 而不是 Counter。

关于java - 如何在 Flink 中增加 SinkFunction 的 numRecordsOutPut 指标?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54928734/

相关文章:

java - JavaFX 中的 registerChangeListener 方法有什么作用?

java - Android 版本 - 4.0.4 中的通话记录与短信日志位于同一位置

docker - 在 Docker 文件中运行命令

redis - 如何查看我的 Redis 数据库 current_size?

java - 如何等待Redis缓存缓存信息

scala - 提交失败异常 : Commit cannot be completed since the group has already rebalanced

java - 如何将不同类型的对象添加到 Android 中的 HashMap 中?

java - 使用 Java OutputFormat 发出 Scala 元组

apache-flink - Flink 自动缩放和最大并行度

java - 如何从后台线程调用 Java 类