python - How do I pass parameters to the ML Pipeline.fit method?

Tags: python apache-spark pyspark apache-spark-mllib apache-spark-ml

I am trying to build a clustering mechanism using:

  • Google Dataproc + Spark
  • Google BigQuery
  • A job using the Spark ML KMeans + Pipeline API

as follows:


  1. Create a user-level feature table in BigQuery.
     Example of what the feature table looks like:

     userid | x1   | x2 | x3 | x4 | x5 | x6 | x7 | x8   | x9   | x10
     00013  | 0.01 | 0  | 0  | 0  | 0  | 0  | 0  | 0.06 | 0.09 | 0.001

  2. Launch a cluster with the default settings. I am using the gcloud command-line interface to create the cluster and run the jobs, as shown here.
  3. Using the provided starter code, I read the BQ table, convert the RDD into a DataFrame, and pass it to the KMeans model/pipeline:
#!/usr/bin/python
"""BigQuery I/O PySpark example."""
import json
import pprint
import subprocess
import pyspark
import numpy as np
from pyspark.ml.clustering import KMeans
from pyspark import SparkContext
from pyspark.ml import Pipeline
from pyspark.sql import SQLContext
from pyspark.mllib.linalg import Vectors, _convert_to_vector
from pyspark.sql.types import Row
from pyspark.mllib.common import callMLlibFunc, callJavaFunc, _py2java, _java2py
sc = pyspark.SparkContext()

# Use the Google Cloud Storage bucket for temporary BigQuery export data used by the InputFormat.
# This assumes the Google Cloud Storage connector for Hadoop is configured.

bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory ='gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)
conf = {  # Input Parameters
    'mapred.bq.project.id': project,
    'mapred.bq.gcs.bucket': bucket,
    'mapred.bq.temp.gcs.path': input_directory,
    'mapred.bq.input.project.id': 'my-project',
    'mapred.bq.input.dataset.id': 'tempData',
    'mapred.bq.input.table.id': 'userFeatureInBQ'}

# Load data in from BigQuery.
table_data = sc.newAPIHadoopRDD(
 'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
 'org.apache.hadoop.io.LongWritable',
 'com.google.gson.JsonObject',conf=conf)

# Transform the userid-feature table into the feature_data RDD
feature_data = (
    table_data
    .map(lambda (_, record): json.loads(record))
    .map(lambda x: (x['x0'], x['x1'], x['x2'], x['x3'], x['x4'],
                    x['x5'], x['x6'], x['x7'], x['x8'],
                    x['x9'], x['x10'])))

# Function to convert each line of the RDD into an array and return a vector
def parseVector(values):
    array = np.array([float(v) for v in values])
    return _convert_to_vector(array)

# Convert the RDD into a row-wise RDD
data = feature_data.map(parseVector)
row_rdd = data.map(lambda x: Row(x))

sqlContext = SQLContext(sc)

# cache the RDD to improve performance
row_rdd.cache()

# Create a Dataframe
df = sqlContext.createDataFrame(row_rdd, ["features"])

# cache the Dataframe
df.cache()

Here are the schema and head() that I printed to the console:

|-- features: vector (nullable = true)
[Row(features=DenseVector([0.01,0,0,0,0,0,0,0.06,0.09,0.001]))]
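(For reference, output like the above would presumably come from something along these lines, given the df built earlier:)

df.printSchema()
print df.head(1)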

  4. Run the KMeans clustering algorithm as follows:
    • Run the model multiple times
    • Use different parameters (i.e. vary the number of clusters and the init_mode)
    • Compute an error or cost metric
    • Select the best combination of model parameters
    • Create a Pipeline with KMeans as the estimator
    • Pass the multiple parameters via a paramMap
#Define the paramMap & model
paramMap = ({'k':3,'initMode':'kmeans||'},{'k':3,'initMode':'random'},
  {'k':4,'initMode':'kmeans||'},{'k':4,'initMode':'random'},
  {'k':5,'initMode':'kmeans||'},{'k':5,'initMode':'random'},
  {'k':6,'initMode':'kmeans||'},{'k':6,'initMode':'random'},
  {'k':7,'initMode':'kmeans||'},{'k':7,'initMode':'random'},
  {'k':8,'initMode':'kmeans||'},{'k':8,'initMode':'random'},
  {'k':9,'initMode':'kmeans||'},{'k':9,'initMode':'random'},
  {'k':10,'initMode':'kmeans||'},{'k':10,'initMode':'random'})

km = KMeans()

# Create a Pipeline with the estimator stage
pipeline = Pipeline(stages=[km])

# Call & fit the pipeline with the paramMap
models = pipeline.fit(df, paramMap)
print models

I get the following output, along with a warning:

7:03:24 WARN org.apache.spark.mllib.clustering.KMeans: The input data was not directly cached, which may hurt performance if its parent RDDs are also uncached.

[PipelineModel_443dbf939b7bd3bf7bfc, PipelineModel_4b64bb761f4efe51da50, PipelineModel_4f858411ac19beacc1a4, PipelineModel_4f58b894f1d14d79b936, PipelineModel_4b8194f7a5e6be6eaf33, PipelineModel_4fc5b6370bff1b4d7dba, PipelineModel_43e0a196f16cfd3dae57, PipelineModel_47318a54000b6826b20e, PipelineModel_411bbe1c32db6bf0a92b, PipelineModel_421ea1364d8c4c9968c8, PipelineModel_4acf9cdbfda184b00328, PipelineModel_42d1a0c61c5e45cdb3cd, PipelineModel_4f0db3c394bcc2bb9352, PipelineModel_441697f2748328de251c, PipelineModel_4a64ae517d270a1e0d5a, PipelineModel_4372bc8db92b184c05b0]


#Print the cluster centers:
for model in models:
    print vars(model)
    print model.stages[0].clusterCenters()
    print model.extractParamMap()

Output:

[array([7.64676638e-07, 3.58531391e-01, 1.68879698e-03, 0.00000000e+00, 1.53477043e-02, 1.25822915e-02, 0.00000000e+00, 6.93060772e-07, 1.41766847e-03, 1.60941306e-02]), array([2.36494105e-06, 1.87719732e-02, 3.73829379e-03, 0.00000000e+00, 4.20724542e-02, 2.28675684e-02, 0.00000000e+00, 5.45002249e-06, 1.17331153e-02, 1.24364600e-02])]


Here is the list of questions I need help with:

  • I get a list in which every model's array contains only 2 cluster centers:
    • When I try to access the pipeline, the KMeans model seems to default to k=2. Why is that happening?
    • Should the last loop access the pipelineModel and stage 0, and run the clusterCenters() method? Is that the right way to do it?
    • Why do I get the message that the data was not cached?
  • When using a Pipeline, I could not find how to compute the WSSSE or any equivalent of .computeCost() (as in mllib). How do I compare the different models based on their different parameters?
  • I tried the following code to run the .computeCost method defined in the source code here:
    • This defeats the purpose of using the Pipeline to run KMeans models and model selection in parallel, but I tried the following code:
#computeError
def computeCost(model, rdd):
    """Return the K-means cost (sum of squared distances of
    points to their nearest center) for this model on the given data."""
    cost = callMLlibFunc("computeCostKmeansModel",
                         rdd.map(_convert_to_vector),
                         [_convert_to_vector(c) for c in model.clusterCenters()])
    return cost

cost= np.zeros(len(paramMap))

for i in range(len(paramMap)):
    cost[i] = cost[i] + computeCost(model[i].stages[0], feature_data)
print cost

This prints the following at the end of the loop:

[ 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687]

  • Is the cost/error computed for every model the same? Once again, I am unable to access the pipelineModel with the correct parameters.

Any help/guidance is much appreciated! Thanks!

Best Answer

Your parameters are not defined correctly. A param map should map from specific Param instances to values, not from arbitrary names. You get k equal to 2 because the parameters you pass are never used, so every model falls back to exactly the same default settings.

Let's start with example data:

import numpy as np
from pyspark.mllib.linalg import Vectors

df = (sc.textFile("data/mllib/kmeans_data.txt")
  .map(lambda s: Vectors.dense(np.fromstring(s, dtype=np.float64, sep=" ")))
  .zipWithIndex()
  .toDF(["features", "id"]))

and a Pipeline:

from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline

km = KMeans()

pipeline = Pipeline(stages=[km])

As mentioned above, the param map should use specific Param objects as its keys. For example:

params = [
    {km.k: 2, km.initMode: "k-means||"},
    {km.k: 3, km.initMode: "k-means||"},
    {km.k: 4, km.initMode: "k-means||"}
]

models = pipeline.fit(df, params=params)

assert [len(m.stages[0].clusterCenters()) for m in models] == [2, 3, 4]

A few notes:

  • The correct initMode for K-means|| is k-means||, not kmeans||.
  • Using a param map in a Pipeline does not mean the models are trained in parallel. Spark parallelizes the training process over the data, not over the parameters. It is nothing more than a convenience method.
  • You get the warning about uncached data because the actual input to K-Means is not the DataFrame but the transformed RDD.
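As for the part of the question about comparing the fitted models by WSSSE: the following is only a sketch, assuming Spark 2.0 or later, where the fitted ml KMeansModel exposes a computeCost(dataset) method; on older releases you would have to fall back to an mllib-based helper like the one in the question. It reuses df, params, and models from above:

# Sketch only (assumes Spark 2.0+ ml KMeansModel.computeCost):
# score every fitted PipelineModel by its K-means cost (WSSSE)
# on the same DataFrame and pick the cheapest parameter combination.
costs = [m.stages[0].computeCost(df) for m in models]

best_idx = min(range(len(costs)), key=lambda i: costs[i])
print(costs)
print(params[best_idx])   # the param map that produced the best model

On Spark 2.3+, pyspark.ml.evaluation.ClusteringEvaluator (which scores clusterings by silhouette rather than WSSSE) is the recommended alternative, since computeCost was later deprecated.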

Regarding python - How do I pass parameters to the ML Pipeline.fit method?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/35253990/
