java - 如何在Spark Streaming中使用redis?

标签 java spark-streaming

我在java中使用spark流。我将sparkconfig obj配置为SparkConf sparkConf = new SparkConf().setAppName("MyApp").setMaster("local[2]") .set("spark.streaming.stopGraceivelyOnShutdown","true") .set("redis.host", "localhost") .set("redis.port", "6379"); 并在 JavastreamingContext 中传递配置 obj。

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(1000));

如何使用 jssc 对象访问 Redis。 提前致谢。

最佳答案

这将从 Redis 列表创建一个流

  SparkConf sparkConf = new SparkConf().setAppName("MyApp").setMaster("local[2]")
            .set("spark.streaming.stopGracefullyOnShutdown", "true")
            .set("redis.host", "localhost")
            .set("redis.port", "6379");


    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(1000));

    RedisConfig redisConfig = new RedisConfig(new RedisEndpoint(sparkConf));

    RedisStreamingContext redisStreamingContext = new RedisStreamingContext(jssc.ssc());
    String[] keys = new String[]{"myList"};
    RedisInputDStream<Tuple2<String, String>> redisStream =
            redisStreamingContext.createRedisStream(keys, StorageLevel.MEMORY_ONLY(), redisConfig);

    redisStream.print();

    jssc.start();
    jssc.awaitTermination();

推送一些数据到列表:

LPUSH "myList" "aaaa"
LPUSH "myList" "bbbb"

关于java - 如何在Spark Streaming中使用redis?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52421577/

相关文章:

Java swing、SwingWorker、进程栏不会更新

java - 500 错误 :HTML created in coded not getting downloaded in servlet

java - 如何在 Spark Streaming with Kafka 中禁用enable.auto.commit 的情况下处理偏移提交失败?

apache-spark - 使用 Spark Streaming 读取 Kafka 记录时出现不可序列化异常

elasticsearch - saveToEs( Elasticsearch Spark )添加作业但未开始

java - Hibernate 搜索前缀

java - 有没有办法可以在Windows开发盒上开发和测试MapReduce程序

java - 使用Jlink时如何安装jwdp依赖

apache-spark - Spark Kafka Direct DStream-如果设置了num-executors,则在yarn-cluster模式下有多少个执行器和RDD分区?

apache-spark - 多个 SpartContext 在同一个 JVM 上运行