PySpark Feature Engineering (I) -- Data Preprocessing


A saying circulates widely in the industry: data and features determine the ceiling of machine learning, while models and algorithms merely approach it. Feature engineering thus occupies a central place in machine learning, and in practice it is often the key to success.

Feature engineering is the most time- and effort-consuming part of data analysis. Unlike algorithms and models, it is not a fixed sequence of steps but largely a matter of engineering experience and trade-offs, so there is no single unified method. This post summarizes some commonly used techniques.

Feature engineering comprises sub-problems such as Data Preprocessing, Feature Extraction, Feature Selection, and Feature Construction.

Data Preprocessing

Data preprocessing is the essential first step of feature engineering: features must be preprocessed into a form that machine-learning models can accept.

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml import Estimator, Transformer
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
import pyspark.sql.functions as fn
import pyspark.ml.feature as ft
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
from pyspark.sql import Observation
from pyspark.sql import Window
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from xgboost.spark import SparkXGBClassifier
import xgboost as xgb

import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

import time
import warnings

# Setting configuration.
warnings.filterwarnings('ignore')
SEED = 42

# Use 0.11.4-spark3.3 version for Spark3.3 and 1.0.2 version for Spark3.4
spark = SparkSession.builder \
            .master("local[*]") \
            .appName("XGBoost with PySpark") \
            .config("spark.driver.memory", "10g") \
            .config("spark.driver.cores", "2") \
            .config("spark.executor.memory", "10g") \
            .config("spark.executor.cores", "2") \
            .enableHiveSupport() \
            .getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('ERROR')

Exploratory Data Analysis

This project uses the Home Credit Default Risk dataset from Kaggle, a standard machine-learning classification problem: predict whether a client will default, using historical loan information together with the client's socio-economic and financial information.

This post works mainly with the application table, doing basic data analysis and modeling.

df = spark.sql("select * from home_credit_default_risk.application_train")
df.limit(5).toPandas()                                                                 
[first 5 rows: SK_ID_CURR, TARGET, NAME_CONTRACT_TYPE, CODE_GENDER, FLAG_OWN_CAR, FLAG_OWN_REALTY, CNT_CHILDREN, AMT_INCOME_TOTAL, AMT_CREDIT, AMT_ANNUITY, ... — wide output omitted]

5 rows × 122 columns

print(f"dataset shape: ({df.count()}, {len(df.columns)})")
dataset shape: (307511, 122)
# df.printSchema()

When a dataset has this many columns, a good first step is to look at the distribution of data types and see how many columns there are of each:

# Number of each type of column
dtypes = dict(df.dtypes)
pd.Series(dtypes).value_counts()
double    65
int       41
string    16
Name: count, dtype: int64

Next, look at the summary statistics of the dataset:

df.summary().toPandas()
[summary() output: count, mean, stddev, min, 25%, 50%, 75%, max for each column — wide output omitted]

8 rows × 123 columns

Check the distribution of the target variable:

# `TARGET` is the target variable we are trying to predict (0 or 1):
# 1 = Not Repaid 
# 0 = Repaid

# Check if the data is unbalanced
row = df.select(fn.mean('TARGET').alias('rate')).first()
print(f"percentage of default : {row['rate']:.2%}")
df.groupBy("TARGET").count().show() 
percentage of default : 8.07%                                

+------+------+
|TARGET| count|
+------+------+
|     1| 24825|
|     0|282686|
+------+------+

Data Cleaning

Data cleaning is the process of re-examining and validating data, with the aims of removing duplicate information, correcting errors, and ensuring consistency.

Deduplication

First, drop duplicates by the sample ID formed from one or more feature values.

# `SK_ID_CURR` is the unique id of the row.
df.dropDuplicates(subset=["SK_ID_CURR"]).count() == df.count()
True

Data Type Conversion

dtypes = df.drop("SK_ID_CURR", "TARGET").dtypes

categorical_cols = [k for k, v in dtypes if v == 'string']
numerical_cols = [k for k, v in dtypes if v != 'string']

Sometimes a numeric feature merely labels different categories, and the magnitude of its values has no real meaning, so we convert it to a categorical feature.
This project has no such feature; HOUR_APPR_PROCESS_START serves as an example:

# df = df.withColumn('HOUR_APPR_PROCESS_START', df['HOUR_APPR_PROCESS_START'].astype(str))

Cleaning Erroneous Data

Next, based on business knowledge, and with the help of tools such as (but not limited to) box plots, we find and clean implausible feature values.
During data exploration we noticed that the numbers in the DAYS_BIRTH column (age) are negative, because they are recorded relative to the current loan application. We convert them to positive values and inspect the distribution:

df.select(df['DAYS_BIRTH'] / -365).summary().show()
+-------+-------------------+
|summary|(DAYS_BIRTH / -365)|
+-------+-------------------+
|  count|             307511|
|   mean|  43.93697278587162|
| stddev| 11.956133237768654|
|    min| 20.517808219178082|
|    25%|  34.00547945205479|
|    50%|  43.14794520547945|
|    75%| 53.917808219178085|
|    max|  69.12054794520547|
+-------+-------------------+

Those ages look reasonable; there are no outliers.
Next, we run the same analysis on the other DAYS features:

for feature in ['DAYS_BIRTH', 'DAYS_EMPLOYED', 'DAYS_REGISTRATION', 'DAYS_ID_PUBLISH']:
        print(f'{feature} info: ')
        df.select(df[feature] / -365).summary().show()                    
DAYS_BIRTH info:   
+-------+-------------------+
|summary|(DAYS_BIRTH / -365)|
+-------+-------------------+
|  count|             307511|
|   mean|  43.93697278587162|
| stddev| 11.956133237768654|
|    min| 20.517808219178082|
|    25%|  34.00547945205479|
|    50%|  43.14794520547945|
|    75%| 53.917808219178085|
|    max|  69.12054794520547|
+-------+-------------------+

DAYS_EMPLOYED info: 
+-------+----------------------+
|summary|(DAYS_EMPLOYED / -365)|
+-------+----------------------+
|  count|                307511|
|   mean|   -174.83574220287002|
| stddev|    387.05689457185537|
|    min|   -1000.6657534246575|
|    25%|    0.7917808219178082|
|    50%|    3.3232876712328765|
|    75%|     7.558904109589041|
|    max|     49.07397260273972|
+-------+----------------------+

DAYS_REGISTRATION info: 
+-------+--------------------------+
|summary|(DAYS_REGISTRATION / -365)|
+-------+--------------------------+
|  count|                    307511|
|   mean|        13.660603637091562|
| stddev|         9.651743345104306|
|    min|                      -0.0|
|    25%|         5.504109589041096|
|    50%|        12.336986301369864|
|    75%|        20.487671232876714|
|    max|         67.59452054794521|
+-------+--------------------------+

DAYS_ID_PUBLISH info: 
+-------+------------------------+
|summary|(DAYS_ID_PUBLISH / -365)|
+-------+------------------------+
|  count|                  307511|
|   mean|        8.20329417328335|
| stddev|       4.135480600008283|
|    min|                    -0.0|
|    25%|      4.7095890410958905|
|    50%|       8.915068493150685|
|    75%|      11.775342465753425|
|    max|       19.71780821917808|
+-------+------------------------+                                                    
buckets = df.select((df['DAYS_EMPLOYED'] / -365).alias('DAYS_EMPLOYED'))

bucketizer = ft.QuantileDiscretizer(numBuckets=10, inputCol='DAYS_EMPLOYED', outputCol='buckets').fit(buckets)
buckets = bucketizer.transform(buckets)

buckets.groupBy('buckets').count().sort('buckets').show()
bucketizer.getSplits()
+-------+-----+
|buckets|count|
+-------+-----+
|    1.0|61425|
|    2.0|30699|
|    3.0|30733|
|    4.0|30685|
|    5.0|30741|
|    6.0|30716|
|    7.0|30750|
|    8.0|30726|
|    9.0|31036|
+-------+-----+


[-inf,
 -1000.6657534246575,
 0.39452054794520547,
 1.252054794520548,
 2.2465753424657535,
 3.317808219178082,
 4.635616438356164,
 6.457534246575342,
 8.827397260273973,
 13.2986301369863,
 inf]

More than 60,000 clients fall into the bucket around 1,000 years of DAYS_EMPLOYED (the sentinel value 365243 days); we can guess this is just a marker for missing values.

# Replace the anomalous values with nan
df_emp = df.select(fn.when(df['DAYS_EMPLOYED']>=365243, None).otherwise(df['DAYS_EMPLOYED']).alias('DAYS_EMPLOYED'))

df_emp.sample(0.1).toPandas().plot.hist(title = 'Days Employment Histogram')
plt.xlabel('Days Employment')

[figure: Days Employment Histogram]

As the histogram shows, the distribution now looks essentially normal.
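The box-plot rule mentioned at the start of this subsection can also be applied programmatically. Below is a minimal sketch of Tukey's IQR fences in plain Python with hypothetical ages; on a Spark DataFrame the quartiles could instead be obtained with df.approxQuantile('DAYS_BIRTH', [0.25, 0.75], 0.01).

```python
import statistics

def iqr_fences(values, k=1.5):
    """Tukey box-plot fences: values outside [Q1 - k*IQR, Q3 + k*IQR] are flagged."""
    q1, _, q3 = statistics.quantiles(values, n=4)  # Q1, median, Q3
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr

# Hypothetical ages with one implausible value
ages = [25, 30, 34, 43, 54, 61, 69, 200]
low, high = iqr_fences(ages)
outliers = [v for v in ages if v < low or v > high]
print(outliers)  # [200]
```

In practice the fences would be computed per feature and the flagged values either capped, set to null, or investigated against business rules.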

Cleaning Boolean Features

for col in categorical_cols:
    unique_count = df.select(col).dropna().distinct().count()
    if unique_count == 2:
        df.groupBy(col).count().show()                          
+------------------+------+
|NAME_CONTRACT_TYPE| count|
+------------------+------+
|   Revolving loans| 29279|
|        Cash loans|278232|
+------------------+------+
+------------+------+
|FLAG_OWN_CAR| count|
+------------+------+
|           Y|104587|
|           N|202924|
+------------+------+
+---------------+------+
|FLAG_OWN_REALTY| count|
+---------------+------+
|              Y|213312|
|              N| 94199|
+---------------+------+
+-------------------+------+
|EMERGENCYSTATE_MODE| count|
+-------------------+------+
|               NULL|145755|
|                 No|159428|
|                Yes|  2328|
+-------------------+------+                                                                              
cols_to_transform = ['FLAG_OWN_CAR', 'FLAG_OWN_REALTY', 'EMERGENCYSTATE_MODE']
df.replace(
    ['Y', 'N', 'Yes', 'No'], ['1', '0', '1', '0'], 
    subset=cols_to_transform
).select(cols_to_transform).show(5)
+------------+---------------+-------------------+
|FLAG_OWN_CAR|FLAG_OWN_REALTY|EMERGENCYSTATE_MODE|
+------------+---------------+-------------------+
|           1|              0|                  0|
|           0|              1|                  0|
|           1|              1|               NULL|
|           0|              1|               NULL|
|           0|              1|                  0|
+------------+---------------+-------------------+
only showing top 5 rows

Wrapping Up in a Function

Finally, wrap the steps above into a function:

dtypes = df.drop("SK_ID_CURR", "TARGET").dtypes
categorical_cols = [k for k, v in dtypes if v == 'string']
numerical_cols = [k for k, v in dtypes if v != 'string']

# Data cleaning
def clean(df):
    # remove duplicates.
    df = df.dropDuplicates(subset=["SK_ID_CURR"])
    
    # transform
    cols_to_transform = ['FLAG_OWN_CAR', 'FLAG_OWN_REALTY', 'EMERGENCYSTATE_MODE']
    df = df.replace(
        ['Y', 'N', 'Yes', 'No'], ['1', '0', '1', '0'], 
        subset=cols_to_transform
    )
    df = df.withColumns({c: df[c].cast('int') for c in cols_to_transform})
    
    # Replace the anomalous values with nan
    df = df.withColumn('DAYS_EMPLOYED', 
        fn.when(df['DAYS_EMPLOYED']>=365243, None).otherwise(df['DAYS_EMPLOYED'])
    )
    
    df = df.replace('XNA', None)
    df = df.withColumnRenamed("TARGET", "label")
    return df

df = clean(df)

Feature Re-encoding

Many machine-learning algorithms only accept numeric inputs and cannot handle categorical features — e.g. linear models such as linear regression and logistic regression — so we need to re-encode categorical features as numeric variables.

First, the number of distinct categories of each categorical feature:

df.select([fn.countDistinct(col).alias(col) for col in categorical_cols]).show(1, vertical=True)
-RECORD 0-------------------------
 NAME_CONTRACT_TYPE         | 2   
 CODE_GENDER                | 2   
 FLAG_OWN_CAR               | 2   
 FLAG_OWN_REALTY            | 2   
 NAME_TYPE_SUITE            | 7   
 NAME_INCOME_TYPE           | 8   
 NAME_EDUCATION_TYPE        | 5   
 NAME_FAMILY_STATUS         | 6   
 NAME_HOUSING_TYPE          | 6   
 OCCUPATION_TYPE            | 18  
 WEEKDAY_APPR_PROCESS_START | 7   
 ORGANIZATION_TYPE          | 57  
 FONDKAPREMONT_MODE         | 4   
 HOUSETYPE_MODE             | 3   
 WALLSMATERIAL_MODE         | 7   
 EMERGENCYSTATE_MODE        | 2                         

1. NAME_EDUCATION_TYPE carries an implicit ordering, so ordinal encoding can be used.
2. OCCUPATION_TYPE (occupation) and ORGANIZATION_TYPE have many categories, so we plan to use mean encoding.
3. The remaining unordered categorical features get one-hot encoding.

Ordinal Encoding

Ordered categorical features carry an implicit ranking; we map their categories to ordered integers, which is exactly what ordinal encoding does.

Let's manually pick the ordered levels out of the categorical features:

# The ordinal (ordered) categorical features
# Pandas calls the categories "levels"

ordered_levels = {
    "NAME_EDUCATION_TYPE": ["Lower secondary", 
                            "Secondary / secondary special", 
                            "Incomplete higher", 
                            "Higher education"]
}

StringIndexer in Spark assigns indices by value frequency, so we need to write a custom encoding function:

def ordinal_encode(df, levels):
    for var, to_replace in levels.items():
        mapping = {v: str(i) for i,v in enumerate(to_replace)}
        df = df.replace(mapping, subset=[var])
        df = df.withColumn(var, df[var].cast('int'))
    print(f'{len(levels):d} columns were ordinal encoded')
    return df
ordinal_encode(df, ordered_levels).groupBy(*ordered_levels.keys()).count().show()
1 columns were ordinal encoded
+-------------------+------+
|NAME_EDUCATION_TYPE| count|
+-------------------+------+
|               NULL|   164|
|                  1|218391|
|                  3| 74863|
|                  2| 10277|
|                  0|  3816|
+-------------------+------+                                                        

Mean Encoding

In general, OneHotEncoder or OrdinalEncoder is all we need for categorical features, and such simple preprocessing satisfies most data-mining algorithms. But when a categorical feature can take very many values (high cardinality), one-hot encoding leads to a dimensionality explosion. Mean encoding is an efficient alternative that can substantially improve model performance in practice.
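The shrinkage behind mean encoding can be illustrated with a tiny numeric sketch (all numbers here are hypothetical): the encoded value is a weighted blend of the category's observed target mean (the posterior) and the global mean (the prior), with rarer categories pulled harder toward the prior.

```python
prior = 0.08        # hypothetical global default rate
posterior = 0.30    # hypothetical default rate within one small category
counts = 10         # observations in that category
smoothing = 20.0    # damping constant (an 'auto' mode could derive it from target variances)

lam = counts / (counts + smoothing)          # weight on the category mean
code = lam * posterior + (1 - lam) * prior   # shrunk encoding
print(round(code, 4))  # 0.1533
```

With only 10 observations, the raw 30% rate is pulled most of the way back toward the 8% prior; a category with thousands of observations would keep a value close to its own mean.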

OCCUPATION_TYPE (occupation) and ORGANIZATION_TYPE have many categories, so we use mean encoding.

class MeanEncoder(Estimator, Transformer):
    def __init__(self, smoothing=0.0, inputCols=None, labelCol="label"):
        """
        The MeanEncoder() replaces categories by the mean value of the target for each
        category.
        
        math:
            mapping = (w_i) posterior + (1-w_i) prior
        where
            w_i = n_i t / (s + n_i t)
        
        In the previous equation, t is the target variance in the entire dataset, s is the
        target variance within the category and n is the number of observations for the
        category.
        
        Parameters
        ----------
        smoothing: int, float, 'auto', default=0.0
        """
        super().__init__()
        self.smoothing = smoothing
        self.inputCols = inputCols
        self.labelCol = labelCol
    
    def _fit(self, df):
        """
        Learn the mean value of the target for each category of the variable.
        """

        self.encoder_dict = {}
        inputCols = self.inputCols
        labelCol = self.labelCol
        y_prior = df.select(fn.mean(labelCol).alias("mean")).first()["mean"]
        
        for var in inputCols:
            if self.smoothing == "auto":
                y_var = df.cov(labelCol, labelCol)
                damping = fn.variance(labelCol) / y_var
            else:
                damping = fn.lit(self.smoothing)
            
            groups = df.groupBy(var).agg(
                fn.mean(labelCol).alias("posterior"),
                fn.count("*").alias("counts"),
                damping.alias("damping") 
            ).toPandas().dropna()
            
            groups["lambda"] = groups["counts"] / (groups["counts"] + groups["damping"])
            groups["code"] = (
                groups["lambda"] * groups["posterior"] + 
                    (1.0 - groups["lambda"]) * y_prior
            )
            
            self.encoder_dict[var] = dict(zip(groups[var], groups["code"]))
        return self
    
    def _transform(self, df):
        for var in self.encoder_dict:
            mapping = {k: str(v) for k,v in self.encoder_dict[var].items()}
            df = df.replace(mapping, subset=[var])
            df = df.withColumn(var, df[var].cast('float'))

        print(f'{len(self.encoder_dict):d} columns were mean encoded')
        return df
# replace categories by the mean value of the target for each category.
inputCols = ['OCCUPATION_TYPE', 'ORGANIZATION_TYPE']
mean_encoder = MeanEncoder(
    inputCols=inputCols, 
    labelCol='label',
    smoothing='auto'
)
mean_encoder.fit(df).transform(df).select(inputCols).show(5)  
2 columns were mean encoded
+---------------+-----------------+
|OCCUPATION_TYPE|ORGANIZATION_TYPE|
+---------------+-----------------+
|    0.062140968|       0.09299603|
|     0.09631742|       0.09449421|
|    0.113258936|       0.10173836|
|           NULL|             NULL|
|           NULL|             NULL|
+---------------+-----------------+
only showing top 5 rows

Dummy Variable (One-Hot) Encoding

Unordered categorical features are usable as-is by tree ensembles (like XGBoost), but for linear models (like Lasso or Ridge) they must be one-hot encoded. Next we one-hot encode the remaining unordered categorical features, excluding the ones already encoded above.

# The nominative (unordered) categorical features
encoded_cols = ['NAME_EDUCATION_TYPE', 'OCCUPATION_TYPE', 'ORGANIZATION_TYPE']
nominal_categories = [col for col in categorical_cols if col not in encoded_cols]

indexedCols = [f"indexed_{col}" for col in nominal_categories]
vectorCols = [f"encoded_{col}" for col in nominal_categories]

onehot_encoder = Pipeline(stages=[
    StringIndexer(
        inputCols=nominal_categories, 
        outputCols=indexedCols,
        handleInvalid='keep'
    ),
    OneHotEncoder(
        inputCols=indexedCols,
        outputCols=vectorCols
    )
])
onehot_encoder.fit(df).transform(df).select(vectorCols).limit(5).toPandas()
[first 5 rows of the encoded_* columns, each a one-hot vector such as (0.0, 1.0) or (1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0) — wide output omitted]

Binning Continuous Features

In actual model training we often discretize continuous features: this removes scale effects, greatly reduces the influence of outliers, and makes the features more stable.

Binning comes mainly in three flavors: equal-frequency, equal-width, and cluster binning. Equal-frequency binning is affected by outliers to some degree, while equal-width binning tends to ignore them entirely, so both can lose information. To better respect a variable's original distribution, consider cluster binning: first cluster the continuous variable (often with k-Means), then use each sample's cluster label as its bin.
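The cluster-binning idea can be sketched with a minimal 1-D k-means in plain Python (initialized at evenly spaced quantiles for determinism); in a real Spark pipeline one would cluster with pyspark.ml.clustering.KMeans instead. The ages below are hypothetical.

```python
def kmeans_1d(values, k=3, iters=20):
    """Minimal 1-D k-means: returns sorted centroids and one bin label per value."""
    vs = sorted(values)
    # Initialize centroids at evenly spaced quantiles of the data
    centroids = [vs[(len(vs) - 1) * (2 * j + 1) // (2 * k)] for j in range(k)]
    labels = [0] * len(values)
    for _ in range(iters):
        # Assign each value to its nearest centroid
        labels = [min(range(k), key=lambda j: abs(v - centroids[j])) for v in values]
        # Move each centroid to the mean of its members
        for j in range(k):
            members = [v for v, lab in zip(values, labels) if lab == j]
            if members:
                centroids[j] = sum(members) / len(members)
        centroids.sort()
    return centroids, labels

# Hypothetical ages: two dense groups and a few seniors
ages = [21, 22, 23, 24, 40, 41, 42, 43, 65, 67, 70]
centroids, labels = kmeans_1d(ages)
print(labels)  # [0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2]
```

Unlike equal-width or equal-frequency cuts, the bin boundaries here follow the natural gaps in the data.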

Take the effect of age on repayment as an example.

# Find the correlation of the positive days since birth and target
df.select((df['DAYS_BIRTH'] / -365).alias('age'), 'label').corr('age', "label")
-0.07823930830982699

As expected, client age is negatively correlated with the target: as clients get older, they tend to repay their loans on time more often. Next we draw a kernel density estimate (KDE) plot to visualize the effect of age on the target.

sample = df.sample(0.1).select((df['DAYS_BIRTH']/fn.lit(-365)).alias("age"), "label").toPandas()

plt.figure(figsize = (5, 3))
sns.kdeplot(data=sample, x="age", hue="label", common_norm=False)
plt.xlabel('Age (years)')
plt.ylabel('Density')
plt.title('Distribution of Ages')

[figure: Distribution of Ages — KDE of age by label]

Now let's bin the ages:

# Bin the age data
age_binned = pd.cut(sample['age'], bins = np.linspace(20, 70, num = 11))
age_groups  = sample['label'].groupby(age_binned).mean()

plt.figure(figsize = (8, 3))
# Graph the age bins and the average of the target as a bar plot
sns.barplot(x=age_groups.index, y=age_groups*100)
# Plot labeling
plt.xticks(rotation = 30)
plt.xlabel('Age Group (years)')
plt.ylabel('Failure to Repay (%)')
plt.title('Failure to Repay by Age Group')

[figure: Failure to Repay by Age Group — bar plot of default rate per age bin]

There is a clear trend: younger applicants are more likely to fail to repay! The failure rate is above 10% for the three youngest age groups and about 5% for the oldest one.

In the pyspark.ml.feature module, Bucketizer bins values by user-supplied split points (which makes equal-width binning straightforward), and QuantileDiscretizer implements equal-frequency binning.

bucketizer = ft.QuantileDiscretizer(
    numBuckets=10,
    handleInvalid='keep',
    inputCols=['DAYS_BIRTH', 'DAYS_EMPLOYED'], 
    outputCols=["buckets1", "buckets2"]
).fit(df)

splits = bucketizer.getSplitsArray() # bin_edges
for c, s in zip(['DAYS_BIRTH', 'DAYS_EMPLOYED'], splits):
    print(f"{c}'s bin_edges:")
    print(s)
DAYS_BIRTH's bin_edges:
[-inf, -22185.0, -20480.0, -18892.0, -17228.0, -15759.0, -14425.0, -13153.0, -11706.0, -10296.0, inf]
DAYS_EMPLOYED's bin_edges:
[-inf, -5338.0, -3679.0, -2795.0, -2164.0, -1650.0, -1253.0, -922.0, -619.0, -336.0, inf] 
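For the equal-width case, Bucketizer takes explicit split points rather than a bucket count. A small sketch of computing such edges in the form its splits parameter expects, padded with ±inf so out-of-range values still land in a bucket (the 20-70 range and the 'age' column are hypothetical):

```python
def equal_width_splits(vmin, vmax, num_buckets):
    """Equal-width bin edges, padded with -inf/inf as Bucketizer's `splits` expects."""
    width = (vmax - vmin) / num_buckets
    inner = [vmin + i * width for i in range(1, num_buckets)]
    return [float('-inf')] + inner + [float('inf')]

splits = equal_width_splits(20.0, 70.0, 10)
print(splits)
# The edges could then be used as:
# ft.Bucketizer(splits=splits, inputCol='age', outputCol='age_bin')
```

A list of n+1 split points defines n buckets, matching numBuckets=10 in the QuantileDiscretizer example above.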

Wrapping Up in a Function

dtypes = df.drop("SK_ID_CURR", "TARGET").dtypes
categorical_cols = [k for k, v in dtypes if v == 'string']
numerical_cols = [k for k, v in dtypes if v != 'string']   

def encode(df):
    # The ordinal (ordered) categorical features
    # Pandas calls the categories "levels"
    ordered_levels = {
        "NAME_EDUCATION_TYPE": ["Lower secondary", 
                                "Secondary / secondary special", 
                                "Incomplete higher", 
                                "Higher education"]
    }
    df = ordinal_encode(df, ordered_levels)
    
    # replace categories by the mean value of the target for each category.
    mean_encoder = MeanEncoder(
        inputCols=['OCCUPATION_TYPE', 'ORGANIZATION_TYPE'], 
        labelCol='label',
        smoothing='auto'
    )
    df = mean_encoder.fit(df).transform(df)
    
    # The nominative (unordered) categorical features
    nominal_categories = [col for col in categorical_cols if col not in ordered_levels]
    features_onehot = [col for col in nominal_categories if col not in ['OCCUPATION_TYPE', 'ORGANIZATION_TYPE']]

    indexedCols = [f"indexed_{col}" for col in features_onehot]
    encodedCols = [f"encoded_{col}" for col in features_onehot]

    onehot_encoder = Pipeline(stages=[
        StringIndexer(
            inputCols=features_onehot, 
            outputCols=indexedCols,
            handleInvalid='keep'
        ),
        OneHotEncoder(
            inputCols=indexedCols,
            outputCols=encodedCols
        )
    ])
    
    df = onehot_encoder.fit(df).transform(df).drop(*features_onehot + indexedCols)
    print(f'{len(features_onehot):d} columns were one-hot encoded')
    
    colsMap = dict(zip(encodedCols, features_onehot))
    df = df.withColumnsRenamed(colsMap)
    return df
# Encode categorical features
df_encoded = encode(df)
df_encoded.select(categorical_cols).limit(5).toPandas()
1 columns were ordinal encoded
2 columns were mean encoded
10 columns were one-hot encoded
[first 5 rows: one-hot columns as vectors, NAME_EDUCATION_TYPE as ordinal integers, OCCUPATION_TYPE / ORGANIZATION_TYPE as mean-encoded floats (NULLs remain NaN) — wide output omitted]
pd.Series(dict(df_encoded.dtypes)).value_counts()
double    65
int       45
vector    10
float      2
Name: count, dtype: int64

Handling Missing Values

Missing values in features are very common, and most machine-learning models require missing values to be handled before fitting.

Missing-Value Statistics

# Function to calculate missing values by column
def display_missing(df, threshold=0.25, verbose=1):
    n = df.count()
    exprs = [fn.sum(df[col].isNull().cast('int')).alias(col) for col in df.columns]
    missing_number = df.select(*exprs).first().asDict()
    missing_df = pd.DataFrame({
        "missing_number": missing_number.values(),  # Total missing values
        "missing_rate": [value / n for value in missing_number.values()]   # Proportion of missing values
        }, index=missing_number.keys())
    missing_df = missing_df.query("missing_rate>0").sort_values("missing_rate", ascending=False)
    high_missing = missing_df.query(f"missing_rate>{threshold}")
    # Print some summary information
    if verbose:
        print(f"Your selected dataframe has {missing_df.shape[0]} out of {len(df.columns)} columns that have missing values.")
        print(f"There are {high_missing.shape[0]} columns with more than {threshold:.1%} missing values.")
    # Return the columns whose missing rate exceeds the threshold
    return high_missing
# Missing values statistics
print(display_missing(df_encoded).head(10))
Your selected dataframe has 66 out of 122 columns that have missing values.
There are 47 columns with more than 25.0% missing values.
                          missing_number  missing_rate
COMMONAREA_MEDI                   214865      0.698723
COMMONAREA_MODE                   214865      0.698723
COMMONAREA_AVG                    214865      0.698723
NONLIVINGAPARTMENTS_MODE          213514      0.694330
NONLIVINGAPARTMENTS_MEDI          213514      0.694330
NONLIVINGAPARTMENTS_AVG           213514      0.694330
LIVINGAPARTMENTS_MODE             210199      0.683550
LIVINGAPARTMENTS_MEDI             210199      0.683550
LIVINGAPARTMENTS_AVG              210199      0.683550
FLOORSMIN_MODE                    208642      0.678486                                       

Dropping Missing Values

If a feature's missing rate exceeds some threshold (say 80%), its contribution to the model drops, and it can usually be considered for removal.

# Remove variables with high missing rate

def drop_missing_data(df, threshold=0.8):
    # Drop variables whose missing rate exceeds (1 - threshold);
    # with the default threshold=0.8, columns missing more than 20% are removed.
    thresh = int(df.count() * (1 - threshold))
    exprs = [fn.sum(df[col].isNull().cast('int')).alias(col) for col in df.columns]
    missing_number = df.select(*exprs).first().asDict()
    cols_to_drop = [k for k,v in missing_number.items() if v > thresh]
    print(f"Removed {len(cols_to_drop)} variables with missing more than {1 - threshold:.1%}")
    return df.drop(*cols_to_drop)
_ = drop_missing_data(df_encoded, threshold=0.2)
Removed 0 variables with missing more than 80.0%

Flagging Missing Values

Sometimes, for each column that contains missing values, we add an extra column marking where the values were missing; in some applications this works quite well.
Continuing with the DAYS_EMPLOYED anomaly cleaned earlier, we flag the missing entries and check whether they relate to default:

df_encoded.groupBy(df_encoded['DAYS_EMPLOYED'].isNull()).mean('label').show()
+-----------------------+-------------------+
|(DAYS_EMPLOYED IS NULL)|         avg(label)|
+-----------------------+-------------------+
|                   true|0.05399646043269404|
|                  false| 0.0865997453765215|
+-----------------------+-------------------+

The default rate among the missing entries (5.40%) is lower than among the normal values (8.66%), a clear association with the target, so we add a DAYS_EMPLOYED_MISSING indicator column. This treatment mainly benefits linear methods; tree-based methods can pick up on missingness automatically.

# Adds a binary variable to flag missing observations.
from pyspark.ml.stat import Correlation, ChiSquareTest

def flag_missing(df, inputCols=None, labelCol='label', alpha=0.05):
    """
    Adds a binary variable to flag missing observations(one indicator per variable). 
    The added variables (missing indicators) are named with the original variable name plus '_missing'.
    
    Parameters:
    ----------
    alpha: float, default=0.05
        Features with correlation more than alpha are selected.
    """
    if inputCols is None:
        inputCols = df.drop(labelCol).columns
    
    for var in inputCols:
        df = df.withColumn(var + "_missing", df[var].isNull().cast('int'))
    
    indicators = [var + "_missing" for var in inputCols]
    # The correlations
    corr = df.select([fn.corr(labelCol, c2).alias(c2) for c2 in indicators])
    corr = corr.fillna(0).first().asDict()
    # find variables for which indicator should be added.
    selected_cols = [var for var, r in corr.items() if abs(r) > alpha]
    drop_cols = [var for var in indicators if var not in selected_cols]
    df = df.drop(*drop_cols)
    print(f"Added {len(selected_cols)} missing indicators")
    return df
print('The number of features:', len(flag_missing(df_encoded).columns))
Added 0 missing indicators
The number of features: 122

Manual Imputation

Fill values in manually, based on business knowledge.

If a variable is categorical with few distinct values, it can be turned into dummy variables at encoding time. For example, the gender variable CODE_GENDER before encoding:

pipeline = Pipeline(stages=[
    StringIndexer(
        inputCol="CODE_GENDER", 
        outputCol="indexedCol",
        handleInvalid="keep"
    ),
    OneHotEncoder(
        inputCol="indexedCol", 
        outputCol="encodedCol", 
        handleInvalid="keep",
        dropLast=False
    )
])

pipeline.fit(df).transform(df).select("CODE_GENDER", "encodedCol").show(5)                            
+-----------+-------------+
|CODE_GENDER|   encodedCol|
+-----------+-------------+
|          M|(4,[1],[1.0])|
|          F|(4,[0],[1.0])|
|          M|(4,[1],[1.0])|
|          F|(4,[0],[1.0])|
|          F|(4,[0],[1.0])|
+-----------+-------------+
only showing top 5 rows

Categorical features already had their missing values handled during string indexing, so they need no special treatment here.
If a variable is boolean, it can often simply be filled with zero:

nunique = df_encoded.select([fn.countDistinct(var).alias(var) for var in df_encoded.columns]).first().asDict() 
binary = df_encoded.select([fn.collect_set(var).alias(var) for var,n in nunique.items() if n == 2])
print([k for k, v in binary.first().asDict().items() if set(v) == {0, 1}])
['label', 'FLAG_OWN_CAR', 'FLAG_OWN_REALTY', 'FLAG_MOBIL', 'FLAG_EMP_PHONE', 'FLAG_WORK_PHONE', 'FLAG_CONT_MOBILE', 'FLAG_PHONE', 'FLAG_EMAIL', 'REG_REGION_NOT_LIVE_REGION', 'REG_REGION_NOT_WORK_REGION', 'LIVE_REGION_NOT_WORK_REGION', 'REG_CITY_NOT_LIVE_CITY', 'REG_CITY_NOT_WORK_CITY', 'LIVE_CITY_NOT_WORK_CITY', 'EMERGENCYSTATE_MODE', 'FLAG_DOCUMENT_2', 'FLAG_DOCUMENT_3', 'FLAG_DOCUMENT_4', 'FLAG_DOCUMENT_5', 'FLAG_DOCUMENT_6', 'FLAG_DOCUMENT_7', 'FLAG_DOCUMENT_8', 'FLAG_DOCUMENT_9', 'FLAG_DOCUMENT_10', 'FLAG_DOCUMENT_11', 'FLAG_DOCUMENT_12', 'FLAG_DOCUMENT_13', 'FLAG_DOCUMENT_14', 'FLAG_DOCUMENT_15', 'FLAG_DOCUMENT_16', 'FLAG_DOCUMENT_17', 'FLAG_DOCUMENT_18', 'FLAG_DOCUMENT_19', 'FLAG_DOCUMENT_20', 'FLAG_DOCUMENT_21']

If we look closely at the field descriptions, many missing values are in fact predictable. For example, the counts of 30-day/60-day past-due observations in the client's social circle, and the number of credit bureau enquiries in the hour/day/week/month/quarter/year before the application, can all be filled with 0.

def impute_manually(df):
    """
    Replaces missing values by an arbitrary value
    """
    # boolean
    boolean_features = ['FLAG_OWN_CAR', 'FLAG_OWN_REALTY', 'FLAG_MOBIL', 'FLAG_EMP_PHONE', 
                        'FLAG_WORK_PHONE', 'FLAG_CONT_MOBILE', 'FLAG_PHONE', 'FLAG_EMAIL', 
                        'REG_REGION_NOT_LIVE_REGION', 'REG_REGION_NOT_WORK_REGION', 'LIVE_REGION_NOT_WORK_REGION',
                        'REG_CITY_NOT_LIVE_CITY', 'REG_CITY_NOT_WORK_CITY', 'LIVE_CITY_NOT_WORK_CITY', 
                        'EMERGENCYSTATE_MODE', 'FLAG_DOCUMENT_2', 'FLAG_DOCUMENT_3', 'FLAG_DOCUMENT_4', 
                        'FLAG_DOCUMENT_5', 'FLAG_DOCUMENT_6', 'FLAG_DOCUMENT_7', 'FLAG_DOCUMENT_8', 'FLAG_DOCUMENT_9', 
                        'FLAG_DOCUMENT_10', 'FLAG_DOCUMENT_11', 'FLAG_DOCUMENT_12', 'FLAG_DOCUMENT_13', 
                        'FLAG_DOCUMENT_14', 'FLAG_DOCUMENT_15', 'FLAG_DOCUMENT_16', 'FLAG_DOCUMENT_17', 
                        'FLAG_DOCUMENT_18', 'FLAG_DOCUMENT_19', 'FLAG_DOCUMENT_20', 'FLAG_DOCUMENT_21']
    df = df.na.fill(0, subset=boolean_features)
    # fill 0
    features_fill_zero = [
        "OBS_30_CNT_SOCIAL_CIRCLE",  
        "DEF_30_CNT_SOCIAL_CIRCLE",
        "OBS_60_CNT_SOCIAL_CIRCLE",
        "DEF_60_CNT_SOCIAL_CIRCLE",
        "AMT_REQ_CREDIT_BUREAU_HOUR",
        "AMT_REQ_CREDIT_BUREAU_DAY",
        "AMT_REQ_CREDIT_BUREAU_WEEK",
        "AMT_REQ_CREDIT_BUREAU_MON",
        "AMT_REQ_CREDIT_BUREAU_QRT",
        "AMT_REQ_CREDIT_BUREAU_YEAR"
    ]
    df = df.na.fill(0, subset=features_fill_zero)
    
    return df
_ = display_missing(impute_manually(df_encoded))
Your selected dataframe has 55 out of 122 columns that have missing values.
There are 46 columns with more than 25.0% missing values.

Conditional mean imputation

From the earlier correlation analysis we know that AMT_ANNUITY is strongly related to AMT_CREDIT and AMT_INCOME_TOTAL, so we impute it with the median within groups defined by these two features. This technique is known as conditional mean completion (Conditional Mean Completer).

print('AMT_CREDIT :', df.corr('AMT_CREDIT', 'AMT_ANNUITY'))
print('AMT_INCOME_TOTAL :', df.corr('AMT_INCOME_TOTAL', 'AMT_ANNUITY'))
AMT_CREDIT : 0.7700800319525184
# conditional statistic completer
class ConditionalMeanCompleter:
    pass
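The class above is left as a stub. As a minimal pure-Python sketch of the idea (hypothetical helper names, not the article's implementation): group the rows by a key derived from the correlated features, then fill each missing target value with its group's median, falling back to the overall median for unseen groups.

```python
from collections import defaultdict
from statistics import median

def conditional_median_impute(rows, group_key, target):
    """Fill missing `target` values with the median of the group the row
    falls into (per `group_key`); fall back to the overall median."""
    groups = defaultdict(list)
    for row in rows:
        if row[target] is not None:
            groups[group_key(row)].append(row[target])
    group_medians = {k: median(v) for k, v in groups.items()}
    overall = median(v for vals in groups.values() for v in vals)
    for row in rows:
        if row[target] is None:
            row[target] = group_medians.get(group_key(row), overall)
    return rows
```

In PySpark the same idea becomes a `groupBy().agg()` of the group medians, joined back to the original DataFrame, with `coalesce` on the target column.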

Simple imputation

Imputer supports imputing missing values with the mean, median, or mode; it does not currently support categorical features.

# Univariate imputer for completing missing values with simple strategies.

dtypes = df_encoded.drop("SK_ID_CURR", "TARGET").dtypes
numerical_cols = [k for k, v in dtypes if v not in ('string', 'vector')]
imputed_cols = [f"imputed_{col}" for col in numerical_cols]
imputer = ft.Imputer(
    inputCols=numerical_cols,
    outputCols=imputed_cols,
    strategy="median"
)
_ = display_missing(imputer.fit(df_encoded).transform(df_encoded).select(imputed_cols))
Your selected dataframe has 0 out of 111 columns that have missing values.
There are 0 columns with more than 25.0% missing values.

Putting it all together

Finally, let's summarize our missing-value handling strategy:

  • Drop features with a missing rate above 80%
  • Add missing-value indicators
  • Manually impute features whose missingness has business meaning
  • Impute the rest with simple statistics
# Function for missing value imputation

def handle_missing(df):
    # Remove variables with high missing rate
    df = drop_missing_data(df, threshold=0.2)
    # find variables for which indicator should be added.
    df = flag_missing(df)

    # Replaces missing values by an arbitrary value
    df = impute_manually(df)

    # Univariate imputer for completing missing values with simple strategies.
    dtypes = df.drop("SK_ID_CURR", "TARGET").dtypes
    numerical_cols = [k for k, v in dtypes if v not in ('string', 'vector')]
    imputed_cols = [f"imputed_{col}" for col in numerical_cols]
    imputer = ft.Imputer(
        inputCols=numerical_cols,
        outputCols=imputed_cols,
        strategy="median"
    )
    df = imputer.fit(df).transform(df)
    colsMap = dict(zip(imputed_cols, numerical_cols))
    df = df.drop(*numerical_cols).withColumnsRenamed(colsMap)
    return df
df_imputed = handle_missing(df_encoded)
Removed 0 variables with missing more than 80.0%
Added 0 missing indicators                  

Confirm that all missing values have been handled:

_ = display_missing(df_imputed)
Your selected dataframe has 0 out of 122 columns that have missing values.
There are 0 columns with more than 25.0% missing values.

Outlier detection

Real-world data often contains anomalous records that can bias our models considerably. There are many detection methods: univariate flagging such as the 3-sigma rule or box-plot fences, and unsupervised methods such as clustering, Isolation Forest, and LocalOutlierFactor.

  • Box-plot detection flags points using the quartiles, which are robust and not disturbed by outliers themselves. Points below Q1 - 1.5*IQR or above Q3 + 1.5*IQR are usually considered outliers.
  • 3-sigma rule: assuming the data is normally distributed, points farther than 3σ from the mean are defined as outliers, since P(|X - μ| < 3σ) = 99.73%. If the data is not normal, points several standard deviations away from the mean can still be flagged.
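The two rules above can be sketched in a few lines of plain Python (stdlib only; `fold` is 1.5 for the classic box-plot fence, 3 for the 3-sigma rule):

```python
from statistics import mean, pstdev, quantiles

def outlier_bounds(values, method="iqr", fold=1.5):
    """Return (lower, upper) fences; points outside are flagged as outliers."""
    if method == "gaussian":
        mu, sigma = mean(values), pstdev(values)
        return mu - fold * sigma, mu + fold * sigma
    q1, _, q3 = quantiles(values, n=4)  # 25th and 75th percentiles
    iqr = q3 - q1
    return q1 - fold * iqr, q3 + fold * iqr
```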

Samples flagged as outliers should be handled according to their actual meaning:

  • Depending on the number of outliers and their impact, consider deleting the record.
  • Apply a log-scale transform to shrink extreme values.
  • Smooth outliers through binning.
  • Replace outliers with the mean/median/mode, which is simple and effective.
  • Flag outliers, or add an outlier-score column.
  • Tree models are fairly robust to outliers, so they can also simply be ignored.
class OutlierCapper(Estimator, Transformer):
    """
    Caps maximum and/or minimum values of a variable at automatically
    determined values.
    Works only with numerical variables. A list of variables can be indicated. 
    
    Parameters
    ----------
    method: str, 'gaussian' or 'iqr', default='iqr'
        If method='gaussian': 
            - upper limit: mean + fold * std
            - lower limit: mean - fold * std
        If method='iqr': 
            - upper limit: 75th quantile + fold * IQR
            - lower limit: 25th quantile - fold * IQR
            where IQR is the inter-quartile range: 75th quantile - 25th quantile.
    fold: int, default=3   
        You can select how far out to cap the maximum or minimum values.
    """

    def __init__(self, inputCols, method='iqr', fold=3):
        super().__init__()
        self.method = method
        self.fold = fold
        self.inputCols = inputCols

    def _fit(self, df):
        """
        Learn the values that should be used to replace outliers.
        """

        if self.method == "gaussian":
            mean = df.select([fn.mean(var).alias(var) for var in self.inputCols])
            mean = pd.Series(mean.first().asDict())
            bias = [mean, mean]
            scale = df.select([fn.stddev(var).alias(var) for var in self.inputCols])
            scale = pd.Series(scale.first().asDict())
        elif self.method == "iqr":
            Q1 = df.select([fn.percentile(var, 0.25).alias(var) for var in self.inputCols])
            Q1 = pd.Series(Q1.first().asDict())
            Q3 = df.select([fn.percentile(var, 0.75).alias(var) for var in self.inputCols])
            Q3 = pd.Series(Q3.first().asDict())
            bias = [Q1, Q3]
            scale = Q3 - Q1         
        
        # estimate the end values
        if (scale == 0).any():
            raise ValueError(
                f"Input columns {scale[scale == 0].index.tolist()!r}"
                f" have low variation for method {self.method!r}."
                f" Try other capping methods or drop these columns."
            )
        else:
            self.upper_limit = bias[1] + self.fold * scale
            self.lower_limit = bias[0] - self.fold * scale  

        return self 

    def _transform(self, df):
        """
        Cap the variable values.
        """
        maximum = df.select([fn.max(var).alias(var) for var in self.inputCols])
        maximum = pd.Series(maximum.first().asDict())
        minimum = df.select([fn.min(var).alias(var) for var in self.inputCols])
        minimum = pd.Series(minimum.first().asDict())
        outliers = (maximum.gt(self.upper_limit) | 
                    minimum.lt(self.lower_limit))
        n = outliers.sum()
        print(f"Your selected dataframe has {n} out of {len(self.inputCols)} columns that have outliers.")
        
        # replace outliers
        for var in self.inputCols:
            upper_limit = self.upper_limit[var]
            lower_limit = self.lower_limit[var]
            df = df.withColumn(var, 
                fn.when(df[var] > upper_limit, upper_limit)
                  .when(df[var] < lower_limit, lower_limit)
                  .otherwise(df[var])
            )
        return df
outlier_capper = OutlierCapper(method="gaussian", inputCols=numerical_cols).fit(df_imputed)
df_capped = outlier_capper.transform(df_imputed)
Your selected dataframe has 96 out of 111 columns that have outliers.

Standardization / Normalization

Standardizing or normalizing the data can improve the accuracy of some algorithms and speed up gradient-descent convergence. Many models do not need it, mainly those based on probability distributions, such as CART and random forests from the decision-tree family.

  • z-score standardization is the most common preprocessing step; essentially all linear models standardize before fitting. Assuming the feature is normally distributed, it is transformed to a standard normal with mean 0 and standard deviation 1.
  • max-min (min-max) scaling, also called deviation standardization, maps feature values into [0, 1]. Its drawback: if the test set or new data contains values below min or above max, then min and max change and must be recomputed. So unless you specifically need a bounded value range, z-score standardization is usually preferable.
  • L1/L2 norm normalization: if we only want to unify scale, we can normalize each sample by its L2 norm.
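The two most common scalers above reduce to a few lines of plain Python (sketch only, population standard deviation assumed):

```python
from statistics import mean, pstdev

def z_score(values):
    """Center to mean 0 and scale to (population) standard deviation 1."""
    mu, sigma = mean(values), pstdev(values)
    return [(v - mu) / sigma for v in values]

def min_max(values):
    """Map values linearly into [0, 1]."""
    lo, hi = min(values), max(values)
    return [(v - lo) / (hi - lo) for v in values]
```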
Scalers available in pyspark.ml.feature:

  • StandardScaler(withMean, withStd, …): an Estimator; z-score standardization.
  • Normalizer(p, inputCol, outputCol): a Transformer; scales each sample to unit p-norm (L2 by default).
  • MaxAbsScaler(inputCol, outputCol): an Estimator; scales data into [-1, 1].
  • MinMaxScaler(min, max, inputCol, outputCol): an Estimator; scales data into [0, 1].
  • RobustScaler(lower, upper, …): an Estimator; scales data using quantiles.

Since some outliers remain in the dataset, we use RobustScaler to standardize the data.

from pyspark.ml.feature import RobustScaler

scaler = RobustScaler(inputCol="features", outputCol="scaled")
assembler = VectorAssembler(
    inputCols=['DAYS_EMPLOYED', 'AMT_CREDIT'],
    outputCol="features"
)
pipelineModel = Pipeline(stages=[assembler, scaler]).fit(df_imputed)
pipelineModel.transform(df_imputed).select('scaled').show(5)
+--------------------+
|              scaled|
+--------------------+
|[-0.9644030668127...|
|[-0.5991237677984...|
|[-0.6056955093099...|
|[-0.9036144578313...|
|[-0.9036144578313...|
+--------------------+
only showing top 5 rows

Normal transformation

Skewness

Many regression algorithms, especially linear models, assume that numeric features are normally distributed. Let's first compute the skewness of each numeric feature:

# Check the skew of all numerical features
skewness = df_imputed.select([fn.skewness(var).alias(var) for var in numerical_cols])
skewness = pd.Series(skewness.first().asDict()).sort_values()
print(skewness.head(10))
print(skewness.tail(10))
FLAG_MOBIL                     -554.534039
FLAG_CONT_MOBILE                -23.081060
YEARS_BEGINEXPLUATATION_MEDI    -21.825280
YEARS_BEGINEXPLUATATION_AVG     -21.744660
YEARS_BEGINEXPLUATATION_MODE    -20.686068
DAYS_EMPLOYED                    -2.295700
YEARS_BUILD_MODE                 -1.889130
YEARS_BUILD_MEDI                 -1.747004
YEARS_BUILD_AVG                  -1.744856
FLAG_EMP_PHONE                   -1.664878
dtype: float64
FLAG_DOCUMENT_20              44.364680
FLAG_DOCUMENT_21              54.612673
FLAG_DOCUMENT_17              61.213842
FLAG_DOCUMENT_7               72.173756
FLAG_DOCUMENT_4              110.893823
AMT_REQ_CREDIT_BUREAU_QRT    141.400225
FLAG_DOCUMENT_2              153.791067
FLAG_DOCUMENT_10             209.588031
AMT_INCOME_TOTAL             391.557744
FLAG_DOCUMENT_12             392.112866
dtype: float64
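For reference, the skewness statistic reported above is the standardized third central moment; a minimal pure-Python equivalent, assuming the population definition (which, to our understanding, matches Spark's fn.skewness):

```python
from statistics import mean, pstdev

def skewness(values):
    """Standardized third central moment: 0 for symmetric data,
    positive for a long right tail, negative for a long left tail."""
    mu, sigma = mean(values), pstdev(values)
    n = len(values)
    return sum(((v - mu) / sigma) ** 3 for v in values) / n
```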

These features are highly skewed, so we will try transformations to bring the data closer to a normal distribution.

QQ plot

Taking the AMT_CREDIT feature as an example, we draw its distribution plot and QQ plot.

A Quantile-Quantile plot is a common statistical graphic for comparing two distributions. It is a scatter plot with the quantiles of the standard normal distribution on the x-axis and the sample values on the y-axis. If the points lie close to a straight line, the data is approximately normal; the slope of that line is the standard deviation and its intercept is the mean.

import matplotlib.pyplot as plt
import seaborn as sns
from scipy.stats import probplot, norm

def norm_comparison_plot(series):
    series = pd.Series(series)
    mu, sigma = norm.fit(series)
    kurt, skew = series.kurt(), series.skew()
    print(f"Kurtosis: {kurt:.2f}", f"Skewness: {skew:.2f}", sep='\t')
    
    fig = plt.figure(figsize=(10, 4))
    # Now plot the distribution
    ax1 = fig.add_subplot(121)
    ax1.set_title(f'Distribution ($\\mu=$ {mu:.2f}, $\\sigma=$ {sigma:.2f})')
    ax1.set_ylabel('Frequency')
    sns.distplot(series, fit=norm, ax=ax1)  # note: distplot is deprecated in recent seaborn
    # Get also the QQ-plot
    ax2 = fig.add_subplot(122)
    probplot(series, plot=plt)
sample = df_imputed.select('AMT_CREDIT').sample(0.1).toPandas()
norm_comparison_plot(sample['AMT_CREDIT'])
plt.show()                                                                                 
Kurtosis: 2.06	Skewness: 1.26

(Figure: distribution plot and QQ plot of AMT_CREDIT)

Nonlinear transformation

The most common choice is the log transform. For features containing negative values, first min-max scale them into [0, 1] (or shift them positive) before transforming.
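A small sketch of the shift-then-log idea (hypothetical helper, stdlib only): shift the values so the minimum maps to log(1) = 0, then take logs.

```python
import math

def safe_log(values):
    """Log-transform values, shifting them so the minimum maps to log(1) = 0."""
    lo = min(values)
    shift = 1 - lo if lo <= 0 else 0  # ensure all inputs are > 0
    return [math.log(v + shift) for v in values]
```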

Here we apply a log transform to the AMT_CREDIT feature:

# log-transformation of skewed features.
sample_transformed = df_imputed.select(fn.ln('AMT_CREDIT')).sample(0.1).toPandas()
norm_comparison_plot(sample_transformed.iloc[:, 0])
plt.show()        
Kurtosis: -0.27	Skewness: -0.33

(Figure: distribution plot and QQ plot of log-transformed AMT_CREDIT)

After the log transform, the data is close to normally distributed.

Baseline

At this point, data preprocessing is essentially complete.

df_prepared = df_imputed
print(f'dataset shape: {df_prepared.count(), len(df_prepared.columns)}')
print(pd.Series(dict(df_prepared.dtypes)).value_counts())
dataset shape: (307511, 122)
double    65
int       45
vector    10
float      2
Name: count, dtype: int64

Sanitizing feature names

new_colnames = {c: c.replace('/','or').replace(' ','_').replace(',','_or') for c in df_prepared.columns}
df_prepared = df_prepared.withColumnsRenamed(new_colnames)

Cross-validation

Now we can pick a model and start training. We will use the results of an XGBoost model as the baseline.

Spark's built-in CrossValidator is mainly intended for hyperparameter tuning, so we define our own cross-validation function.

def cross_val_score(df, estimator, evaluator, features, numFolds=3, seed=SEED):
    df = df.withColumn('fold', (fn.rand(seed) * numFolds).cast('int'))
    eval_result = []
    # Initialize an empty dataframe to hold feature importances
    feature_importances = pd.DataFrame(index=features)
    for i in range(numFolds):
        train = df.filter(df['fold'] != i)
        valid = df.filter(df['fold'] == i)
        model = estimator.fit(train)
        train_pred = model.transform(train)
        valid_pred = model.transform(valid)
        train_score = evaluator.evaluate(train_pred)
        valid_score = evaluator.evaluate(valid_pred)
        metric = evaluator.getMetricName()
        print(f"[{i}] train's {metric}: {train_score},  valid's {metric}: {valid_score}")
        eval_result.append(valid_score)
        
        fscore = model.get_feature_importances()
        fscore = {name:fscore.get(f'f{k}', 0) for k,name in enumerate(features)}
        feature_importances[f'cv_{i}'] = fscore
    feature_importances['fscore'] = feature_importances.mean(axis=1)
    return eval_result, feature_importances.sort_values('fscore', ascending=False)
def score_dataset(df, inputCols=None, featuresCol=None, labelCol='label', nfold=3):
    assert inputCols is not None or featuresCol is not None
    if featuresCol is None:
        # Assemble the feature columns into a single vector column
        featuresCol = "features"
        assembler = VectorAssembler(
            inputCols=inputCols,
            outputCol=featuresCol
        )
        df = assembler.transform(df)
    # Create an Estimator.
    classifier = SparkXGBClassifier(
        features_col=featuresCol, 
        label_col=labelCol,
        eval_metric='auc',
        scale_pos_weight=11,
        learning_rate=0.015,
        max_depth=8,
        subsample=1.0,
        colsample_bytree=0.35,
        reg_alpha=65,
        reg_lambda=15,
        n_estimators=1200,
        verbosity=0
    ) 
    evaluator = BinaryClassificationEvaluator(labelCol=labelCol, metricName='areaUnderROC')
    # Training with 3-fold CV:
    scores, feature_importances = cross_val_score(
        df=df,
        estimator=classifier, 
        evaluator=evaluator,
        features=inputCols,
        numFolds=nfold
    )
    print(f"cv_agg's valid auc: {np.mean(scores):.4f} +/- {np.std(scores):.5f}")
    return feature_importances
features = df_prepared.drop('SK_ID_CURR', 'label').columns
feature_importances = score_dataset(df_prepared, inputCols=features)
[0] train's areaUnderROC: 0.8817445932752518,  valid's areaUnderROC: 0.7567778599507636
[1] train's areaUnderROC: 0.8858137153416724,  valid's areaUnderROC: 0.754088602137405
[2] train's areaUnderROC: 0.8830645318540977,  valid's areaUnderROC: 0.755218312522418
cv_agg's valid auc: 0.7554 +/- 0.00110
df_prepared.write.bucketBy(100, "SK_ID_CURR").mode("overwrite").saveAsTable("home_credit_default_risk.prepared_data")   

Feature importances

feature_importances['fscore'].head(15)
NONLIVINGAPARTMENTS_MEDI        4420.333333
NONLIVINGAREA_MEDI              4300.666667
YEARS_BEGINEXPLUATATION_MODE    4240.000000
COMMONAREA_MODE                 4098.666667
ELEVATORS_MODE                  4023.666667
NONLIVINGAPARTMENTS_AVG         3947.000000
LIVINGAREA_AVG                  3862.666667
YEARS_BUILD_MODE                3781.000000
NONLIVINGAREA_AVG               3455.333333
LIVINGAREA_MEDI                 3313.666667
BASEMENTAREA_MODE               3160.666667
LIVINGAPARTMENTS_AVG            2819.333333
LIVINGAPARTMENTS_MEDI           2635.000000
YEARS_BUILD_MEDI                2312.666667
ENTRANCES_MODE                  1947.666667
Name: fscore, dtype: float64
spark.stop()
