scala - 使用 Scala + Slick + MySQL + Akka + Stream 面临的问题

标签 scala akka slick akka-stream

问题陈述 :我们正在将 MySQL DB 表中特定模块的用户的所有传入请求参数添加为一行(这是一个巨大的数据)。现在,我们要设计一个进程,从该表中读取每条记录,并通过调用第三方 API 获取有关用户请求的更多信息,然后将返回的元信息放入另一个表中。

当前尝试 :

我正在使用 Scala + Slick 来做到这一点。由于要读取的数据很大,我想一次读取该表一行并进行处理。我尝试使用 slick + akka 流,但是我得到 'java.util.concurrent.RejectedExecutionException'

以下是我尝试过的粗略逻辑,

implicit val system = ActorSystem("Example")
import system.dispatcher
implicit val materializer = ActorMaterializer()

val future = db.stream(SomeQuery.result)
Source.fromPublisher(future).map(row => {
        id = dataEnrichmentAPI.process(row)

}).runForeach(id => println("Processed row : "+ id))

dataEnrichmentAPI.process :此函数进行第三方 REST 调用,并进行一些数据库查询以获取所需的数据。这个 DB 查询是使用 'db.run' 方法完成的,它也会等到它完成(使用等待)

例如。,
def process(row: RequestRecord): Int = {
   // SomeQuery2 = Check if data is already there in DB
   val retId: Seq[Int] = Await.result(db.run(SomeQuery2.result), Duration.Inf)
   if(retId.isEmpty){
         val metaData = RestCall()
         // SomeQuery3 = Store this metaData in DB
         Await.result(db.run(SomeQuery3.result), Duration.Inf)
         return metaData.id;      
   }else{
       // SomeQuery4 = Get meta data id 
      return Await.result(db.run(SomeQuery4.result), Duration.Inf)     
   }
 }

我在使用对 DB 的阻塞调用时遇到了这个异常。我不认为我是否可以摆脱它,因为以后的流程需要返回值才能继续。

“阻塞调用”是否是此异常背后的原因?
解决此类问题的最佳做法是什么?

谢谢。

最佳答案

我不知道这是否是您的问题(细节太少),但您永远不应该阻止。

说到最佳实践,我们 async stages反而。
这或多或少是您的代码在不使用 Await.result 的情况下的样子:

def process(row: RequestRecord): Future[Int] = {
   db.run(SomeQuery2.result) flatMap { 
      case retId if retId.isEmpty =>
        // what is this? is it a sync call? if it's a rest call it should return a future
        val metaData = RestCall() 
        db.run(SomeQuery3.result).map(_ => metaData.id)

      case _ => db.run(SomeQuery4.result)
   }
 }


Source.fromPublisher(db.stream(SomeQuery.result))
  // choose your own parallelism
  .mapAsync(2)(dataEnrichmentAPI.process)
  .runForeach(id => println("Processed row : "+ id))

这样,您将明确且惯用地处理背压和并行性。

尽量不要在生产代码中调用 Await.result 并且只调用 compose futures using map, flatMap and for comprehensions

关于scala - 使用 Scala + Slick + MySQL + Akka + Stream 面临的问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36357565/

相关文章:

java - akka 中的正确设计。 - 消息传递

scala - Akka Actor 的状态未得到正确监控

akka - Akka.NET 和原始 Akka 可以使用 Remoting 进行通信吗?

scala - 根据 Slick 中的 Id 选择单行

scala - 如何在光滑中保留枚举值

scala - SBT项目编译问题

Scala:方便绑定(bind)集合类型参数

android - Scala 中的大量 Android 开发

java - Akka Actors unstashAll() 与 Java 中的谓词

scala - 如何通过 Play 2/TypeSafe Activator 使用 Slick 连接到 Oracle 11g 数据库?