java - 如何将JavaPairDStream写入Redis?

标签 java apache-spark redis spark-streaming

我正在使用 spark 1.5.0 和 java 7。

输入来自 kafka,格式为不同的 json 对象,带有 type field 。例如:

{'type': 'alpha', ...}
{'type': 'beta', ...}
...

我正在创建一个 JavaPairDStream<String, Integer>来自与每种事件类型的计数相对应的输入数据。

我想把这些数据存储到redis。我该怎么做呢?

最佳答案

使用foreachRDDforEach 函数实现如下:

wordCounts.foreachRDD(
    new Function<JavaPairRDD<String, Integer>, Void>() {
        public Void call(JavaPairRDD<String, Integer> rdd) {
            rdd.foreach(
                new VoidFunction<Tuple2<String,Integer>>() {
                    public void call(Tuple2<String,Integer> wordCount) {
                        System.out.println(wordCount._1() + ":" + wordCount._2());
                        JedisPool pool = new JedisPool(new JedisPoolConfig(), "localhost");
                        Jedis jedis = pool.getResource();
                        jedis.select(0);
                        jedis.set(wordCount._1(), wordCount._2().toString());
                    }
                }
            );
            return null;
        }
    }
);

关于java - 如何将JavaPairDStream写入Redis?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33806596/

相关文章:

java - 继承-为什么我不能在子类中声明与 super 中具有相同方法签名的私有(private)方法?

java - Junit的assert.fail()方法正在停止执行,需要继续执行并最终给出结果

java - 使用终端编译java项目

斯卡拉 Spark : how to use dataset for a case class with the schema has snake_case?

java - Hibernate 使用 apache commons 日志记录和 slf4j - 为什么需要 2 个日志记录抽象层?

csv - 如何在 PySpark 中使用 read.csv 跳过多行

hadoop - mesos 上的 spark 是否支持数据局部性?

ruby - Sidekiq 作业在 Heroku 上排在队列中

database - Neo4j + Redis?是好是坏?

c# - 使用 StackExchange.Redis 超时异常的原因可能是什么?