I am using Spark Streaming in Java. I configured the SparkConf object as follows:
SparkConf sparkConf = new SparkConf().setAppName("MyApp").setMaster("local[2]")
.set("spark.streaming.stopGracefullyOnShutdown", "true")
.set("redis.host", "localhost")
.set("redis.port", "6379");
and passed the configuration object to the JavaStreamingContext:
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(1000));
How can I access Redis using the jssc object? Thanks in advance.
Best answer
This creates a stream from a Redis list, using the spark-redis connector:
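// RedisConfig, RedisEndpoint, RedisStreamingContext and RedisInputDStream come from
// the spark-redis library (packages under com.redislabs.provider.redis).
SparkConf sparkConf = new SparkConf().setAppName("MyApp").setMaster("local[2]")
        .set("spark.streaming.stopGracefullyOnShutdown", "true")
        .set("redis.host", "localhost")
        .set("redis.port", "6379");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(1000));
// Build the Redis connection settings from the host and port stored in sparkConf.
RedisConfig redisConfig = new RedisConfig(new RedisEndpoint(sparkConf));
// Wrap the underlying Scala StreamingContext that jssc holds.
RedisStreamingContext redisStreamingContext = new RedisStreamingContext(jssc.ssc());
// Names of the Redis lists to read from.
String[] keys = new String[]{"myList"};
// Each stream element is a (list name, value) pair.
RedisInputDStream<Tuple2<String, String>> redisStream =
        redisStreamingContext.createRedisStream(keys, StorageLevel.MEMORY_ONLY(), redisConfig);
redisStream.print();
jssc.start();
jssc.awaitTermination();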
Push some data to the list (for example, from redis-cli):
LPUSH "myList" "aaaa"
LPUSH "myList" "bbbb"
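If you would rather push test data from Java than from redis-cli, the following is a minimal sketch that sends the same LPUSH command over a plain socket using the Redis wire protocol (RESP). The class name RedisListPush and the helper methods encode and lpush are hypothetical names introduced for illustration; they are not part of Spark or spark-redis. It assumes a Redis server without authentication, and in a real application a client library such as Jedis would be the more usual choice.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not part of Spark or spark-redis.
public class RedisListPush {

    // Encode a command as a RESP array of bulk strings, the format Redis expects on the wire.
    static String encode(String... args) {
        StringBuilder sb = new StringBuilder();
        sb.append('*').append(args.length).append("\r\n");
        for (String arg : args) {
            // The bulk string length is counted in bytes, not characters.
            int len = arg.getBytes(StandardCharsets.UTF_8).length;
            sb.append('$').append(len).append("\r\n").append(arg).append("\r\n");
        }
        return sb.toString();
    }

    // Send "LPUSH key value" and return the first line of the reply.
    // On success Redis answers with an integer reply such as ":1" (the new list length).
    static String lpush(String host, int port, String key, String value) throws IOException {
        try (Socket socket = new Socket(host, port)) {
            OutputStream out = socket.getOutputStream();
            out.write(encode("LPUSH", key, value).getBytes(StandardCharsets.UTF_8));
            out.flush();

            InputStream in = socket.getInputStream();
            StringBuilder reply = new StringBuilder();
            int b;
            // Read up to the end of the first reply line, dropping the trailing CRLF.
            while ((b = in.read()) != -1 && b != '\n') {
                if (b != '\r') {
                    reply.append((char) b);
                }
            }
            return reply.toString();
        }
    }

    public static void main(String[] args) {
        // Prints the wire format only; no Redis connection is opened here.
        System.out.print(encode("LPUSH", "myList", "aaaa"));
    }
}
```

With the streaming job running, calling RedisListPush.lpush("localhost", 6379, "myList", "aaaa") should make the value show up in the output of redisStream.print() in one of the following batches.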
Regarding "java - How to use Redis in Spark Streaming?", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/52421577/