Getting Started with PySpark (1)
- Import the PySpark packages
- Initialize PySpark
- Correlation analysis and data preprocessing
- Decision tree model analysis with MLlib
- Easy to understand and highly readable: the intermediate steps of feature selection and sample prediction can be inspected directly.
- Modest data requirements: decision trees handle both discrete and continuous features and do not require the data to be standardized.
- Prediction accuracy can be improved by pruning or by limiting tree depth, and decision trees can also serve as weak classifiers that are ensembled into a strong classifier (e.g. a random forest).
- A decision tree is a predictive model that maps observed feature values to class labels. The model is built top-down: the data is split into subsets according to a statistical criterion such as the Gini index or information gain (sketched right below), and a branch is terminated when a predefined stopping criterion is met.
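As a minimal sketch of the Gini splitting criterion mentioned above, in plain Python with a toy label list invented purely for illustration:

from collections import Counter

def gini(labels):
    # Gini impurity: 1 minus the sum of squared class proportions
    n = len(labels)
    if n == 0:
        return 0.0
    return 1.0 - sum((c / n) ** 2 for c in Counter(labels).values())

def gini_gain(parent, left, right):
    # impurity reduction achieved by splitting `parent` into `left` and `right`
    n = len(parent)
    weighted = (len(left) / n) * gini(left) + (len(right) / n) * gini(right)
    return gini(parent) - weighted

# toy labels: 1 = churn, 0 = retained
parent = [1, 1, 0, 0, 0, 0]
print(gini_gain(parent, left=[1, 1, 0], right=[0, 0, 0]))  # the larger the gain, the better the split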
Below, pySpark is used to implement decision tree classification; stratified sampling is applied to handle the class imbalance, and k-fold cross-validation is used to tune the tree-depth parameter and obtain the best decision tree.
A random forest is a forest made up of many decision trees, and the classification result is obtained by a vote among those trees. Randomness is injected in both the row and the column direction while each tree is grown: in the row direction, the training data for a tree is drawn with replacement (bootstrapping); in the column direction, a feature subset is drawn by sampling without replacement, and the optimal split point is chosen from that subset. As Figure 3 shows, K rounds of training yield K different decision trees {T1, T2, ..., TK}, which are then combined into one classification system. A random forest is therefore an ensemble model that is still based on decision trees internally; unlike a single decision tree, it classifies by the vote of many trees, which makes the algorithm much less prone to overfitting.
To lock onto users with churn tendency even earlier, we compared historical data and analysed usage behaviour, and decided to define as churned any user who has not paid within 10 days after the payment due date. Drawing on experience from churn-prediction projects built with traditional data mining, and considering the processing capacity of the big-data platform, we label as churners the users who leave the network within a window of 3 consecutive months, which enlarges the churn sample and improves prediction accuracy; and we label the target users who churn after a one-month gap, which reserves a one-month intervention window in which retention actions can respond to the predictions. As shown in Figure 4, seven consecutive months of history are used: the month N-6 data is labelled against the 3 consecutive months that follow the one-month gap (N-4, N-3, N-2), and the union of the positive and negative samples of months N-6, N-5 and N-4 is taken. This addresses the shortage of negative samples and the overly short intervention window of the traditional labelling approach (a hedged code sketch of this labelling follows).
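A minimal pyspark sketch of this labelling scheme is given below. Everything in it is hypothetical: the table user_month_fact, the columns user_id / stat_month / churn_flag and the placeholder month strings stand in for whatever the platform actually stores, and the shifted churn windows for months N-5 and N-4 simply extrapolate the pattern described for N-6.

from pyspark.sql import functions as F

# hypothetical monthly fact table: one row per user per month, with a churn flag
fact = spark.table("user_month_fact")

def label_month(base_month, churn_months):
    # users observed in base_month are positive if they churn in any month of churn_months
    base = fact.filter(F.col("stat_month") == base_month).select("user_id", "stat_month")
    churned = (fact.filter(F.col("stat_month").isin(churn_months) & (F.col("churn_flag") == 1))
                   .select("user_id").distinct()
                   .withColumn("label", F.lit(1)))
    return base.join(churned, "user_id", "left").fillna({"label": 0})

# union of the three labelled base months, each with a 3-month churn window after a one-month gap
samples = (label_month("N-6", ["N-4", "N-3", "N-2"])
           .unionByName(label_month("N-5", ["N-3", "N-2", "N-1"]))
           .unionByName(label_month("N-4", ["N-2", "N-1", "N"])))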
3.3.2 Data Feature Extraction
Based on business experience, we select every attribute that may correlate with user churn, review the data, and keep the feature attributes with stronger correlation. The modelling features are mainly B-domain attributes of user communication and spending behaviour, derived attributes (aggregates, ratios, trends and fluctuations) and mined attributes, supplemented with O-domain samples such as internet-usage behaviour and terminal attribute indicators (handset changes, application preferences, call-drop rate, internet protocol response success rate, etc.). As listed in Table 1, the data dimensions include basic information, communication behaviour, billing, changes in spending behaviour, social circle, off-net calling, complaints, internet-usage trajectory, call-drop rate and so on. These dimension tables are merged and aggregated into a wide data-mining feature table used for model training and validation.
3.4 Model Building
Building the churn prediction model involves five parts: raw-data processing, wide feature table construction, model training, model evaluation and model tuning. As shown in Figure 5, the smart operations platform obtains the BSS (Business Support System) and OSS (Operations Support System) data needed for modelling through interfaces to the network-wide data. The BSS is the main IT component through which an operator delivers services to its users, while the OSS is the main system a telecom service provider uses to manage its communication network. BSS data includes CRM (customer relationship), Billing (billing data), call detail records and complaint data; OSS data includes packet-switched data (Packet Switched, PS), measurement reports (Measurement Report, MR) and circuit-switched data (Circuit Switched, CS). PS data describes how users connect to the network, such as internet speed, drop rate and mobile-search text; MR data can be used to locate users and recover their movement trajectories; CS data describes call quality, such as the call-drop rate.
We store the collected raw data in the Hadoop Distributed File System (HDFS) and then use Hive for feature generation and processing. HDFS can handle extremely large, PB-scale files, while Hive provides a simple SQL interface and translates SQL statements into MapReduce jobs that run in a distributed fashion.
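For illustration only, the wide feature table could be assembled with Spark SQL over Hive tables along the lines below; every table and column name here (crm_user_info, billing_month, complaint_stat, user_feature_wide) is a made-up placeholder for the real warehouse schema.

# join the per-dimension Hive tables on user_id into one wide feature table
spark.sql("""
    CREATE TABLE IF NOT EXISTS user_feature_wide AS
    SELECT  c.user_id,
            c.age,
            c.open_date,
            b.month_fee,
            b.months_3avg,
            p.complaint_cnt,
            c.remove_tag
    FROM crm_user_info c
    LEFT JOIN billing_month b  ON c.user_id = b.user_id
    LEFT JOIN complaint_stat p ON c.user_id = p.user_id
""")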
Once the wide feature table has been generated, we use Spark's efficient computation and train the churn prediction model in SmartMiner with the random forest algorithm. After repeatedly validating and evaluating the training results, we build the model with a random forest of 200 trees, the SQR sampling method, a maximum tree depth of 15, a minimum of 100 samples per leaf and a maximum of 32 bins (an equivalent pyspark.ml configuration is sketched after this paragraph). Applying the trained classifier to live network data predicts which users are inclined to churn within the next 3 months; ranking them by churn propensity pinpoints the target customers for retention.
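SmartMiner itself is not shown in this article, so the snippet below is only a sketch of how an equivalent configuration might look with pyspark.ml's RandomForestClassifier; reading "SQR sampling" as the sqrt feature-subset strategy is my assumption, and train_df / live_df are placeholder DataFrames with label and features columns.

from pyspark.ml.classification import RandomForestClassifier

# roughly equivalent pyspark.ml configuration; SmartMiner's own parameter names may differ
rf = RandomForestClassifier(labelCol="label", featuresCol="features",
                            numTrees=200,                  # 200 trees
                            featureSubsetStrategy="sqrt",  # assumed meaning of "SQR sampling"
                            maxDepth=15,                   # maximum tree depth
                            minInstancesPerNode=100,       # minimum samples per leaf
                            maxBins=32,                    # maximum number of bins
                            seed=23)
# rf_model = rf.fit(train_df)            # train_df: assembled (label, features) DataFrame
# scored = rf_model.transform(live_df)   # rank users by the predicted churn probability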
3.5 Model Evaluation
Whether the trained model is good can be verified against historical churn data. The usual evaluation metrics are precision and coverage; the higher the precision and the larger the coverage, the better the model, where: precision = number of correctly predicted churners / number of users predicted as churners; coverage = number of correctly predicted churners / number of actual churners.
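As a quick illustration of these two ratios only, with counts invented for this sketch (none of the numbers come from the article):

# invented counts from a hypothetical evaluation run
predicted_churn = 1000   # users the model flagged as churners
actual_churn = 1200      # users who actually churned
correct = 700            # flagged users who actually churned

precision = correct / predicted_churn   # 0.70
coverage = correct / actual_churn       # ~0.58
print('precision =', precision, 'coverage =', coverage)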
import numpy as np
from pyspark.sql import SparkSession, SQLContext
from pyspark.ml.feature import VectorAssembler, StandardScaler

spark = SparkSession.builder.master("local[2]").appName("Linear-Regression-California-Housing").getOrCreate()
# set a random seed for notebook reproducibility
rnd_seed = 23
np.random.seed(rnd_seed)
HOUSING_DATA = './训练集.csv'
sc = spark.sparkContext
sc
sqlContext = SQLContext(spark.sparkContext)
sqlContext
from pyspark.sql.types import *
# define the schema, corresponding to a line in the csv data file.
schema = StructType([
StructField("USER_ID", StringType(), nullable=True),
StructField("FLOW", DecimalType(), nullable=True),
StructField("FLOW_LAST_ONE", DecimalType(), nullable=True),
StructField("FLOW_LAST_TWO", DecimalType(), nullable=True),
StructField("MONTH_FEE", DecimalType(), nullable=True),
StructField("MONTHS_3AVG", DecimalType(), nullable=True),
StructField("BINDEXP_DATE", DateType(), nullable=True),
StructField("PHONE_CHANGE", IntegerType(), nullable=True),
StructField("AGE", IntegerType(), nullable=True),
StructField("OPEN_DATE", DateType(), nullable=True),
StructField("REMOVE_TAG", IntegerType(), nullable=True),
]
)
# Load housing data
housing_df=spark.read.csv(path=HOUSING_DATA,schema=schema).cache()
housing_df.show(1)
# run a sample selection
housing_df.select('USER_ID','MONTH_FEE').show(10)
# group by housingmedianage and see the distribution
result_df = housing_df.groupBy("MONTH_FEE").count().sort("MONTH_FEE", ascending=False)
result_df.show(10)
result_df.toPandas().plot.bar(x='MONTH_FEE',figsize=(20, 10))
from pyspark.sql.functions import mean, min, max
result=housing_df.select([min("USER_ID"),max("FLOW"),max("FLOW_LAST_ONE"),max("MONTH_FEE"),max("MONTHS_3AVG")])
result.show()
housing_df = housing_df.select("USER_ID",
"BINDEXP_DATE",
"OPEN_DATE",
"MONTHS_3AVG",
"REMOVE_TAG")
featureCols = ["USER_ID", "BINDEXP_DATE", "OPEN_DATE", "MONTHS_3AVG", "REMOVE_TAG"]
# put features into a feature vector column
assembler = VectorAssembler(inputCols=featureCols, outputCol="features")
# Initialize the `standardScaler`
standardScaler=StandardScaler(inputCol="features",outputCol="features_scaled")
assembled_df = assembler.transform(housing_df)
assembled_df.show(10, truncate=False)
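Note that VectorAssembler only accepts numeric (or vector) input columns, so assembling USER_ID (a string) and the two date columns listed in featureCols above would raise an error; those columns would first have to be dropped or converted. The sketch below works under that assumption: it turns OPEN_DATE into a numeric tenure with F.datediff, assembles only numeric columns, and actually fits the StandardScaler that was only initialized above (the new variable names are mine).

from pyspark.sql import functions as F

# convert the activation date into a numeric tenure (days) and keep only numeric feature columns;
# REMOVE_TAG is kept as the label rather than assembled as a feature
numeric_df = (housing_df
              .withColumn("TENURE_DAYS", F.datediff(F.current_date(), F.col("OPEN_DATE")))
              .select("MONTHS_3AVG", "TENURE_DAYS", "REMOVE_TAG"))

num_assembler = VectorAssembler(inputCols=["MONTHS_3AVG", "TENURE_DAYS"], outputCol="features")
assembled_num_df = num_assembler.transform(numeric_df)

# fit and apply the scaler defined above (inputCol="features", outputCol="features_scaled")
scaled_df = standardScaler.fit(assembled_num_df).transform(assembled_num_df)
scaled_df.select("features", "features_scaled").show(5, truncate=False)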
In [2]:
import os
import pandas as pd
import numpy as np
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col
from pyspark.ml.regression import LinearRegression
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator
In [3]:
import seaborn as sns
import matplotlib.pyplot as plt
In [4]:
# Visualization
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
pd.set_option('display.max_columns', 200)
pd.set_option('display.max_colwidth', 400)
from matplotlib import rcParams
sns.set(context='notebook', style='whitegrid', rc={'figure.figsize': (18,4)})
rcParams['figure.figsize'] = 18,4
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
In [5]:
# setting random seed for notebook reproducability
rnd_seed=23
np.random.seed=rnd_seed
np.random.set_state=rnd_seed
2. Creating the Spark Session
In [6]:
spark = SparkSession.builder.master("local[2]").appName("Linear-Regression-California-Housing").getOrCreate()
In [7]:
spark
In [8]:
sc = spark.sparkContext
sc
In [9]:
sqlContext = SQLContext(spark.sparkContext)
sqlContext
Out[9]:
<pyspark.sql.context.SQLContext at 0x7f0cc4ebb080>
3. Load The Data From a File Into a Dataframe
In [10]:
HOUSING_DATA = '../input/cal_housing.data'
Specifying the schema when loading data into a DataFrame will give better performance than schema inference.
In [11]:
# define the schema, corresponding to a line in the csv data file.
schema = StructType([
StructField("long", FloatType(), nullable=True),
StructField("lat", FloatType(), nullable=True),
StructField("medage", FloatType(), nullable=True),
StructField("totrooms", FloatType(), nullable=True),
StructField("totbdrms", FloatType(), nullable=True),
StructField("pop", FloatType(), nullable=True),
StructField("houshlds", FloatType(), nullable=True),
StructField("medinc", FloatType(), nullable=True),
StructField("medhv", FloatType(), nullable=True)]
)
In [12]:
# Load housing data
housing_df = spark.read.csv(path=HOUSING_DATA, schema=schema).cache()
In [13]:
# Inspect first five rows
housing_df.take(5)
In [14]:
# Show first five rows
housing_df.show(5)
In [15]:
# show the dataframe columns
housing_df.columns
In [16]:
# show the schema of the dataframe
housing_df.printSchema()
In [17]:
# run a sample selection
housing_df.select('pop','totbdrms').show(10)
4.1 Distribution of the median age of the people living in the area:
In [18]:
# group by housingmedianage and see the distribution
result_df = housing_df.groupBy("medage").count().sort("medage", ascending=False)
In [19]:
result_df.show(10)
In [20]:
result_df.toPandas().plot.bar(x='medage',figsize=(14, 6))
In [21]:
(housing_df.describe().select(
"summary",
F.round("medage", 4).alias("medage"),
F.round("totrooms", 4).alias("totrooms"),
F.round("totbdrms", 4).alias("totbdrms"),
F.round("pop", 4).alias("pop"),
F.round("houshlds", 4).alias("houshlds"),
F.round("medinc", 4).alias("medinc"),
F.round("medhv", 4).alias("medhv"))
.show())
In [22]:
# Adjust the values of `medianHouseValue`
housing_df = housing_df.withColumn("medhv", col("medhv")/100000)
In [23]:
# Show the first 2 lines of `df`
housing_df.show(2)
We can clearly see that the values have been adjusted correctly when we look at the result of the show() method:
6. Feature Engineering
Now that we have adjusted the values in medianHouseValue, we will add the following columns to the data set:
- Rooms per household which refers to the number of rooms in households per block group;
- Population per household, which basically gives us an indication of how many people live in households per block group; And
- Bedrooms per room which will give us an idea about how many rooms are bedrooms per block group;
As we're working with DataFrames, we can best use the select() method to select the columns that we're going to be working with, namely totalRooms, households, and population. Additionally, we have to indicate that we're working with columns by adding the col() function to our code. Otherwise, we won't be able to do element-wise operations like the division that we have in mind for these three variables:
In [24]:
housing_df.columns
In [25]:
# Add the new columns to `df`
housing_df = (housing_df.withColumn("rmsperhh", F.round(col("totrooms")/col("houshlds"), 2))
.withColumn("popperhh", F.round(col("pop")/col("houshlds"), 2))
.withColumn("bdrmsperrm", F.round(col("totbdrms")/col("totrooms"), 2)))
In [26]:
# Inspect the result
housing_df.show(5)
We can see that, for the first row, there are about 6.98 rooms per household, the households in the block group consist of about 2.5 people, and the bedrooms-per-room ratio is quite low at 0.14:
Since we don't want to necessarily standardize our target values, we'll want to make sure to isolate those in our data set. Note also that this is the time to leave out variables that we might not want to consider in our analysis. In this case, let's leave out variables such as longitude, latitude, housingMedianAge and totalRooms.
In this case, we will use the select() method and pass the column names in the order that is more appropriate. The target variable medianHouseValue is put first, so that it won't be affected by the standardization.
In [27]:
# Re-order and select columns
housing_df = housing_df.select("medhv",
"totbdrms",
"pop",
"houshlds",
"medinc",
"rmsperhh",
"popperhh",
"bdrmsperrm")
Drop the State and Area Code fields, which are not useful for modelling.
Some fields are clearly highly correlated (in the scatter-plot matrix this shows up as the points in the cells on either side of the diagonal lining up along a directed segment, with a positive slope meaning positive correlation and a negative slope negative correlation), for example Total day minutes and Total day charge. Such correlated fields are of no use to the model, so it is enough to drop one field from each pair in advance.
On the other hand, the model requires all categorical data to be converted to numbers. We do this with a user-defined function that maps Yes/True and No/False to 1 and 0 respectively.
from pyspark.sql.functions import UserDefinedFunction   # in newer Spark versions, pyspark.sql.functions.udf is the equivalent
from pyspark.sql.types import DoubleType

# CV_data and final_test_data are assumed to have been loaded from the churn CSV files earlier
binary_map = {'Yes': 1.0, 'No': 0.0, 'True': 1.0, 'False': 0.0}
toNum = UserDefinedFunction(lambda k: binary_map[k], DoubleType())
CV_data = CV_data.drop('State').drop('Area code') \
.drop('Total day charge').drop('Total eve charge') \
.drop('Total night charge').drop('Total intl charge') \
.withColumn('Churn', toNum(CV_data['Churn'])) \
.withColumn('International plan', toNum(CV_data['International plan'])) \
.withColumn('Voice mail plan', toNum(CV_data['Voice mail plan'])).cache()
final_test_data = final_test_data.drop('State').drop('Area code') \
.drop('Total day charge').drop('Total eve charge') \
.drop('Total night charge').drop('Total intl charge') \
.withColumn('Churn', toNum(final_test_data['Churn'])) \
.withColumn('International plan', toNum(final_test_data['International plan'])) \
.withColumn('Voice mail plan', toNum(final_test_data['Voice mail plan'])).cache()
pd.DataFrame(CV_data.take(5), columns=CV_data.columns).transpose()
Model Training
MLlib classifiers and regressors require the dataset to be stored as rows of LabeledPoint, i.e. a row label plus a feature list. The custom labelData() function below shows this row-wise conversion; it turns CV_data into a dataset in the required format, which is then split into a training set and a test set. To keep the model simple, we first train a shallow decision tree classifier with a maximum depth of 2 on the training data.
"""训练模型"""
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import DecisionTree
def labelData(data):
# label: row[end], features: row[0:end-1]
return data.map(lambda row: LabeledPoint(row[-1], row[:-1]))
training_data, testing_data = labelData(CV_data).randomSplit([0.8, 0.2])
model = DecisionTree.trainClassifier(training_data, numClasses=2, maxDepth=2,
categoricalFeaturesInfo={1:2, 2:2},
impurity='gini', maxBins=32)
print(model.toDebugString())
print('Feature 12:', CV_data.columns[12])
print('Feature 4: ', CV_data.columns[4])
Model Performance Evaluation
The churn predictions on the test data consist of the model's predictions together with the actual labels, and we use MLlib's MulticlassMetrics() for evaluation. It takes (prediction, label) pairs as input and outputs evaluation results such as precision, recall, F1 score and the confusion matrix.
"""模型性能评估"""
from pyspark.mllib.evaluation import MulticlassMetrics
def getPredictionsLabels(model, test_data):
    predictions = model.predict(test_data.map(lambda r: r.features))
    return predictions.zip(test_data.map(lambda r: r.label))

def printMetrics(predictions_and_labels):
    metrics = MulticlassMetrics(predictions_and_labels)
    print('Precision of True ', metrics.precision(1))
    print('Precision of False', metrics.precision(0))
    print('Recall of True    ', metrics.recall(1))
    print('Recall of False   ', metrics.recall(0))
    print('F-1 Score         ', metrics.fMeasure())
    print('Confusion Matrix\n', metrics.confusionMatrix().toArray())
predictions_and_labels = getPredictionsLabels(model, testing_data)
printMetrics(predictions_and_labels)
The overall accuracy and F-1 score look fine, but one problem is the skew in recall: recall (coverage) for the non-churn samples is high, while recall for the true churn samples is low. What the business decision cares about is retaining the customers most likely to leave, not the ones likely to stay, so we need to make sure the model is accurate on the genuine churn samples.
One cause of this model bias is the skew in the sample data: churn samples are far fewer than retained customers. Let's look at the counts grouped by churn status:
"""数据中是否流失样本数的对比"""
CV_data.groupby('Churn').count().toPandas()
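The stratified dataset stratified_CV_data that the model-selection code below expects is not built in the snippets shown here. One way to construct it is to keep every churn sample and downsample the majority class with sampleBy, as in this sketch (the 0.2 fraction and the seed are illustrative assumptions, not values from the article):

# per-class sampling: keep ~20% of non-churn rows (label 0.0) and all churn rows (label 1.0)
stratified_CV_data = CV_data.sampleBy('Churn', fractions={0.0: 0.2, 1.0: 1.0}, seed=12345)
stratified_CV_data.groupby('Churn').count().toPandas()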
To determine which parameter value produces the best model, we need a systematic way of quantifying model performance and of making the result reliable. Model selection is usually done with cross-validation, most commonly k-fold cross-validation. Its basic idea is to split the dataset randomly into k parts, use one part as the test set and the rest as the training set, and repeat this k times. After k models have been produced from the training/test splits, the average performance score is taken as the overall score.
For model selection we sweep the model parameters and compare their cross-validation performance; the parameter value with the best performance is the one we want.
The ML package supports k-fold cross-validation and can be coupled with a parameter grid builder and an evaluator to build a model-selection workflow. Below we train the model with a transformation/estimation pipeline; the cross-validator uses ParamGridBuilder to sweep the configured values of the decision tree's maxDepth parameter, uses the F1 score as the evaluation metric, and repeats the procedure over 3 folds to obtain a reliable result.
"""模型选取"""
from pyspark.mllib.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
def vectorizeData(data):
return data.map(lambda r: [r[-1], Vectors.dense(r[:-1])]).toDF(['label','features'])
vectorized_CV_data = vectorizeData(stratified_CV_data)
# Index labels, adding metadata to the label column
labelIndexer = StringIndexer(inputCol='label',
outputCol='indexedLabel').fit(vectorized_CV_data)
# Automatically identify categorical features and index them
featureIndexer = VectorIndexer(inputCol='features',
outputCol='indexedFeatures',
maxCategories=2).fit(vectorized_CV_data)
# Train a DecisionTree model
dTree = DecisionTreeClassifier(labelCol='indexedLabel', featuresCol='indexedFeatures')
# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dTree])
# Search through decision tree's maxDepth parameter for best model
paramGrid = ParamGridBuilder().addGrid(dTree.maxDepth, [2,3,4,5,6,7]).build()
# Set F-1 score as evaluation metric for best model selection
evaluator = MulticlassClassificationEvaluator(labelCol='indexedLabel',
predictionCol='prediction', metricName='f1')
# Set up 3-fold cross validation
crossval = CrossValidator(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=evaluator,
numFolds=3)
CV_model = crossval.fit(vectorized_CV_data)
# Fetch best model
tree_model = CV_model.bestModel.stages[2]
print(tree_model)
We find that the decision tree with the best cross-validation performance has a depth of 5, which means that the initial depth-2 tree was not complex enough, while trees deeper than 5 overfit and therefore perform worse on the test data.
The model's real-world performance can be checked with the final_test_data dataset. After we transform that dataset, the evaluator reports the F1 score of the predictions, and we can also print out their probabilities. Later on, CV_model.transform() can be used in the same way to score new data.
"""预测以及模型性能评估"""
vectorized_test_data = vectorizeData(final_test_data)
transformed_data = CV_model.transform(vectorized_test_data)
print(evaluator.getMetricName(), 'score:', evaluator.evaluate(transformed_data))
predictions = transformed_data.select('indexedLabel', 'prediction', 'probability')
predictions.toPandas().head()