斯卡拉 : Never ending stream

标签 scala slick akka-stream reactive-streams

使用 Slick,您可以执行以下操作以从表中生成结果流:

val q = for (e <- events) yield e.name
val p: DatabasePublisher[String] = db.stream(q.result)

p.foreach { s => println(s"Event: $s") }

这将打印 events 中的所有事件表并在最后一行之后终止。

假设可以以某种方式通知您何时将新行输入到 events表,是否可以编写一个在插入事件时连续输出事件的流?一种 tail -f对于数据库表。

我认为 Slick 本身不会支持这一点,但我认为应该可以使用 Akka 流来提供帮助。因此,如果您可以从 Slick Source 获取某些内容直到它为空,然后等待事件指示表中的更多数据,然后流式传输新数据。可能通过使用 ActorPublisher绑定(bind)这个逻辑?

只是想知道是否有人在这方面有任何经验或任何建议?

最佳答案

ActorPublisher 自 Akka 2.5.0 以来已被弃用。这是使用 postgresql-async 的替代方法库并创建一个 SourceQueue 在 Actor 内部:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._

import com.github.mauricio.async.db.postgresql.PostgreSQLConnection
import com.github.mauricio.async.db.postgresql.util.URLParser

import scala.concurrent.duration._
import scala.concurrent.Await

class DbActor(implicit materializer: ActorMaterializer) extends Actor with ActorLogging {
  private implicit val ec = context.system.dispatcher

  val queue =  
    Source.queue[String](Int.MaxValue, OverflowStrategy.backpressure)
      .to(Sink.foreach(println))
      .run()

  val configuration = URLParser.parse("jdbc:postgresql://localhost:5233/my_db?user=dbuser&password=pwd")
  val connection = new PostgreSQLConnection(configuration)
  Await.result(connection.connect, 5 seconds)

  connection.sendQuery("LISTEN my_channel")
  connection.registerNotifyListener { message =>
    val msg = message.payload
    log.debug("Sending the payload: {}", msg)
    self ! msg
  }

  def receive = {
    case payload: String =>
      queue.offer(payload).pipeTo(self)
    case QueueOfferResult.Dropped =>
      log.warning("Dropped a message.")
    case QueueOfferResult.Enqueued =>
      log.debug("Enqueued a message.")
    case QueueOfferResult.Failure(t) =>
      log.error("Stream failed: {}", t.getMessage)
    case QueueOfferResult.QueueClosed =>
      log.debug("Stream closed.")
  }
}

上面的代码只是在 PostgreSQL 发生通知时打印它们;您可以替换 Sink.foreach(println)与另一个 Sink .要运行它:
import akka.actor._
import akka.stream.ActorMaterializer

object Example extends App {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  system.actorOf(Props(classOf[DbActor], materializer))
}

关于斯卡拉 : Never ending stream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34273722/

相关文章:

scala - Akka Streams 按类型拆分流

mysql - Play +光滑 : How to do partial model updates?

scala - 如何在光滑表映射中省略案例类字段?

scala - 使用流程中的最新项目完成请求

apache-flink - Akka 流与 Apache Flink

scala - 尝试从 Scala 中的其他类型生成类型

scala - 没有适用于案例类别类型的 TypeTag

java - 如何解决play framework 2.2.1 connect timeout异常问题

scala - 为什么我不能用 Map 迭代 (Int, Int, Int)?

scala - 如何在 Option[Boolean] 列上进行筛选