我一直在尝试寻找一个连接器来从 Redis 读取数据到 Flink。 Flink 的文档包含对写入 Redis 的连接器的描述。我需要在我的 Flink 作业中从 Redis 读取数据。在 Using Apache Flink for data streaming ,Fabian已经提到可以从Redis中读取数据。可用于此目的的连接器是什么?
最佳答案
我们正在生产中运行一个大致像这样的
class RedisSource extends RichSourceFunction[SomeDataType] {
var client: RedisClient = _
override def open(parameters: Configuration) = {
client = RedisClient() // init connection etc
}
@volatile var isRunning = true
override def cancel(): Unit = {
isRunning = false
client.close()
}
override def run(ctx: SourceContext[SomeDataType]): Unit = while (isRunning) {
for {
data <- ??? // get some data from the redis client
} yield ctx.collect(SomeDataType(data))
}
}
我认为这实际上取决于您需要从 Redis 中获取什么。以上可用于从列表/队列中获取消息,转换/推送,然后将其从队列中删除。 Redis 还支持 Pub/Sub,因此可以订阅、获取 SourceConext 并将消息推送到下游。
关于redis - 从 Redis 读取数据到 Flink,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44193182/