scala - groupBy 的子流可以依赖于它们生成的键吗?

标签 scala akka-stream

我有一个与用户相关的数据流。我也有每个用户的状态,我可以从数据库异步获取。

我想将我的流与每个用户一个子流分开,并在实现子流时为每个用户加载状态,以便可以根据此状态处理子流的元素。

如果我不想合并下游的子流,我可以用 groupBy 做一些事情和 Sink.lazyInit :

def getState(userId: UserId): Future[UserState] = ...
def getUserId(element: Element): UserId = ...
def treatUser(state: UserState): Sink[Element, _] = ...

val treatByUser: Sink[Element] = Flow[Element].groupBy(
  Int.MaxValue, 
  getUserId
).to(
  Sink.lazyInit(
    elt => getState(getUserId(elt)).map(treatUser),
    ??? // this is never called, since the subflow is created when an element comes
  )
)

但是,如果 treatUser,这不起作用变成 Flow ,因为 Sink.lazyInit 没有等价物.

由于 groupBy 的子流仅在推送新元素时才具体化,应该可以使用此元素来具体化子流,但我无法调整 groupBy 的源代码,以便此工作始终如一。同样,Sink.lazyInit似乎不容易翻译成 Flow案件。

关于如何解决这个问题的任何想法?

最佳答案

你要看的相关Akka issue是#20129: add Sink.dynamic and Flow.dynamic .

在相关的 PR #20579他们实际上实现了 LazySink东西。

他们正计划做LazyFlow下一个:

Will do next lazyFlow with similar signature.



不幸的是,您必须等待该功能在 Akka 中实现或自己编写(然后考虑对 Akka 进行 PR)。

关于scala - groupBy 的子流可以依赖于它们生成的键吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42862590/

相关文章:

Scala:具有两个或多个选项的映射

akka-stream - 在项目 react 堆或Akka流中,汇和订户之间的概念区别是什么?

scala - Keep组合的含义?

playframework - 基于 "resource"连接的客户端正在处理的 BroadcastHub 过滤?

performance - 为什么电梯框架这么慢?

Java/Scala 双向 MD5

scala - 如何使用Java而不是Java来学习如何使用scala。

java - HBase 获得多个不同的 rowid?

http - 在 akka http 流请求中使用 KillSwitch

apache-spark - 如何有效地从 Cassandra 读取数百万行?