我对 Ignite 的流媒体部分有疑问。
我的理解是,这是将数据导入缓存的方法,但我也看到我们可以配置流接收器来应用一些其他自定义逻辑。
因此,我尝试创建一个带有接收器的类和一个将数据注入(inject)流中的类(因此服务器模式下有 2 个主实例和 2 个 Ignite 实例),但我“只是”将数据放入流媒体的缓存中(没有处理到接收器中的任何自定义逻辑)。所以,我想问一下我是否错过了一些东西,或者我是否不太了解什么是 Streams into Ignite。
如果我将发送者部分放入接收者中,我就会得到打印结果。
有谁知道我做(或理解)错了什么?
接收器类:
public class Receiver {
public static void main(String[] args){
IgniteConfiguration igniteConfig = new IgniteConfiguration();
CacheConfiguration<String, String> cacheConfig = new CacheConfiguration<>("CacheStream");
igniteConfig.setCacheConfiguration(cacheConfig);
Ignite ignite = Ignition.getOrStart(igniteConfig);
IgniteDataStreamer<String, String> streamer = ignite.dataStreamer("CacheStream");
streamer.receiver(StreamVisitor.from((cacheLambda, e) -> {
System.out.println("Value : " + e.getValue());
}));
}
}
发件人类别:
public class Sender {
public static void main(String[] args){
IgniteConfiguration igniteConfig = new IgniteConfiguration();
CacheConfiguration<String, String> cacheConfig = new CacheConfiguration<>("CacheStream");
igniteConfig.setCacheConfiguration(cacheConfig);
Ignite ignite = Ignition.getOrStart(igniteConfig);
IgniteDataStreamer<String, String> streamer = ignite.dataStreamer("CacheStream");
for(int i = 0 ; i < 10 ; i++){
streamer.addData("key-"+i, "value-"+i);
}
streamer.flush();
}
}
最佳答案
ignite.dataStreamer("CacheStream") 不会返回您之前创建的相同数据流,它每次都会创建新的数据流。
因此,在您的情况下,您配置了 2 个不同的数据流传输器,并且使用未配置接收器的流传输器上传数据。
关于java - Apache 点燃数据流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45034975/