我正在使用 spark 1.5.0 和 java 7。
输入来自 kafka,格式为不同的 json 对象,带有 type
field 。例如:
{'type': 'alpha', ...}
{'type': 'beta', ...}
...
我正在创建一个 JavaPairDStream<String, Integer>
来自与每种事件类型的计数相对应的输入数据。
我想把这些数据存储到redis。我该怎么做呢?
最佳答案
使用foreachRDD
和forEach
函数实现如下:
wordCounts.foreachRDD(
new Function<JavaPairRDD<String, Integer>, Void>() {
public Void call(JavaPairRDD<String, Integer> rdd) {
rdd.foreach(
new VoidFunction<Tuple2<String,Integer>>() {
public void call(Tuple2<String,Integer> wordCount) {
System.out.println(wordCount._1() + ":" + wordCount._2());
JedisPool pool = new JedisPool(new JedisPoolConfig(), "localhost");
Jedis jedis = pool.getResource();
jedis.select(0);
jedis.set(wordCount._1(), wordCount._2().toString());
}
}
);
return null;
}
}
);
关于java - 如何将JavaPairDStream写入Redis?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33806596/