string - Random Forest Regression for Categorical Inputs on PySpark

Tags: string machine-learning pyspark one-hot-encoding

I have been trying to build a simple random forest regression model on PySpark. I have solid machine-learning experience in R. However, ML on PySpark seems entirely different to me, especially when it comes to handling categorical variables, string indexing, and OneHotEncoding (when there were only numeric variables, I was able to perform RF regression just by following examples). While there are many examples available for handling categorical variables, such as this and this, I have had no success with any of them, as most of them went over my head (probably because of my unfamiliarity with Python ML). I will be grateful to anyone who can help fix this.

Here is my attempt (input file is here):

from pyspark.mllib.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.types import Row
from pyspark.sql.functions import col, round
# load the data with the spark-csv package, inferring column types
train = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferSchema='true').load('filename.csv')
train.cache()
train.dtypes

The output is:

DataFrame[ID: int, Country: string, Carrier: double, TrafficType: string, ClickDate: timestamp, Device: string, Browser: string, OS: string, RefererUrl: string, UserIp: string, ConversionStatus: string, ConversionDate: string, ConversionPayOut: string, publisherId: string, subPublisherId: string, advertiserCampaignId: double, Fraud: double]
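
(A side note: with Spark 2.0+, the built-in CSV reader can replace the external spark-csv package. A minimal equivalent, assuming a SparkSession named spark:)

# Spark 2.0+ ships a native CSV reader, so the databricks package is optional
train = spark.read.csv('filename.csv', header=True, inferSchema=True)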

Next, I select the variables of interest:

# keep only the columns of interest and fill missing values
IMP = ["Country","Carrier","TrafficType","Device","Browser","OS","Fraud","ConversionPayOut"]
train = train.fillna("XXX")
train = train.select([column for column in train.columns if column in IMP])

# cast the target column from string to double
from pyspark.sql.types import DoubleType
train = train.withColumn("ConversionPayOut", train["ConversionPayOut"].cast(DoubleType()))
train.cache()

The output is:

DataFrame[Country: string, Carrier: double, TrafficType: string, Device: string, Browser: string, OS: string, ConversionPayOut: double, Fraud: double]

My dependent variable is ConversionPayOut, which was a string and has now been cast to double.
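
(As a quick sanity check — a minimal sketch, nothing specific to this dataset — the cast can be verified with printSchema and describe:)

# verify the target column is now numeric
train.printSchema()
train.select("ConversionPayOut").describe().show()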

This is where my confusion begins: based on this post, I understand that I have to convert the categorical string-type variables into one-hot encoded vectors. Here is my attempt:

First, the string indexing:


from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

# index every categorical string column (everything except the numeric ones)
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(train) for column in list(set(train.columns)-set(['Carrier','ConversionPayOut','Fraud']))]
pipeline = Pipeline(stages=indexers)
train_catind = pipeline.fit(train).transform(train)
train_catind.show()


The output of the string indexing:


+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+
|Country|Carrier|TrafficType| Device|       Browser|     OS|  ConversionPayOut|Fraud|TrafficType_index|Country_index|Browser_index|OS_index|Device_index|
+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+
|     TH|   20.0|          A|   Lava|        chrome|Android|              41.6|  0.0|              0.0|          1.0|          0.0|     0.0|         7.0|
|     BR|  217.0|          A|     LG|        chrome|Android|        26.2680574|  0.0|              0.0|          2.0|          0.0|     0.0|         5.0|
|     TH|   20.0|          A|Generic|        chrome|Android|              41.6|  0.0|              0.0|          1.0|          0.0|     0.0|         0.0|


Next, I think, I have to do the OneHotEncoding of the string indexes:


from pyspark.ml.feature import OneHotEncoder, StringIndexer

# one-hot encode every *_index column produced above
indexers_ON = [OneHotEncoder(inputCol=column, outputCol=column+"_Vec") for column in filter(lambda x: x.endswith('_index'), train_catind.columns)]
pipeline = Pipeline(stages=indexers_ON)
train_OHE = pipeline.fit(train_catind).transform(train_catind)
train_OHE.show()


The output after one-hot encoding looks like this:


+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+---------------------+-----------------+-----------------+-------------+----------------+
|Country|Carrier|TrafficType| Device|       Browser|     OS|  ConversionPayOut|Fraud|TrafficType_index|Country_index|Browser_index|OS_index|Device_index|TrafficType_index_Vec|Country_index_Vec|Browser_index_Vec| OS_index_Vec|Device_index_Vec|
+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+---------------------+-----------------+-----------------+-------------+----------------+
|     TH|   20.0|          A|   Lava|        chrome|Android|              41.6|  0.0|              0.0|          1.0|          0.0|     0.0|         7.0|        (1,[0],[1.0])|    (9,[1],[1.0])|    (5,[0],[1.0])|(1,[0],[1.0])|  (15,[7],[1.0])|
|     BR|  217.0|          A|     LG|        chrome|Android|        26.2680574|  0.0|              0.0|          2.0|          0.0|     0.0|         5.0|        (1,[0],[1.0])|    (9,[2],[1.0])|    (5,[0],[1.0])|(1,[0],[1.0])|  (15,[5],[1.0])|
|     TH|   20.0|          A|Generic|        chrome|Android|              41.6|  0.0|              0.0|          1.0|          0.0|     0.0|         0.0|        (1,[0],[1.0])|    (9,[1],[1.0])|    (5,[0],[1.0])|(1,[0],[1.0])|  (15,[0],[1.0])|

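(A side note: the transformer-style OneHotEncoder(inputCol=..., outputCol=...) used above is the Spark 2.x API. From Spark 3.0 on, OneHotEncoder is an estimator that accepts whole column lists, so the loop collapses into a single stage. A sketch, assuming Spark 3.x:)

from pyspark.ml.feature import OneHotEncoder

index_cols = [c for c in train_catind.columns if c.endswith("_index")]

# Spark 3.x: a single estimator handles all columns at once
encoder = OneHotEncoder(inputCols=index_cols,
                        outputCols=[c + "_Vec" for c in index_cols])
train_OHE = encoder.fit(train_catind).transform(train_catind)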

I don't know how to proceed from here. In fact, I don't know which Spark machine-learning packages require this one-hot encoding and which do not.
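
(One thing I have gathered since — a sketch I have not verified end-to-end: the tree-based estimators in spark.ml do not strictly need one-hot encoding, because StringIndexer attaches categorical metadata to its output column that the trees can read, as long as maxBins is at least the largest category count; linear models, by contrast, do need the one-hot vectors. The no-OHE route would look roughly like this:)

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor

# assemble the raw index columns (StringIndexer attaches categorical
# metadata that the tree algorithms can read) plus the numeric columns
index_cols = [c for c in train_catind.columns if c.endswith("_index")]
assembler = VectorAssembler(inputCols=index_cols + ["Carrier", "Fraud"],
                            outputCol="features")

# maxBins must be >= the number of categories of the widest column
rf = RandomForestRegressor(labelCol="ConversionPayOut",
                           featuresCol="features", maxBins=32)

model = Pipeline(stages=[assembler, rf]).fit(train_catind)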

If the StackOverflow community can shed some light on how to move forward, it would be a great learning experience for all PySpark novices.

Best Answer

To run a random forest on the preprocessed data, you can proceed with the following code.

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor

# use VectorAssembler to combine all the feature columns into a single vector column
assemblerInputs = ["Carrier", "Fraud", "Country_index_Vec", "TrafficType_index_Vec", "Device_index_Vec", "Browser_index_Vec", "OS_index_Vec"]
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
pipeline = Pipeline(stages=[assembler])
df = pipeline.fit(train_OHE).transform(train_OHE)
df = df.withColumn("label", train_OHE.ConversionPayOut)

# randomly split the data into training and test sets
(train_data, test_data) = df.randomSplit([0.7, 0.3], seed=111)

# train the random forest model (a regressor, since ConversionPayOut is continuous)
rf = RandomForestRegressor(labelCol="label", featuresCol="features")
rf_model = rf.fit(train_data)

# make predictions on the test data
predictions = rf_model.transform(test_data)
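
(To gauge the fit — a minimal sketch using the built-in RegressionEvaluator, assuming the predictions DataFrame from above:)

from pyspark.ml.evaluation import RegressionEvaluator

# root-mean-squared error on the held-out test set
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction",
                                metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Test RMSE = %g" % rmse)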


Hope this helps!

This post on random forest regression for categorical inputs on PySpark is based on a question from Stack Overflow: https://stackoverflow.com/questions/46372562/
