python - 类型错误 : 'GroupedData' object is not iterable in pyspark

标签 python pyspark

我正在使用 Spark 版本 2.0.1 和 python 2.7。我正在运行以下代码

# This will return a new DF with all the columns + id
data1 = data.withColumn("id", monotonically_increasing_id()) # Create an integer index
data1.show()

def create_indexes(df,
                   fields=['country', 'state_id', 'airport', 'airport_id']):
    """ Create indexes for the different element ids
        for CMRs. This allows us to select CMRs that match
        a given element and element value very quickly.
    """
    if fields == None:
        print("No fields specified, returning")
        return
    for field in fields:
        if field not in df.columns:
            print('field: ', field, " is not in the data...")
            return
    indexes = {}
    for field in fields:
        print(field)
        res = df.groupby(field)
        index = {label: np.array(vals['id'], np.int32) for label, vals in res}
        indexes[field] = index
    return indexes

# Create indexes. Some of them take a lot of time!
#Changed dom_client_id by gbl_buy_grp_id as it was changed in Line Number 
indexes = create_indexes(data1, fields=['country', 'state_id', 'airport', 'airport_id'])
print type(indexes)

运行此代码时收到以下错误消息

TypeError: 'GroupedData' object is not iterable

你能帮我解决这个问题吗?

最佳答案

您必须对 GroupedData 执行聚合并在迭代结果之前收集结果,例如计算每组的项目数:res = df.groupby(field).count().collect()

关于python - 类型错误 : 'GroupedData' object is not iterable in pyspark,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46791254/

相关文章:

python - 基于 pyspark 数据帧创建动态 casewhen 语句

python - 将常量列表添加到 pandas 列

Python:需要附加额外的 header ,在 urllib2 添加隐藏默认值之后,在发送请求之前

python - PyCharm 运行 Python 文件总是打开一个新的控制台

python - 如何在pyspark中保存没有日期的时间信息?

python - Pyspark 中的 Ngram 频率排名

azure - Pyspark - 数据透视表

python - 如何在 Python 中使用 XSLT 转换 XML 文件?

python - 有没有办法使用 MLflow 记录数据集的描述性统计信息?

python - CountVectorizer,第二次使用相同的词汇