python - 如何获得在 Spark 1.5.2 中使用 HiveContext 制作的 PySpark DataFrame?

标签 python apache-spark hive pyspark spark-dataframe

更新:看来我的错误可能是因为我安装 Spark 和/或 Hive 的方式。在 Databricks(托管)笔记本中使用窗口函数似乎非常简单。我需要弄清楚如何在本地进行设置。

我有一个 Spark DataFrame,需要在其上使用 Window 函数。* 我尝试按照 here 上的说明进行操作,但我遇到了一些问题。

设置我的环境:

import os
import sys
import datetime as dt

os.environ["SPARK_HOME"] = '/usr/bin/spark-1.5.2'
os.environ["PYTHONPATH"] = '/usr/bin/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip'
sys.path.append('/usr/bin/spark-1.5.2/python')
sys.path.append('/usr/bin/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip')

import pyspark
sc = pyspark.SparkContext()
hiveContext = pyspark.sql.HiveContext(sc)
sqlContext = pyspark.sql.SQLContext(sc)
from pyspark.sql import Row
from pyspark.sql.functions import struct
from pyspark.sql import DataFrame
from collections import OrderedDict

设置我的数据:

test_ts = {'adminDistrict': None,
 'city': None,
 'country': {'code': 'NA', 'name': 'UNKNOWN'},
 'data': [{'timestamp': '2005-08-25T00:00:00Z', 'value': 369.89},
  {'timestamp': '2005-08-26T00:00:00Z', 'value': 362.44},
  {'timestamp': '2005-08-29T00:00:00Z', 'value': 368.3},
  {'timestamp': '2005-08-30T00:00:00Z', 'value': 382.6},
  {'timestamp': '2005-08-31T00:00:00Z', 'value': 377.84},
  {'timestamp': '2005-09-01T00:00:00Z', 'value': 380.74},
  {'timestamp': '2005-09-02T00:00:00Z', 'value': 370.33},
  {'timestamp': '2005-09-05T00:00:00Z', 'value': 370.33},
  {'timestamp': '2005-09-06T00:00:00Z', 'value': 361.5},
  {'timestamp': '2005-09-07T00:00:00Z', 'value': 352.79},
  {'timestamp': '2005-09-08T00:00:00Z', 'value': 354.3},
  {'timestamp': '2005-09-09T00:00:00Z', 'value': 353.0},
  {'timestamp': '2005-09-12T00:00:00Z', 'value': 349.35},
  {'timestamp': '2005-09-13T00:00:00Z', 'value': 348.82},
  {'timestamp': '2005-09-14T00:00:00Z', 'value': 360.24},
  {'timestamp': '2005-09-15T00:00:00Z', 'value': 357.61},
  {'timestamp': '2005-09-16T00:00:00Z', 'value': 347.14},
  {'timestamp': '2005-09-19T00:00:00Z', 'value': 370.0},
  {'timestamp': '2005-09-20T00:00:00Z', 'value': 362.82},
  {'timestamp': '2005-09-21T00:00:00Z', 'value': 366.11},
  {'timestamp': '2005-09-22T00:00:00Z', 'value': 364.46},
  {'timestamp': '2005-09-23T00:00:00Z', 'value': 351.8},
  {'timestamp': '2005-09-26T00:00:00Z', 'value': 360.74},
  {'timestamp': '2005-09-27T00:00:00Z', 'value': 356.63},
  {'timestamp': '2005-09-28T00:00:00Z', 'value': 363.64},
  {'timestamp': '2005-09-29T00:00:00Z', 'value': 366.05}],
 'maxDate': '2015-12-28T00:00:00Z',
 'minDate': '2005-08-25T00:00:00Z',
 'name': 'S&P GSCI Crude Oil Spot',
 'offset': 0,
 'resolution': 'DAY',
 'sources': ['trf'],
 'subtype': 'Index',
 'type': 'Commodities',
 'uid': 'TRF_INDEX_Z39824_PI'}

将 json 转换为 DataFrame 的函数:

def ts_to_df(ts):
    data = []
    for line in ts['data']:
        data.append((dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(), line['value']))
    return sc.parallelize(data).toDF(['Date', ts['name'].replace('&', '').replace(' ', '_')])

获取数据框并查看其中的内容:

test_df = ts_to_df(test_ts)
test_df.show()

这向我展示了这一点:

