我正在尝试 Ignite 和 Kafka 集成,将 kafka 消息放入 Ignite 缓存中。
我的消息 key 是一个随机字符串(为了与Ignite一起使用,kafka消息 key 不能为空),并且该值是Person(一个java类)的json字符串表示形式
当Ignite收到这样的消息时,看起来Ignite将使用消息的 key (在我的例子中是随机字符串)作为缓存 key 。
是否可以将消息 key 更改为该人的 ID,以便我可以将其放入缓存中。
看起来streamer.receiver(new StreamReceiver)
是可行的
streamer.receiver(new StreamReceiver<String, String>() {
public void receive(IgniteCache<String, String> cache, Collection<Map.Entry<String, String>> entries) throws IgniteException {
for (Map.Entry<String, String> entry : entries) {
Person p = fromJson(entry.getValue());
//ignore the message key,and use person id as the cache key
cache.put(p.getId(), p);
}
}
});
这是推荐的方式吗?我不确定在 StreamReceiver 中调用cache.put是否是正确的方法,因为它只是写入缓存之前的预处理步骤。
最佳答案
Data Streamer 会将您的所有 key 映射到缓存关联节点,创建批处理条目并将批处理发送到关联节点。之后,StreamReceiver
将接收您的条目,获取 Person
的 ID 并调用 cache.put(K, V)
。将条目引导到将您的 key 映射到相应的缓存关联节点并向该节点发送更新请求。
一切看起来都不错。但是从 Kafka 映射随 secret 钥的结果和映射 Person ID 的结果将会不同(很可能是不同的节点)。因此,由于冗余的网络跃点,您的性能将会很差。
不幸的是,当前的KafkaStreamer
实现不支持流元组提取器(例如,参见StreamSingleTupleExtractor
类)。但您可以使用现有的 Kafka Streamer 实现作为示例轻松创建自己的 Kafka Streamer 实现。
您还可以尝试使用KafkaStreamer
的keyDecoder
和valDecoder
来提取Person
的ID来自卡夫卡消息。我不确定,但它会有所帮助。
关于Ignite 和 Kafka 集成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41142181/