我已经在数千个网站上搜索了 Java 语言的 Play 框架中的 kafka 消费者示例。但找不到任何例子。任何人都可以提供有关如何编写连续使用 kafka 生成的主题的服务的详细信息。
谢谢
最佳答案
最近我遇到了类似的问题,所以我会在研究后添加一些注释。
我将描述如何在 Play Framework 中使用普通的 KafkaConsumer(但在 scala 中而不是 Java 中)。了解较低级别组件的工作原理总是值得的,但毕竟我强烈鼓励您考虑使用像 Alpakka 这样的库。 。例如,在我的上一个项目中,我决定使用 akka-projection它是建立在 Alpakka 之上的。它与 Akka 很好地结合在一起,并为您提供了一些额外的值(value),例如在发生故障时轻松配置重启策略。
要在 Play Framework 中使用普通 KafkaConsumer:
- 创建单例来管理
KafkaConsumer
。在以下示例中,while
循环在专用线程中运行。普通KafkaConsumer
在应用程序关闭期间关闭。
@Singleton
class SampleKafkaConsumer @Inject()(coordinatedShutdown: CoordinatedShutdown) extends Logging {
logger.info("SampleKafkaConsumer starts")
private val executionContext: ExecutionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())
private val stopConsumer: AtomicBoolean = new AtomicBoolean(false)
private val properties = new Properties()
properties.put("bootstrap.servers", "localhost:6003")
properties.put("group.id", s"sample-group-id")
properties.put("key.deserializer", classOf[StringDeserializer])
properties.put("value.deserializer", classOf[StringDeserializer])
val kafkaConsumer = new KafkaConsumer[String, String](properties)
kafkaConsumer.subscribe(Set("sample-topic").asJava)
Future {
while (!stopConsumer.get()) {
kafkaConsumer.poll(Duration.ofSeconds(3)).asScala
.foreach(r => {
logger.info(s"SampleKafkaConsumer receives record: $r")
})
}
logger.info(s"SampleKafkaConsumer quits 'while(true)' loop.")
}(executionContext)
.andThen(_ => kafkaConsumer.close())(executionContext)
.andThen {
case Success(_) =>
logger.info(s"SampleKafkaConsumer succeed.")
case Failure(e) =>
logger.error(s"SampleKafkaConsumer fails.", e)
}(executionContext)
coordinatedShutdown.addTask(CoordinatedShutdown.PhaseServiceStop, "SampleKafkaConsumer-stop"){() =>
logger.info("Shutdown-task[SampleKafkaConsumer-stop] starts.")
stopConsumer.set(true)
Future{ Done }(executionContext).andThen{
case Success(_) => logger.info("Shutdown-task[SampleKafkaConsumer-stop] succeed.")
case Failure(e) => logger.error("Shutdown-task[SampleKafkaConsumer-stop] fails.", e)
}(executionContext)
}
}
- 使用 Play 模块将其标记为“eager”:
class KafkaModule extends AbstractModule with Logging {
override def configure(): Unit = {
logger.info("Starting KafkaModule")
bind(classOf[SampleKafkaConsumer]).asEagerSingleton()
}
}
3.在 Play 配置文件中启用您的模块:
play.modules.enabled += "kafka.KafkaModule
仅此而已。我详细描述了一切here .
关于java - 使用java的play框架中的Kafka Consumer,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60045171/