scala - 如何使用异步阶段创建 Akka 流真正的拉流

标签 scala akka akka-stream

我正在尝试创建一个提供 OAuth2 token 的源,并且还负责刷新过期的 token 。 目前我的代码看起来有点像这样

  case class Token(expires: Instant = Instant.now().plus(100, ChronoUnit.MILLIS)){
    def expired = Instant.now().isAfter(expires)
  }

  Source
    .repeat()
    .mapAsync(1){ _ =>
      println("  -> token req")
      // this fakes an async token request to the token service
      Future{
        Thread.sleep(500)
        println("  <- token resp")
        Token()
      }
    }
    .mapAsync(1){ token =>
      println("  -> req with token auth")
      if(token.expired){
        println("!!! Received expired token")
      }
      // this is the actual call that needs the token
      println("making call")
      Future{
        Thread.sleep(2000)
        println("  <- req resp")
        "OK"
      }
    }
    .take(2)
    .runWith(Sink.ignore)
    .recover{case e => ()}
    .flatMap{ _ =>
      system.terminate()
    }

此代码的输出如下所示

root   -> token req
root   <- token resp
root   -> token req
root   -> req with token auth
root making call
root   <- token resp
root   -> token req
root   <- token resp
root   -> token req
root   <- token resp
root   -> token req
root   <- req resp
root   -> req with token auth
root !!! Received expired token
root making call
root   <- token resp
root   -> token req
root   <- token resp
root   -> token req
root   <- token resp
root   <- req resp
root   -> req with token auth
root !!! Received expired token
root making call
root ... finished with exit code 0

显然,这个mapAsync(1)在不符合预期的情况下产生了需求(预取?)

有 2 个问题:

  • 需求导致上游出现不需要的 token 请求
  • token 的预取/缓存存在问题,因为它们仅在特定时间内有效

那么我如何创建一个像这个函数一样的真正的拉流呢?

def tokenSource: () => Future[Token]

最佳答案

如果您有意避免预取和排队,那么我认为 scala.collection.immutable.Stream ,或 Iterator,是比 akka Stream 更好的解决方案。

下面是一个示例实现,它避免了您在问题中列举的陷阱。 (注意:我使用 ActorSystem 通过 dispatcher 创建 ExecutionContext,以防止应用程序在 sleep 调用有时间完成之前退出。我正在采取优点是 ActorSystem 不会仅仅因为主函数到达表达式定义的末尾而关闭。)

import scala.collection.immutable.Stream
import scala.concurrent.Future

object ScalaStreamTest extends App {    
  case class Token(expires: Long = System.currentTimeMillis() + 100){
    def expired = System.currentTimeMillis() > expires
  }

  val actorSystem = akka.actor.ActorSystem()      
  import actorSystem.dispatcher

  def createToken =  Future {
    Thread.sleep(500)
    println("  <- token resp")
    Token()
  }

  def checkExpiration(token : Future[Token]) = token map { t =>
    println("  -> req with token auth")
    if(t.expired){println("!!! Received expired token")}
    t
  }

  def makeCall(token : Future[Token]) = token flatMap { t =>
    println("making call")
    Future {
      Thread.sleep(2000)
      println("  <- req resp")
      "OK"
    }
  }

  val stream = Stream.continually(createToken)
                     .map(checkExpiration)
                     .map(makeCall)
                     .take(2)
                     .force
}//end object ScalaStreamTest

force 调用是必要的,因为 Stream 是惰性的,因此在强制之前的所有方法调用(即: Continuous、map 和 take)也是惰性的。除非调用 reducer 或通过强制显式告知 Stream,否则惰性 Stream 上不会发生任何计算。

关于scala - 如何使用异步阶段创建 Akka 流真正的拉流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32735162/

相关文章:

playframework - 在游戏中使用 Actor

scala - 在Akka中是否必须拥有大师级 Actor ?

scala - Play Framework 2.6 Alpakka S3 文件上传

playframework - 基于 "resource"连接的客户端正在处理的 BroadcastHub 过滤?

scala - 如何根据类名创建 Akka Actor

algorithm - 实现尾递归列表操作

scala - 如何在 akka actor 中测试公共(public)方法?

scala - 在 AWS EMR SDK 中使用 AddJobFlowStep 的正确方法是什么?

java - 使用 ThreadLocals 的 Akka 和 Java 库

java - MongoSink 响应后提交给 kafka 消费者 - alpakka mongo 连接器