python - 如何将数据从 pandas 数据帧分块加载到 spark 数据帧

标签 python pandas apache-spark pyspark

我已经使用类似这样的方法通过 pyodbc 连接读取数据 block :

import pandas as pd
import pyodbc
conn = pyodbc.connect("Some connection Details")
sql = "SELECT * from TABLES;"
df1 = pd.read_sql(sql,conn,chunksize=10)

现在我想使用类似的方法将所有这些 block 读入一个单一的 spark 数据帧中:

i = 0
for chunk in df1:
    if i==0:
        df2 = sqlContext.createDataFrame(chunk)
    else:
        df2.unionAll(sqlContext.createDataFrame(chunk))
    i = i+1

问题是当我执行 df2.count() 时,我得到的结果是 10,这意味着只有 i=0 的情况有效。这是 unionAll 的错误吗?我在这里做错了什么吗??

最佳答案

.unionAll() 的文档声明它返回一个新的数据帧,因此您必须分配回 df2 DataFrame:

i = 0
for chunk in df1:
    if i==0:
        df2 = sqlContext.createDataFrame(chunk)
    else:
        df2 = df2.unionAll(sqlContext.createDataFrame(chunk))
    i = i+1

此外,您还可以使用 enumerate()避免必须自己管理 i 变量:

for i,chunk in enumerate(df1):
    if i == 0:
        df2 = sqlContext.createDataFrame(chunk)
    else:
        df2 = df2.unionAll(sqlContext.createDataFrame(chunk))

此外,.unionAll() 的文档指出 .unionAll() 已弃用,现在您应该使用 .union()它的作用类似于 SQL 中的 UNION ALL:

for i,chunk in enumerate(df1):
    if i == 0:
        df2 = sqlContext.createDataFrame(chunk)
    else:
        df2 = df2.union(sqlContext.createDataFrame(chunk))

编辑:
此外,我将停止再说,但在我再说之前:正如@zero323 所说,我们不要在循环中使用 .union() 。让我们改为执行以下操作:

def unionAll(*dfs):
    ' by @zero323 from here: http://stackoverflow.com/a/33744540/42346 '
    first, *rest = dfs  # Python 3.x, for 2.x you'll have to unpack manually
    return first.sql_ctx.createDataFrame(
        first.sql_ctx._sc.union([df.rdd for df in dfs]),
        first.schema
    )

df_list = []
for chunk in df1:
    df_list.append(sqlContext.createDataFrame(chunk))

df_all = unionAll(df_list)

关于python - 如何将数据从 pandas 数据帧分块加载到 spark 数据帧,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38679474/

相关文章:

python - 使用 ImageDataGenerator 和 flow_from_directory 时,Keras 中的数据增强是否应用于验证集

python - 如何显示包含列表的字典中的值作为django中的值

python - PySpark 对已排序的内容进行排序

java - Spark : how to write efficient sql query to achieve this goal

python - Django 中何时使用语言环境名称和语言代码?

python - 如何找到属于非方阵零空间的线性无关向量? (Python)

python - 如何构建可以处理非数字目标的用户友好的 sklearn 回归器?

python - 创建新行并根据时间间隔重复这些值(如果它们属于)

python - 在 matplotlib 中根据数值变量绘制分类变量

scala - 为什么在reduce中使用减法结果不一致?