java - 如何在Spark Streaming中使用redis

标签 java apache-spark redis spark-redis

我正在构建一个应用程序,它从 redis 中的列表中读取 json 元素并使用 spark 对它们进行流式处理。 这是我写的:

public void readTheStream() throws UnknownHostException, IOException {
        SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("Merge").set("redis.host", "localhost")
                .set("redis.port", "6379");;

        JavaSparkContext ctx = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate(sparkConf));
        JavaStreamingContext context = new JavaStreamingContext(ctx, Durations.seconds(1));
}

如何使用 jssc 对象访问 redis。提前致谢。

最佳答案

这是一个从 myList 读取并将列表项打印到控制台的示例:

SparkConf sparkConf = new SparkConf().setAppName("MyApp").setMaster("local[*]")
                .set("redis.host", "localhost")
                .set("redis.port", "6379");


JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(1000));

RedisConfig redisConfig = new RedisConfig(new RedisEndpoint(sparkConf));

RedisStreamingContext redisStreamingContext = new RedisStreamingContext(jssc.ssc());
String[] keys = new String[]{"myList"};
RedisInputDStream<Tuple2<String, String>> redisStream =
        redisStreamingContext.createRedisStream(keys, StorageLevel.MEMORY_ONLY(), redisConfig);

redisStream.print();

jssc.start();
jssc.awaitTermination();

关于java - 如何在Spark Streaming中使用redis,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52965273/

相关文章:

scala - 创建 Spark 存在用户定义的函数,其工作方式类似于 Scala Array#exists 函数

zend-framework - Zend Framework 无法读取没有 "zfcahce:"前缀的 key ?

java - Java卡中SDA和DDA的区别?

java - 虫洞攻击实现-传感器

apache-spark - Pyspark:拯救变压器

java - 为什么 Docker 容器中的 Spark 应用程序会失败并出现 OutOfMemoryError : Java heap space?

lua - 我可以分析在 Redis 中运行的 Lua 脚本吗?

javascript - Redis/Node.js - 2 个客户端(1 个发布/订阅)导致写入问题

java - POJO 中的数据库表

java - 如何将 Gmail 日期格式转换为 java.util.Date?