java - 弗林克 : is it losing records?

标签 java apache-kafka apache-flink

我的拓扑是这样的:kafka(p:6)->reduce(p:6)->db writer(p:12)(其中p:是并行)。

  • 我让它在单个节点“集群”上运行 taskmanager.numberOfTaskSlots: 30
  • 我知道我的 kafka 源每分钟产生约 650 万条记录
  • kafka 'reader' 的并行度等于 kafka 分区的数量

当我(通过 flink UI)观察这项工作约 1 分钟时,这些是我看到的值:

  • kafka -> 减少:发送了约 150 万条记录(减少了 > 4 倍)
  • 减少(窗口聚合 5 秒)-> 数据库写入 ~114K 条记录已发送(减少 > 2 倍)1
  • db write --> records received: ~23K (off by > 5x) 2

(其他部分的发送/接收值之间存在较小的差异,但我可以将这些归因于测量误差)

问题:
1. 剩下的记录在哪里?
2. 运行时这台机器上的负载永远不会超过 1.5。还有其他限制因素吗?
3. 我是否误读了 UI 中的值?

Java 8
Flink 1.0(最新github)
机器:32 核/96 Gb RAM

1这个可以用聚合过程来解释。
2这个值与写入数据库的内容一致。

最佳答案

Flink 不会丢失记录,它们只是在飞行中被缓冲,或者在 Kafka 中停留更长时间。从数字来看,您似乎正在经历背压

您可以看到“reducer”发出了很多尚未被“db writer”接收的记录。在那种情况下,这些记录仍在运算符(operator)之间的通信 channel 的缓冲区中。这些 channel 的缓冲量有限(取决于配置的缓冲区数量,通常为几 MB)。对于小记录,它们可能会保存多个 10k 记录。

如果一个运算符(operator)发送的记录数持续明显落后于接收运算符(operator)接收的记录数,这表明接收器(此处为“数据库写入器”)无法跟上数据速率。可能是因为数据库处理插入的速度不够快(太同步,太细粒度提交?),也许“数据库编写器”和数据库之间的网络已经饱和。

在这种情况下,“db writer”将对 reducer 进行反压,最终也会对 Kafka Source 进行反压。

如果您没有来自数据库的背压,要尝试数据速率是多少,您可以尝试一个“数据库编写器”简单地删除所有记录的实验。

关于java - 弗林克 : is it losing records?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33594972/

相关文章:

oracle - 无法使用 confluent CLI : java. sql.SQLException 设置 CLASSPATH:找不到适合 jdbc:oracle:thin 的驱动程序

python - 在 Kubernetes 中运行 Apache Beam python 管道

java - 带有服务帐户的 Google Calendar 1.17 API - 未解析的 GoogleClient 类

java - 实现微调器时遇到问题

java - 读取文本文件然后运行java程序的shell脚本

java - 尝试通过cmd运行java代码时出现JNI错误

apache-kafka - Apache Kafka 如何使用打开的文件描述符?

java - 使用 Kafka 让消费者保持活力

python - flink与python,作业执行失败

SingleOutputStreamOperator#returns(TypeHint<T> typeHint) 方法的 javadoc