playframework - 与 websocket 连接时使用 akka-stream-kafka 从 kafka Topic 获取最后一条消息

标签 playframework apache-kafka akka-stream akka-kafka

是否有可能使用 Akka Streams Kafka 获取关于 Kafka 主题的最后一条消息?我正在创建一个监听 Kafka 主题的 websocket,但目前它会在我连接时检索所有先前未显示的消息。这可以加起来相当多的消息,所以我只对最后一条消息 + 任何 future 的消息感兴趣。 (或仅 future 的消息)

来源:

def source(): Flow[Any, String, NotUsed] = {
  val source = Consumer.plainSource(consumerSettings, Subscriptions.topics(MyTopic))
  Flow.fromSinkAndSource[Any, String](Sink.ignore, source.map(_.value)
}

消费者设置:

  @Provides
def providesConsumerSettings(@Named("kafkaUrl") kafkaUrl: String): ConsumerSettings[String, String] = {
  val deserializer = new StringDeserializer()
  val config = configuration.getOptional[Configuration]("akka.kafka.consumer")
    .getOrElse(Configuration.empty)

  ConsumerSettings(config.underlying, deserializer, deserializer)
    .withBootstrapServers(kafkaUrl)
    .withGroupId(GroupId)
}

我尝试添加设置 ConsumerSettings.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

应该是“自动将偏移量重置为最新的偏移量”,但是好像没有什么作用。

最佳答案

我能够使用 David van Geest 非常巧妙地描述的方法避免在客户端连接时获取任何上游数据 here

它归结为在消费者上拥有一个 BroadcastHub:

val liveSource = Consumer.plainSource(consumerSettings, Subscriptions.topics(topic1, topic2))
.map(kafkaObject => utils.WebSockets.kafkaWrapper(kafkaObject.topic(), kafkaObject.value()))
.toMat(BroadcastHub.sink)(Keep.right)
.run()

并连接一个静态消费者来吃掉所有的上游数据

liveSource.to(Sink.ignore).run()

这让我有一个 WebSocket 客户端订阅消费者收到的所有数据:

def source(): Flow[Any, String, NotUsed] = {Flow.fromSinkAndSource(Sink.ignore, liveSource)}

或者基于 KafkaTopic(或者任何你想要的)进行过滤

def KafkaSpecificSource(kafkaTopic: String): Flow[Any, String, NotUsed] = {
  Flow.fromSinkAndSource(Sink.ignore, liveSource.filter({
    x =>
      (Json.parse(x) \ "topic").asOpt[String] match {
        case Some(str) => str.equals(kafkaTopic)
        case None => false
      }
  }))
}

这并没有解决首次连接时向用户提供 x 量数据的问题,但我预见到我们会为任何历史数据添加一个简单的数据库查询,并让 WebSocket 连接只关注直播数据。

关于playframework - 与 websocket 连接时使用 akka-stream-kafka 从 kafka Topic 获取最后一条消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49794201/

相关文章:

java - Play 应用程序配置未加载?

spring-boot - KafkaContainer - 如何在 start() 之后在 Spring Boot 中读取 kafka 容器端口作为属性/如何在启动之前配置 Kafka 端口

scala - 将 SSE 与 Redis 发布/订阅和 Akka Streams 一起使用的最简单方法是什么?

java - 如何将复选框绑定(bind)到 Play 中的 boolean 值!框架

html - 使用 Play form helper 对某些字段进行并排布局

heroku - XForwardedSupport支持https Play !支持Heroku失败

apache-kafka - Spring boot应用程序生产者消费者的Kafka大消息配置支持

apache-kafka - Kafka Connect与接收器流

scala - Akka-Stream 实现比单线程实现慢

Scala Akka Streams 合并过滤器和映射