+----------+----------------------+
|      Date|SP_GSCI_Crude_Oil_Spot|
+----------+----------------------+
|2005-08-25|                369.89|
|2005-08-26|                362.44|
|2005-08-29|                 368.3|
|2005-08-30|                 382.6|
|2005-08-31|                377.84|
|2005-09-01|                380.74|
|2005-09-02|                370.33|
|2005-09-05|                370.33|
|2005-09-06|                 361.5|
|2005-09-07|                352.79|
|2005-09-08|                 354.3|
|2005-09-09|                 353.0|
|2005-09-12|                349.35|
|2005-09-13|                348.82|
|2005-09-14|                360.24|
|2005-09-15|                357.61|
|2005-09-16|                347.14|
|2005-09-19|                 370.0|
|2005-09-20|                362.82|
|2005-09-21|                366.11|
+----------+----------------------+

这是我不知道自己在做什么的地方,一切都开始出错了:

from pyspark.sql.functions import lag, col, lead
from pyspark.sql.window import Window

w = Window().partitionBy().orderBy(col('Date'))
test_df.select(lead(test_df.Date, count=1, default=None).over(w).alias("Next_Date")).show()

这给了我这个错误:

Py4JJavaError: An error occurred while calling o59.select. : org.apache.spark.sql.AnalysisException: Could not resolve window function 'lead'. Note that, using window functions currently requires a HiveContext;

看来我需要一个 HiveContext,对吧?我是否需要使用 HiveContext 创建我的 DataFrame?然后让我尝试使用 HiveContext 显式创建一个 DataFrame:

def ts_to_hive_df(ts):
    data = []
    for line in ts['data']:
        data.append({'Date':dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(),
                 ts['name'].replace('&', '').replace(' ', '_'):line['value']})
    temp_rdd = sc.parallelize(data).map(lambda x: Row(**x))
    return hiveContext.createDataFrame(temp_rdd)

test_df = ts_to_hive_df(test_ts)
test_df.show()

但这给了我这个错误:

TypeError: 'JavaPackage' object is not callable

那么如何使用窗口函数呢?我是否需要使用 HiveContext 创建 DataFrame?如果是这样,那我该怎么做?谁能告诉我我做错了什么?

*我需要知道我的数据是否存在差距。我有“日期”列,对于每一行,按日期排序,我想知道下一行是什么,如果我缺少天数或数据不正确,那么我想使用该行最后一天的数据。如果您知道更好的方法,请告诉我。但我仍然想知道如何让这些 Window 函数正常工作。

最佳答案

这是一个较旧的问题,因此没有实际意义,因为您可能已经使用了新版本的 Spark。我自己正在运行 spark 2.0,所以这可能是作弊。

但是 fwiw:2 个可能的问题。在第一个示例中,我认为 .toDF() 可能默认为 SQLContext 因为你们都调用了。第二,你重构的时候,会不会是在函数内部调用hivecontext?

如果我重构您的第二个 ts_to_df 函数以在函数外部调用 hivecontext,一切都很好。

def ts_to_df(ts):
    data = []
    for line in ts['data']:
        data.append({'Date':dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(),
                 ts['name'].replace('&', '').replace(' ', '_'):line['value']})
    return data

data = ts_to_df(test_ts)
test_rdd = sc.parallelize(data).map(lambda x: Row(**x))
test_df = hiveContext.createDataFrame(test_rdd)

from pyspark.sql.functions import lag, col, lead
from pyspark.sql.window import Window

w = Window().partitionBy().orderBy(col('Date'))
test_df.select(lead(test_df.Date, count=1, default=None).over(w).alias("Next_Date")).show()

我得到输出

+----------+
| Next_Date|
+----------+
|2005-08-26|
|2005-08-29|
|2005-08-30|
|2005-08-31|
|2005-09-01|
|2005-09-02|
.....

关于python - 如何获得在 Spark 1.5.2 中使用 HiveContext 制作的 PySpark DataFrame?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34518342/

相关文章:

java - 尝试使用 Spark 数据集 (Java) 查找 2 个文件之间删除的记录时结果不一致

apache-spark - Spark MLlib - 使用隐式反馈训练协同过滤 - 奇怪的警告

hadoop - 使用 HIVE 减少映射

hadoop - 将配置单元表迁移到 Google BigQuery

python - 当我使用生成器表达式和迭代器从网格上的常规网格插入数据 >2000 次时,我收到 MemError

Python Matplotlib 样式文件 : show only horizontal gridlines

python - CouldntDecodeError : Decoding failed. ffmpeg 返回错误代码:69

python - 如何将字符串拆分为重复的子字符串

scala - 使用 Spark 选择特定列

java - 了解 Hive 故障背后的真正原因