redis - 从 Redis 读取数据到 Flink

标签 redis apache-flink flink-streaming

我一直在尝试寻找一个连接器来从 Redis 读取数据到 Flink。 Flink 的文档包含对写入 Redis 的连接器的描述。我需要在我的 Flink 作业中从 Redis 读取数据。在 Using Apache Flink for data streaming ,Fabian已经提到可以从Redis中读取数据。可用于此目的的连接器是什么?

最佳答案

我们正在生产中运行一个大致像这样的

class RedisSource extends RichSourceFunction[SomeDataType] {

  var client: RedisClient = _

  override def open(parameters: Configuration) = {
    client = RedisClient() // init connection etc
  }

  @volatile var isRunning = true

  override def cancel(): Unit = {
    isRunning = false
    client.close()
  }

  override def run(ctx: SourceContext[SomeDataType]): Unit = while (isRunning) {
      for {
        data <- ??? // get some data from the redis client
      } yield ctx.collect(SomeDataType(data))

  }
} 

我认为这实际上取决于您需要从 Redis 中获取什么。以上可用于从列表/队列中获取消息,转换/推送,然后将其从队列中删除。 Redis 还支持 Pub/Sub,因此可以订阅、获取 SourceConext 并将消息推送到下游。

关于redis - 从 Redis 读取数据到 Flink,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44193182/

相关文章:

redis - 无法更改端口号

macos - 如何在 mac os 10.13 上安装 php-redis 扩展?

apache-spark - 基于流的应用程序中的受控/手动错误/恢复处理

java - Flink奇怪的 "Cannot Serialize operator object class ...CoBroadcastWithNonKeyedOperator"错误

hadoop - 无法将点/检查点flink状态保存到AWS S3存储桶

c# - Redis:当删除项目存在于多个列表并设置时

redis - 从 Windows 管理 AWS redis

apache-flink - 连接的 flink 流中的背压

java - Apache Flink 测试中是否有像 Reactor 和 RxJava 中那样的虚拟时间概念

java - 如何在Flink Kafka Consumer中动态获取处理kafka主题名称?