scala - Spark结构化流-处理每一行

标签 scala apache-spark apache-kafka spark-structured-streaming

我正在Spark 2.1.1中使用结构化流。我需要对传入消息(从Kafka来源)应用一些业务逻辑。

本质上,我需要获取消息,获取一些键值,在HBase中查找它们并在数据集上执行更多biz逻辑。最终结果是一个字符串消息,需要将其写出到另一个Kafka队列。

但是,由于传入消息的抽象是一个数据帧(无边界表-结构化流),因此我必须遍历触发期间通过mapPartitions接收的数据集(由于HBase客户端无法序列化而导致的分区)。

在我的流程中,我需要遍历每一行以执行相同的业务流程。


有没有更好的方法可以帮助我避免dataFrame.mapPartitions调用?我觉得它的顺序和迭代!
结构化流基本上会迫使我从业务流程中生成输出数据帧,而没有一开始。我可以使用其他哪些设计模式来实现最终目标?


您会推荐一种替代方法吗?

最佳答案

当谈到在Spark中使用Dataframes时,从广义上讲,您可以做以下三件事之一
a)生成一个数据框
b)转换数据帧
c)消耗数据帧

在结构化流中,使用数据源生成流DataFrame。通常,您使用公开的sparkSession.readStream方法创建源。此方法返回一个DataStreamReader,它具有几种从各种输入中读取的方法。所有这些都返回一个DataFrame。它在内部创建一个数据源。 Spark允许您实现自己的DataSource,但他们建议您反对它,因为从2.2版本开始,该接口被认为是实验性的

您主要使用map或reduce或spark SQL来转换数据框。地图有不同的样式(map,mapPartition,mapParititionWithIndex),等等。它们基本上都占用一行并返回一行。在内部,Spark会执行并行化对map方法的调用的工作。它对数据进行分区,将其散布在集群上的执行器上,然后在执行器中调用map方法。您无需担心并行性。它内置在引擎盖下。 mapParitions不是“顺序的”。是的,一个分区中的行是顺序执行的,但是多个分区是并行执行的。您可以通过对数据帧进行分区来轻松控制并行度。您有5个分区,您将有5个并行运行的进程。您有200个,如果有200个核心,则可以并行运行200个

请注意,没有什么可以阻止您进入管理转换内部状态的外部系统。但是,您的变换应该是幂等的。给定一组输入,它们应始终生成相同的输出,并随着时间的推移使系统保持相同的状态。如果要与转换中的外部系统对话,则可能会很困难。结构化流至少提供一次保证。意味着同一行可能会多次转换。因此,如果您正在执行向银行帐户添加资金之类的操作,则可能会发现您向某些帐户添加了相同金额的资金两次。

数据被接收器占用。通常,您可以通过在Dataframe上调用format方法,然后调用start来添加接收器。 StructuredStreaming有一些内置的接收器(除了一个)几乎没有用。您可以创建自定义接收器,但由于界面是实验性的,因此不建议再次使用。唯一有用的接收器是您将实现的对象。它称为ForEachSink。 Spark将为每个接收器调用您的分区中所有行。您可以对行执行任何操作,包括将其写入Hbase。请注意,由于结构化流的至少一次性质,同一行可能多次被馈送到您的ForEachSink。您应该以幂等的方式实现它。同样,如果您有多个接收器,则数据将并行写入接收器。您无法控制接收器的调用顺序。可能发生一个接收器从一个微型批次获取数据而另一个接收器仍在处理前一个微型批次的数据的情况。从本质上讲,接收器最终是一致的,而不是立即一致的。

通常,构建代码的最干净的方法是避免进入转换内部的外部系统。您的转换应纯粹转换数据帧中的数据。如果要从HBase获得数据,请将其放入数据框,将其与流数据框合并,然后进行转换。这是因为当您使用外部系统时,很难进行扩展。您想通过增加数据帧上的分区并添加节点来扩展转换。但是,与外部系统通信的节点过多会增加外部系统的负载并导致瓶颈,将转换与数据检索分开可以使您独立地扩展它们。

但!!!!这里有大屁股……

1)当您谈论结构化流传输时,无法实现可根据输入中的数据有选择地从HBase中获取数据的Source。您必须在类似map(-)的方法中执行此操作。因此,IMO,如果Hbase中的数据发生更改或者您不想保留很多数据,那么您拥有的一切就很好。如果您在HBase中的数据很小且没有变化,那么最好将其读取到批处理数据帧中,对其进行缓存,然后将其与流数据帧结合在一起。 Spark会将所有数据加载到其自己的内存/磁盘存储中,并保存在那里。如果您的数据很小并且经常更改,那么最好将其读取到数据帧中,不要将其缓存并与流数据帧一起加入。每次运行微型批处理时,Spark都会从HBase加载数据。

2)无法下令执行两个单独的接收器。因此,如果您的要求要求您写入数据库并写入Kafka,并且您想保证在数据库中提交该行之后才在Kafka中写入一行,那么唯一的方法是
a)都在For每个接收器中写入。
b)以类似地图的功能写入一个系统,并为每个接收器写入a

不幸的是,如果您有一个要求,要求您从流式源中读取数据,将其与批处理源中的数据合并,对其进行转换,将其写入数据库,调用一个API,从该API获取结果并写入Kafka的API,并且这些操作必须按照正确的顺序进行,那么唯一的方法是在转换组件中实现接收器逻辑。您必须确保在单独的映射函数中将逻辑分开,以便可以最佳方式并行化它们。

而且,没有很好的方法知道应用程序何时完全处理了微批处理,尤其是当您有多个接收器时

关于scala - Spark结构化流-处理每一行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44187297/

相关文章:

java - Scala 2.9 桥接方法

apache-kafka - 卡夫卡流并发?

scala - 将 Doobie 流从数据库保存到文件

java - Play 框架是否附带电子邮件模板库或工具?

java - spark checkpoint ".bk"和 ".crc"文件的功能是什么?

apache-spark - 评估 Spark 广播变量的最大大小

apache-spark - 获取 : Error importing Spark Modules : No module named 'pyspark.streaming.kafka'

apache-kafka - 分布式Kafka Connect主题配置

scala - scala StrictLogging和Lazylogging有什么区别?

apache-spark - Hive with Hadoop vs Hive with spark vs Spark sql vs HDFS-它们如何相互配合?