实现使用引用数据丰富存储在 Kafka 中的传入事件流的用例的简单方法是调用 map()
运算符(operator)为每个传入事件提供此引用数据的外部服务 REST API。
eventStream.map((key, event) -> /* query the external service here, then return the enriched event */)
另一种方法是使用带有引用数据的第二个事件流并将其存储在
KTable
中。这将是一个轻量级的嵌入式“数据库”,然后将其加入主要事件流。KStream<String, Object> eventStream = builder.stream(..., "event-topic");
KTable<String, Object> referenceDataTable = builder.table(..., "reference-data-topic");
KTable<String, Object> enrichedEventStream = eventStream
.leftJoin(referenceDataTable , (event, referenceData) -> /* return the enriched event */)
.map((key, enrichedEvent) -> new KeyValue<>(/* new key */, enrichedEvent)
.to("enriched-event-topic", ...);
“天真”的方法可以被认为是一种反模式吗?可以推荐“
KTable
”方法作为首选方法吗?Kafka 每分钟可以轻松管理数百万条消息。从
map()
调用的服务运算符(operator)也应该能够处理高负载并且具有高可用性。这些是服务实现的额外要求。但是如果服务满足这些标准,可以使用“天真”的方法吗?
最佳答案
是的,在map()
之类的Kafka Streams操作里面做RPC是可以的手术。您只需要了解这样做的利弊,请参见下文。此外,您应该从您的操作中同步执行任何此类 RPC 调用(我不会在这里详细说明原因;如果需要,我建议创建一个新问题)。
从 Kafka Streams 操作中执行 RPC 调用的优点:
缺点:
map()
内的)是一种副作用,因此是 Kafka Streams 的黑匣子。 Kafka Streams 的处理保证不会扩展到此类副作用。map()
内部执行 RPC 调用。将是幂等的。确保后者是您的责任。 替代品
如果您想知道还有哪些其他选择:例如,如果您正在执行 RPC 调用来查找数据(例如,使用侧/上下文信息来丰富传入的事件流),您可以通过使直接在 Kafka 中查找可用的数据。如果查找数据在 MySQL 中,您可以设置一个 Kafka 连接器来持续将 MySQL 数据摄取到 Kafka 主题中(想想:CDC)。在 Kafka Streams 中,您可以将查找数据读入
KTable
并通过流表连接执行输入流的丰富。
关于apache-kafka - Kafka 流和 RPC : is calling REST service in map() operator considered an anti-pattern?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49757709/