apache-spark - 如何使用 spark 数据框评估 spark Dstream 对象

标签 apache-spark pyspark spark-streaming

我正在编写一个 spark 应用程序,我需要根据历史数据评估流数据,该数据位于 sql server 数据库中

现在的想法是,spark 将从数据库中获取历史数据并将其保存在内存中,并根据它评估流数据。

现在我正在获取流数据

import re
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext,functions as func,Row


sc = SparkContext("local[2]", "realtimeApp")
ssc = StreamingContext(sc,10)
files = ssc.textFileStream("hdfs://RealTimeInputFolder/")

########Lets get the data from the db which is relavant for streaming ###

driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
dataurl = "jdbc:sqlserver://myserver:1433"
db = "mydb"
table = "stream_helper"
credential = "my_credentials"

########basic data for evaluation purpose ########



files_count = files.flatMap(lambda file: file.split( ))

pattern =  '(TranAmount=Decimal.{2})(.[0-9]*.[0-9]*)(\\S+ )(TranDescription=u.)([a-zA-z\\s]+)([\\S\\s]+ )(dSc=u.)([A-Z]{2}.[0-9]+)'


tranfiles = "wasb://myserver.blob.core.windows.net/RealTimeInputFolder01/"

def getSqlContextInstance(sparkContext):
    if ('sqlContextSingletonInstance' not in globals()):
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']


def pre_parse(logline):
    """
    to read files as rows of sql in pyspark streaming using the pattern . for use of logging 
    added 0,1 in case there is any failure in processing by this pattern

    """
    match = re.search(pattern,logline)
    if match is None:
        return(line,0)
    else:
        return(
        Row(
        customer_id = match.group(8)
        trantype = match.group(5)
        amount = float(match.group(2))
        ),1)


def parse():
    """
    actual processing is happening  here 
    """
    parsed_tran = ssc.textFileStream(tranfiles).map(preparse)
    success = parsed_tran.filter(lambda s: s[1] == 1).map(lambda x:x[0])
    fail = parsed_tran.filter(lambda s:s[1] == 0).map(lambda x:x[0])
    if fail.count() > 0:
        print "no of non parsed file : %d", % fail.count()

    return success,fail

success ,fail = parse()

现在我想通过我从历史数据中得到的数据框来评估它
base_data = sqlContext.read.format("jdbc").options(driver=driver,url=dataurl,database=db,user=credential,password=credential,dbtable=table).load()

现在,既然这是作为数据框返回的,我如何将其用于我的目的。
流媒体节目指南here
“您必须使用 StreamingContext 正在使用的 SparkContext 创建一个 SQLContext。”

现在这让我更加困惑如何将现有数据帧与流对象一起使用。任何帮助表示高度赞赏。

最佳答案

要操作 DataFrames,你总是需要一个 SQLContext 所以你可以实例化它:

sc = SparkContext("local[2]", "realtimeApp")
sqlc = SQLContext(sc)
ssc = StreamingContext(sc, 10)

这 2 个上下文( SQLContext StreamingContext )将共存于同一个作业中,因为它们与相同的 相关联SparkContext .
但是,请记住,您不能在同一个作业中实例化两个不同的 SparkContext。

从 DStream 创建 DataFrame 后,您可以将历史 DataFrame 与从流创建的 DataFrame 连接起来。
为此,我会做类似的事情:
yourDStream.foreachRDD(lambda rdd: sqlContext
    .createDataFrame(rdd)
    .join(historicalDF, ...)
    ...
)

考虑在操作流时需要用于连接的流数据量,您可能对 windowed functions 感兴趣。

关于apache-spark - 如何使用 spark 数据框评估 spark Dstream 对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37174347/

相关文章:

apache-spark-sql - 将宽数据帧转置为长数据帧

CSV 数据以奇怪的格式导出/复制到 HDFS

apache-spark - Spark 结构化流在追加模式下显示结果太迟

java - 跨不同 JVM 的相同 SparkContext

java - 尽管mapWithState中的元素相同,但为什么所有元素都被打印

apache-spark - 数据如何从 S3 存储桶传输到 Spark 工作线程

hadoop - 如何用Spark尾部HDFS文件?

python - Pandas UDF 函数内部无法识别的函数

python - 在 Apache Spark : py4j. 协议(protocol)中导入气泡水 (H2O) 管道时出错。Py4JError

apache-spark - 暂停/节流 Spark / Spark 流应用程序