java - Apache 点燃数据流

标签 java stream ignite

我对 Ignite 的流媒体部分有疑问。

我的理解是,这是将数据导入缓存的方法,但我也看到我们可以配置流接收器来应用一些其他自定义逻辑。

因此,我尝试创建一个带有接收器的类和一个将数据注入(inject)流中的类(因此服务器模式下有 2 个主实例和 2 个 Ignite 实例),但我“只是”将数据放入流媒体的缓存中(没有处理到接收器中的任何自定义逻辑)。所以,我想问一下我是否错过了一些东西,或者我是否不太了解什么是 Streams into Ignite。

如果我将发送者部分放入接收者中,我就会得到打印结果。

有谁知道我做(或理解)错了什么?

接收器类:

public class Receiver {
    public static void main(String[] args){
        IgniteConfiguration igniteConfig = new IgniteConfiguration();
        CacheConfiguration<String, String> cacheConfig = new CacheConfiguration<>("CacheStream");   


        igniteConfig.setCacheConfiguration(cacheConfig);

        
        Ignite ignite = Ignition.getOrStart(igniteConfig);
        
        IgniteDataStreamer<String, String> streamer = ignite.dataStreamer("CacheStream");
        
        streamer.receiver(StreamVisitor.from((cacheLambda, e) -> {
            System.out.println("Value : " + e.getValue());
        }));
    }
}

发件人类别:

public class Sender {
    public static void main(String[] args){
        IgniteConfiguration igniteConfig = new IgniteConfiguration();
        CacheConfiguration<String, String> cacheConfig = new CacheConfiguration<>("CacheStream");

        igniteConfig.setCacheConfiguration(cacheConfig);

        Ignite ignite = Ignition.getOrStart(igniteConfig);
        
        IgniteDataStreamer<String, String> streamer = ignite.dataStreamer("CacheStream");
        
        for(int i = 0 ; i < 10 ; i++){
            streamer.addData("key-"+i, "value-"+i);
        }
        streamer.flush();
    }
}

最佳答案

ignite.dataStreamer("CacheStream") 不会返回您之前创建的相同数据流,它每次都会创建新的数据流。

因此,在您的情况下,您配置了 2 个不同的数据流传输器,并且使用未配置接收器的流传输器上传数据。

关于java - Apache 点燃数据流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45034975/

相关文章:

file - 编程中流的底层原理是什么?

mysql - 当持久存储中的数据可用且 csv 可用时,如何更新 ignite 缓存?

node.js - Busboy 的奇怪行为

c# - 系统重启后Ignite缓存变空

windows - ignite 节点未在 Windows 7 上启动

java - Swing:如何创建一个不会移动到西部组件上的东部组件

java - 有没有办法让 NULL 值作为 MySql 中的可选条件?

java - 如何使用 PDFBox 设置正确的创建日期?

java - 如何刷新 JTable 中的数据并显示更新的列表

iphone - ios流mp3并录制声音