我的目标是从Redis和进程中获取流数据。如何通过Spark结构化流连接和处理数据?
最佳答案
要从Spark中的Redis Streams中读取数据,我们需要确定如何连接到Redis,以及Redis Streams中数据的架构结构。
要连接到Redis,我们必须创建一个带有Redis连接参数的新SparkSession:
import com.redislabs.provider.redis._
import redis.clients.jedis.Jedis
object Samj45 {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("redis-example")
.master("local[*]")
.config("spark.redis.host", "localhost")
.config("spark.redis.port", "6379")
.getOrCreate()
val data_from_redis = spark
.readStream
.format("redis")
.option("stream.keys","data_clicks")
.schema(StructType(Array(
StructField("asset", StringType),
StructField("cost", LongType)
)))
.load()
对于编写,您可以使用ForeachWriter。让我知道是否有帮助。
关于redis - 我们如何将Spark结构化流连接到Redis?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61838264/