apache-spark - PySpark: FP-growth algorithm raises ValueError ("Params must be either a param map or a list/tuple of param maps, ")

Tags: apache-spark pyspark fpgrowth

I am a beginner with PySpark. I am using FPGrowth in PySpark to compute associations, following the steps below.

Sample data

from pyspark.sql.session import SparkSession

spark = SparkSession.builder.getOrCreate()

# make some test data
columns = ['customer_id', 'product_id']
vals = [
     (370, 154),
     (41, 40),
     (109, 173),
     (18, 55),
     (105, 126),
     (370, 121),
     (41, 32323),
     (109, 22),
     (18, 55),
     (105, 133),
     (109, 22),
     (18, 55),
     (105, 133)
]

df = spark.createDataFrame(vals, columns)

df.show()
+-----------+----------+
|customer_id|product_id|
+-----------+----------+
|        370|       154|
|         41|        40|
|        109|       173|
|         18|        55|
|        105|       126|
|        370|       121|
|         41|     32323|
|        109|        22|
|         18|        55|
|        105|       133|
|        109|        22|
|         18|        55|
|        105|       133|
+-----------+----------+

### Prepare input data
from pyspark.sql.functions import collect_list, col

transactions = df.groupBy("customer_id")\
      .agg(collect_list("product_id").alias("product_ids"))\
      .rdd\
      .map(lambda x: (x.customer_id, x.product_ids))

transactions.collect()
[(370, [121, 154]),
 (41, [32323, 40]),
 (105, [133, 133, 126]),
 (18, [55, 55, 55]),
 (109, [22, 173, 22])]
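As an aside, the `groupBy`/`collect_list` step above is simply "gather every value seen for a key, duplicates included". A plain-Python sketch of what it computes (a toy illustration of the semantics, not PySpark itself; element order in real Spark output is not guaranteed):

```python
from collections import defaultdict

# Same (customer_id, product_id) rows as the test data above.
rows = [(370, 154), (41, 40), (109, 173), (18, 55), (105, 126),
        (370, 121), (41, 32323), (109, 22), (18, 55), (105, 133),
        (109, 22), (18, 55), (105, 133)]

# Rough equivalent of df.groupBy("customer_id").agg(collect_list("product_id")):
# append every product_id seen for a customer, keeping duplicates.
baskets = defaultdict(list)
for customer_id, product_id in rows:
    baskets[customer_id].append(product_id)

print(dict(baskets))
# e.g. customer 18 keeps all three occurrences of 55: [55, 55, 55]
```

Note that the duplicates survive the aggregation, which is what matters later.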

## Convert .rdd to spark dataframe 
df2 = spark.createDataFrame(transactions)
df2.show()
+---+---------------+
| _1|             _2|
+---+---------------+
|370|     [121, 154]|
| 41|    [32323, 40]|
|105|[126, 133, 133]|
| 18|   [55, 55, 55]|
|109|  [22, 173, 22]|
+---+---------------+

df3 = df2.selectExpr("_1 as customer_id", "_2 as product_id")
df3.show()
df3.printSchema()
+-----------+---------------+
|customer_id|     product_id|
+-----------+---------------+
|        370|     [154, 121]|
|         41|    [32323, 40]|
|        105|[126, 133, 133]|
|         18|   [55, 55, 55]|
|        109|  [173, 22, 22]|
+-----------+---------------+

root
 |-- customer_id: long (nullable = true)
 |-- product_id: array (nullable = true)
 |    |-- element: long (containsNull = true)

## FPGrowth Model Building
from pyspark.ml.fpm import FPGrowth
fpGrowth = FPGrowth(itemsCol="product_id", minSupport=0.5, minConfidence=0.6)
model = fpGrowth.fit(df3)

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-12-aa1f71745240> in <module>()
----> 1 model = fpGrowth.fit(df3)

/usr/lib/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
     62                 return self.copy(params)._fit(dataset)
     63             else:
---> 64                 return self._fit(dataset)
     65         else:
     66             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

/usr/lib/spark/python/pyspark/ml/wrapper.py in _fit(self, dataset)
    263 
    264     def _fit(self, dataset):
--> 265         java_model = self._fit_java(dataset)
    266         return self._create_model(java_model)
    267 

/usr/lib/spark/python/pyspark/ml/wrapper.py in _fit_java(self, dataset)
    260         """
    261         self._transfer_params_to_java()
--> 262         return self._java_obj.fit(dataset._jdf)
    263 
    264     def _fit(self, dataset):

/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

I looked this up, but I don't understand what is going wrong. The only thing I can point to is perhaps the step where I convert the RDD back to a DataFrame.

Can anyone point out what I am doing wrong?

Best answer

If you examine the traceback carefully, you will see the source of the problem:

Caused by: org.apache.spark.SparkException: Items in a transaction must be unique but got ....

Replace `collect_list` with `collect_set` and the problem will be resolved.
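`collect_set` keeps only one copy of each item per group, which is exactly what FP-growth requires ("Items in a transaction must be unique"). A plain-Python sketch of the difference between the two aggregates (a toy illustration; the actual fix is just swapping the aggregate function in the PySpark code above):

```python
from collections import defaultdict

# A few duplicated (customer_id, product_id) rows, like customer 18 above.
rows = [(18, 55), (18, 55), (18, 55), (109, 173), (109, 22), (109, 22)]

as_list = defaultdict(list)  # behaves like collect_list: duplicates kept
as_set = defaultdict(set)    # behaves like collect_set: duplicates dropped
for customer_id, product_id in rows:
    as_list[customer_id].append(product_id)
    as_set[customer_id].add(product_id)

print(as_list[18], as_set[18])
# collect_list keeps [55, 55, 55]; collect_set reduces it to {55}
```

With the duplicates removed, each basket is a valid FP-growth transaction and `fpGrowth.fit(df3)` no longer raises.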

A similar question about "apache-spark - PySpark: FP-growth algorithm raises ValueError ("Params must be either a param map or a list/tuple of param maps, ")" can be found on Stack Overflow: https://stackoverflow.com/questions/51325092/
