java - 使用java的play框架中的Kafka Consumer

标签 java apache-kafka playframework kafka-consumer-api

我已经在数千个网站上搜索了 Java 语言的 Play 框架中的 kafka 消费者示例。但找不到任何例子。任何人都可以提供有关如何编写连续使用 kafka 生成的主题的服务的详细信息。

谢谢

最佳答案

最近我遇到了类似的问题,所以我会在研究后添加一些注释。

我将描述如何在 Play Framework 中使用普通的 KafkaConsumer(但在 scala 中而不是 Java 中)。了解较低级别组件的工作原理总是值得的,但毕竟我强烈鼓励您考虑使用像 Alpakka 这样的库。 。例如,在我的上一个项目中,我决定使用 akka-projection它是建立在 Alpakka 之上的。它与 Akka 很好地结合在一起,并为您提供了一些额外的值(value),例如在发生故障时轻松配置重启策略。

要在 Play Framework 中使用普通 KafkaConsumer:

  1. 创建单例来管理KafkaConsumer。在以下示例中,while 循环在专用线程中运行。普通 KafkaConsumer 在应用程序关闭期间关闭。
@Singleton
class SampleKafkaConsumer @Inject()(coordinatedShutdown: CoordinatedShutdown) extends Logging {

  logger.info("SampleKafkaConsumer starts")

  private val executionContext: ExecutionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())
  private val stopConsumer: AtomicBoolean = new AtomicBoolean(false)

  private val properties = new Properties()
  properties.put("bootstrap.servers", "localhost:6003")
  properties.put("group.id", s"sample-group-id")
  properties.put("key.deserializer", classOf[StringDeserializer])
  properties.put("value.deserializer", classOf[StringDeserializer])

  val kafkaConsumer = new KafkaConsumer[String, String](properties)
  kafkaConsumer.subscribe(Set("sample-topic").asJava)

  Future {
    while (!stopConsumer.get()) {
      kafkaConsumer.poll(Duration.ofSeconds(3)).asScala
        .foreach(r => {
          logger.info(s"SampleKafkaConsumer receives record: $r")
        })
    }
    logger.info(s"SampleKafkaConsumer quits 'while(true)' loop.")
  }(executionContext)
  .andThen(_ => kafkaConsumer.close())(executionContext)
  .andThen {
    case Success(_) =>
      logger.info(s"SampleKafkaConsumer succeed.")
    case Failure(e) =>
      logger.error(s"SampleKafkaConsumer fails.", e)
  }(executionContext)

  coordinatedShutdown.addTask(CoordinatedShutdown.PhaseServiceStop, "SampleKafkaConsumer-stop"){() =>
    logger.info("Shutdown-task[SampleKafkaConsumer-stop] starts.")
    stopConsumer.set(true)
    Future{ Done }(executionContext).andThen{
      case Success(_) => logger.info("Shutdown-task[SampleKafkaConsumer-stop] succeed.")
      case Failure(e) => logger.error("Shutdown-task[SampleKafkaConsumer-stop] fails.", e)
    }(executionContext)
  }
}
  • 使用 Play 模块将其标记为“eager”:
  • class KafkaModule extends AbstractModule with Logging {
    
      override def configure(): Unit = {
        logger.info("Starting KafkaModule")
        bind(classOf[SampleKafkaConsumer]).asEagerSingleton()
      }
    }
    

    3.在 Play 配置文件中启用您的模块:

    play.modules.enabled += "kafka.KafkaModule
    

    仅此而已。我详细描述了一切here .

    关于java - 使用java的play框架中的Kafka Consumer,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60045171/

    相关文章:

    templates - 将列表映射到 HTML 并将结果插入 Play 2.0 模板中

    inputText中的java代码

    java - 两个 docker 容器之间通信出现问题

    java - 提供的正则表达式匹配什么?

    java - Maven:发布插件不提交更新版本

    jdbc - Debezium/JDBC 和 Kafka 主题保留

    Spring Boot Kafka : Commit cannot be completed since the group has already rebalanced

    java - 用于单元测试的 Play Framework 2 Java 模拟插件

    java - 添加自定义 UDF 后 ksql-server 未启动

    scala - 升级 SBT 时出错