Ignite 和 Kafka 集成

标签 ignite

我正在尝试 Ignite 和 Kafka 集成,将 kafka 消息放入 Ignite 缓存中。

我的消息 key 是一个随机字符串(为了与Ignite一起使用,kafka消息 key 不能为空),并且该值是Person(一个java类)的json字符串表示形式

当Ignite收到这样的消息时,看起来Ignite将使用消息的 key (在我的例子中是随机字符串)作为缓存 key 。

是否可以将消息 key 更改为该人的 ID,以便我可以将其放入缓存中。

看起来streamer.receiver(new StreamReceiver)是可行的

        streamer.receiver(new StreamReceiver<String, String>() {
            public void receive(IgniteCache<String, String> cache, Collection<Map.Entry<String, String>> entries) throws IgniteException {
                for (Map.Entry<String, String> entry : entries) {
                    Person p = fromJson(entry.getValue());
                    //ignore the message key,and use person id as the cache key
                    cache.put(p.getId(), p);
                }
            }
        });

这是推荐的方式吗?我不确定在 StreamReceiver 中调用cache.put是否是正确的方法,因为它只是写入缓存之前的预处理步骤。

最佳答案

Data Streamer 会将您的所有 key 映射到缓存关联节点,创建批处理条目并将批处理发送到关联节点。之后,StreamReceiver 将接收您的条目,获取 Person 的 ID 并调用 cache.put(K, V)。将条目引导到将您的 key 映射到相应的缓存关联节点并向该节点发送更新请求。

一切看起来都不错。但是从 Kafka 映射随 secret 钥的结果和映射 Person ID 的结果将会不同(很可能是不同的节点)。因此,由于冗余的网络跃点,您的性能将会很差。

不幸的是,当前的KafkaStreamer实现不支持流元组提取器(例如,参见StreamSingleTupleExtractor类)。但您可以使用现有的 Kafka Streamer 实现作为示例轻松创建自己的 Kafka Streamer 实现。

您还可以尝试使用KafkaStreamerkeyDecodervalDecoder来提取Person的ID来自卡夫卡消息。我不确定,但它会有所帮助。

关于Ignite 和 Kafka 集成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41142181/

相关文章:

java - Ignite Marshaller 不使用二进制或优化的

postgresql - Apache Ignite 与 Postgresql

spring - 使用 Spring 事务管理点燃事务缓存

java - 如何检查连接是否是性能瓶颈

java - Apache Ignite SQL 查询结果差异

java - 部署到集群时,flatmap 函数出现异常

客户端模式下的 Ignite 和 RDMS 集成

java - Apache Ignite 2.6.0 - 6 节点集群分区模式下的高延迟

ignite - IgniteCache.lock可以用作分布式锁吗

java - Ignite CacheStore 实现在服务器端抛出 ClassNotFoundException