Feature selection in low signal-to-noise environments like finance.
In the following we will create a feature selection function which would work on XGBoost models as well as Tensorflow and simple sklearn models.
We will use univariate as well as other state of the art selection methods such as boruta,sequential feature elimination and shap values.
It’s important ot notice that in noisy environments different feature selection methods (and even same method, run twice) will not usually produce same sets of features.
Thus we will measure weighted tau rank correlation between sets of features produced by different methods and same methods , but on train and test sets.
We will use weighted and not simple tau correlation to emphasize top ranked features.
Feature selection is usually the most time-consuming step in machine learning applications, thus we will be logging the progress to file using python logging module.
Another point to mention is that it’s useful to add non-informative “noise” features into the set of actual features, and then look into rank position of these noise features to measure the performance of the the feature selection algorithm.
For univariate feature selection we would recommend just using distance correlation (to measure non-linear dependence effects) and pearson correlation (for linear dependence) , as other methods, such as Fisher F-test, Chi2 and tau and spearman correlations would give similar results.
We will use the following python packages:
pip install xgboost pip install logging pip install mlextend pip install eli5 pip install pyentrp
The function signature will be the following:
def runselector(df,y,xs,model,nansy,nansx,methods=None,scoring=None,eval_metric=None,verbose=0,cv=None,eliniter=5,dftest=None):
where df is a dataframe (train set)
y is column name for y variable (we predict y ~ xs )
xs is list of feature column names.
nansy = NaN’s substitution rule for y
nansx NaN’s substitution rule for features
methods = list of feature selection methods
scoring = scoring function (i.e. f1)
cv = cross validation iterator
eliniter = number of iterations for eli method
dftest = test dataset (optional)
We will also make this function to work with sklearn’s pipelines, useful when features need scaling, and to avoid information leak from scaling whole dataset.
To run univariate feature selection inside feature selector we will use help function fs1().
def fs1(res,y,xs=None): if xs is None: xs=res.columns xs=list(xs) xs=list(set(xs)-set([y])) df=res[xs+[y]] dcors={} pearsons={} fctests={} frtests={} chi2abs={} mir={} mic={} kendalls={} for col in xs: dfdropna=df[[col,y]].replace([np.inf, -np.inf], np.nan).dropna() if dfdropna.shape==(0,2): continue try: fctests[col]=f_classif(dfdropna[[col]],dfdropna[y])[0] except: pass dcors[col]=dcor(dfdropna[col],dfdropna[y]) pearsons[col]=dfdropna[[col,y]].corr().iloc[1,0] corrs=DF(pearsons,index=['pearsonabs']).abs().T.join(DF(dcors,index=['dcor']).T).join(DF(fctests,index=['fc']).T) res=pd.concat([corrs,corrs.rank(pct=True).add_prefix('rk.')],axis=1).sort_values(by='rk.dcor',ascending=False) logging.info(f"fs1 rk.dcor index {list(res['rk.dcor'].index)}") return res
Example of usage on dummy classification problem with target variable y and features x1,x2,x3 and additional feaature q.x3 (quantile of x3 noise feature) :
from sklearn.ensemble import RandomForestClassifier import xgboost as xgb import numpy as np from dcor import distance_correlation as dcor from eli5.sklearn import PermutationImportance from mlxtend.feature_selection import SequentialFeatureSelector from pyentrp.entropy import permutation_entropy as pentropy import shap from scipy.stats import weightedtau as wtau npa=np.array DF=pd.DataFrame import logging modelxgb=xgb.XGBClassifier(objective ='binary:logistic', colsample_bytree = 1, learning_rate = 1,max_depth = 10, alpha = 1, n_estimators = 5) x1=npa([-1,-2,-3,-4,-5,6,7,8,9,10,11,12,13,14,15]) np.random.shuffle(x1) x2=x1+np.random.randn(len(x1)) x3=np.random.randn(len(x1)) y=(x1>0).astype(int) xs=['x1','x2','x3','q.x3'] dfunittest=DF({'x1':x1,'x2':x2,'x3':x3,'q.x3':pd.qcut(x3,3,labels=False),'y':y}) display(fs1(dfunittest,'y',xs)) runselector(dfunittest.iloc[:10],y='y',xs=xs,model=modelxgb,nansy='.fillna(0)',nansx=None,verbose=10,methods=['sfsb','sfsf','eli','shap'],dftest=dfunittest.iloc[-10:],scoring='f1',eval_metric='auc',cv=2)
output:
Full code of the feature selection and helper functions:
from sklearn.ensemble import RandomForestClassifier import xgboost as xgb import numpy as np from dcor import distance_correlation as dcor from eli5.sklearn import PermutationImportance from mlxtend.feature_selection import SequentialFeatureSelector from pyentrp.entropy import permutation_entropy as pentropy import shap from scipy.stats import weightedtau as wtau npa=np.array DF=pd.DataFrame import logging def calccorr(df,method='dcor',**kwargs): #ipdb.set_trace() if method.replace('abs','') in ['pearson','kendall','spearman']: dfres=df.corr(method=method.replace('abs','')).rename_axis(method) if 'abs' in method: return dfres.abs() else: return dfres cols=df.columns resd=DF(np.zeros((len(cols),len(cols))),index=cols,columns=cols) for icol1 in range(len(cols)): for icol2 in range(len(cols)): if icol1!=icol2: dfdropna=df.iloc[:,[icol1,icol2]].dropna() res=eval(method)(dfdropna.iloc[:,0],dfdropna.iloc[:,1],**kwargs) if method in ['wtau']: res=res[0] if method in ['np.corrcoef']: res=res[1,0] resd.iloc[icol1,icol2]=res else: #calc pentropy #ipdb.set_trace() resd.iloc[icol1,icol2]=pentropy(df.iloc[:,[icol1]].dropna().values.flatten(),order=3,delay=1,normalize=True) return resd.rename_axis(method) pd.core.frame.DataFrame.calccorr=calccorr def myround2(ts): try: return int(np.round(float(ts),2)*100) except: pass try: return (np.round(ts.astype(float),2)*100).astype(int) except: pass try: return {k:(np.round(v,2)*100).astype(int) for k,v in ts.items() } except Exception as e: pass return ts pd.core.frame.DataFrame.myround2=lambda df:df.applymap(myround2) pd.core.series.Series.myround2=lambda df:df.apply(myround2) def fs1(res,y,xs=None): if xs is None: xs=res.columns xs=list(xs) xs=list(set(xs)-set([y])) df=res[xs+[y]] dcors={} pearsons={} fctests={} frtests={} chi2abs={} mir={} mic={} kendalls={} for col in xs: dfdropna=df[[col,y]].replace([np.inf, -np.inf], np.nan).dropna() if dfdropna.shape==(0,2): continue try: fctests[col]=f_classif(dfdropna[[col]],dfdropna[y])[0] except: pass dcors[col]=dcor(dfdropna[col],dfdropna[y]) pearsons[col]=dfdropna[[col,y]].corr().iloc[1,0] corrs=DF(pearsons,index=['pearsonabs']).abs().T.join(DF(dcors,index=['dcor']).T).join(DF(fctests,index=['fc']).T) res=pd.concat([corrs,corrs.rank(pct=True).add_prefix('rk.')],axis=1).sort_values(by='rk.dcor',ascending=False) logging.info(f"fs1 rk.dcor index {list(res['rk.dcor'].index)}") return res def runselector(df,y,xs,model,nansy,nansx,methods=None,scoring=None,eval_metric=None,verbose=0,cv=None,eliniter=5,dftest=None): try: if 'xgb' in model.named_steps['pipe1'].__class__.__name__.lower(): if eval_metric=='f1': eval_metric=minusf1 except: if 'xgb' in 'xgb' in model.__class__.__name__.lower(): if eval_metric=='f1': eval_metric=minusf1 if methods is None: methods=['boruta','rfe','sfsb','sfsf','shap','eli'] xs=list(xs) if scoring is None: if df[y].nunique()<10: scoring='balanced_accuracy' else: scoring='r2' print('runselector scoring is None.using {}'.format(scoring)) ytrain=eval('df[y]'+nansy) xtrain=eval('df[xs]'+nansx) if nansx is not None else df[xs] try: inferfreq=pd.infer_freq(df.index) except: inferfreq=None logging.info(f"runselector START df.index.min,max={df.index.min(),df.index.max()} dftest.minmax={dftest.index.min(),dftest.index.max() if dftest is not None else 'None'} inferfreq={inferfreq} meansecsdiff= {float(np.diff(npa(df.index)).mean())/1e9}secs scoring={scoring} \n modelclass={model.__class__.__name__} modeldict={model.__dict__} xs={xs} \n {df.describe()}") try: model=eval(model) except Exception as e: print(e) fs1df=fs1(df,y,xs) res=fs1df[fs1df.columns[fs1df.columns.str.contains('rk\\.')]] if 'boruta' in methods: boruta_selector=BorutaPy(model).fit(xtrain,ytrain)#, n_estimators = 10, random_state = 0) # boruta_selector=BorutaPy(model).fit(xtrain.values,ytrain.values)#, n_estimators = 10, random_state = 0) boruta=DF({'boruta':boruta_selector.ranking_,'xs':xs}).set_index('xs').rank(ascending=False,pct=True).sort_values(by='boruta',ascending=False) logging.info(f'boruta: {boruta.round(2)*100}') res=res.join(boruta) #boruta_selector = selectormodel if 'abscoef' in methods: try: # ipdb.set_trace() modelcoef=clone(model) modelcoef.fit(xtrain, ytrain) rfeselectorranking=np.abs(modelcoef.coef_[0])*xtrain.std() #multiply by feature stddev, ocnditional that feture is centered at 0 abscoef=DF({'abscoef':rfeselectorranking,'xs':xs}).set_index('xs').rank(ascending=False,pct=True) res=res.join(abscoef) abscoeflog=DF({'abscoefbystd':rfeselectorranking,'coef':modelcoef.coef_[0],'std':xtrain.std(),'xs':xs}).set_index('xs').sort_values(by='abscoefbystd',ascending=False) logging.info(f'abscoef:\n {abscoeflog}') except Exception as e: print(f"abscoef:{e}") if 'rfe' in methods: try: rfeselector = RFE(model, 1, step=1).fit(xtrain, ytrain) rfeselectorranking=rfeselector.ranking_ rfe=DF({'rfe':rfeselectorranking,'xs':xs}).set_index('xs').rank(ascending=False,pct=True) res=res.join(rfe) except Exception as e: print(f"rfe:{e}") if 'sfsf' in methods: sfsf = SequentialFeatureSelector(model,k_features=len(xs), forward=True, floating=False, verbose=0, scoring=scoring, cv=cv).fit(xtrain, ytrain,custom_feature_names=xs) sfsF=DF(np.unique(DF(sfsf.get_metric_dict()).T['feature_names'].sum(), return_counts=True)).T.set_index(0).rank(pct=True).sort_values(by=1,ascending=False).rename(columns={1:'sfsF'}) if verbose>1: display(DF(sfsf.get_metric_dict()).T[['avg_score','cv_scores','std_dev','feature_names']]) res=res.join(sfsF) logging.info(f'sfsf:\n {sfsF.round(2)*100}') if 'sfsb' in methods: sfsb = SequentialFeatureSelector(model,k_features=1, forward=False, floating=False, verbose=0, scoring=scoring, cv=cv).fit(xtrain, ytrain,custom_feature_names=xs) sfsB=DF(np.unique(DF(sfsb.get_metric_dict()).T['feature_names'].sum(), return_counts=True)).T.set_index(0).rank(pct=True).sort_values(by=1,ascending=False).rename(columns={1:'sfsB'}) if verbose>1: sfsbd=DF(sfsb.get_metric_dict()).T[['avg_score','cv_scores','std_dev','feature_names']] if dftest is not None: sfsbd['dftest']='' for i,row in sfsbd.iterrows(): if 'Pipe' in model.__class__.__name__: model.fit(df[list(row['feature_names'])],df[y],pipe1__eval_metric=eval_metric,pipe1__eval_set=[(dftest[list(row['feature_names'])], dftest[y])],pipe1__verbose=0) sfsbd.at[i,'dftest'] = model.named_steps['pipe1'].evals_result()['validation_0'] else: model.fit(df[list(row['feature_names'])],df[y],eval_metric=eval_metric,eval_set=[(dftest[list(row['feature_names'])], dftest[y])],verbose=0) sfsbd.at[i,'dftest'] = model.evals_result()['validation_0'] print(list(row['feature_names']),eval_metric,sfsbd.at[i,'dftest']) logging.info(f"sfsbd=\n{sfsbd.myround2()}") display(sfsbd.round(3)) logging.info(f'sfsb:\n {sfsB.round(2)*100}') res=res.join(sfsB) model.fit(xtrain,ytrain) if 'eli' in methods: if cv is None: cv='prefit' permuter = PermutationImportance(model, scoring=None, cv=cv, n_iter=eliniter, random_state=42)#instantiate permuter object #'balanced_accuracy' 'prefit' elidf=DF({'eli':permuter.fit(xtrain.values,ytrain.values).feature_importances_,'xs':xs}).set_index('xs').rank(ascending=True,pct=True).sort_values(by='eli',ascending=False) logging.info(f'eli: {elidf.round(2)*100}') res=res.join(elidf) if 'shap' in methods: if 'Pipe' in model.__class__.__name__: if 'xgb' in model.named_steps['pipe1'].__class__.__name__.lower() and model.named_steps['pipe1'].get_params()['booster'] in ('gbtree',None): explainer=shap.TreeExplainer(model.named_steps['pipe1'],model_output='raw') elif 'NN' in model.named_steps['pipe1'].__class__.__name__: explainer=shap.DeepExplainer(model.named_steps['pipe1'].model.model, data=xtrain.values)#, session=None, learning_phase_flags=None) elif 'Logistic' in model.named_steps['pipe1'].__class__.__name__: explainer=shap.KernelExplainer(model.named_steps['pipe1'].predict_proba, data=xtrain.values, link='logit',l1_reg='aic') else: raise ValueError(f"shap for model {model.named_steps['pipe1'].__class__.__name__} {model.named_steps['pipe1'].get_params()} not imlemented in fs.py") else: if 'xgb' in model.__class__.__name__.lower() and model.get_params()['booster'] in ('gbtree',None): explainer=shap.TreeExplainer(model,model_output='raw') elif 'NN' in model.__class__.__name__: explainer=shap.DeepExplainer(model.model.model, data=xtrain.values)#, session=None, learning_phase_flags=None) elif 'Logistic' in model.__class__.__name__: explainer=shap.KernelExplainer(model.predict_proba, data=xtrain.values, link='logit',l1_reg='aic') else: raise ValueError(f"shap for model {model.__class__.__name__} and params={model.get_params()} not imlemented in fs.py") try: shap_values = explainer.shap_values(xtrain.values)#, tree_limit=5) concat=np.concatenate(shap_values) if type(shap_values)==type([]) else shap_values shap_abs = np.abs(concat) global_importances = np.nanmean(shap_abs, axis=0) indices = np.argsort(global_importances)[::-1] features_ranked = [] for f in range(df[xs].shape[1]): features_ranked.append(xs[indices[f]]) shapdf=DF({'shap':global_importances},index=xs).rank(ascending=True,pct=True) res=res.join(shapdf) if verbose>2: shap.summary_plot(shap_values, df[xs], plot_type="bar",class_names=model.classes_) shap.initjs() try: for i in range(len(explainer.expected_value)): display(shap.force_plot(explainer.expected_value[i], shap_values[i],df[xs])) except Exception as e: print(f"forceplot exception:{e}") except Exception as e: logging.warning(f"shaps exception = {e}") res['mean']=res.mean(axis=1) res=res.sort_values(by='mean',ascending=False) if verbose>1: logging.info(f"res.corr.mean=\n{100*res.corr().round(2)}") display(100*res.corr().round(2)) print(f"meancorr=\n{res.corr().mean().round(2)*100}") logging.info(f"meancorr=\n{res.corr().mean().myround2()}") if verbose>1: res1=res.copy() res1['pfname']=res1.index.str.split('.').str[-1] res1=res1.groupby('pfname').mean().sort_values(by='mean',ascending=False)#.set_index('pfname') print("pure features mean rank") display(res1) logging.info(f"pure fs mean rank\n {res1.myround2()}") print("pure features wtau") try: display(100*res1.calccorr(method='wtau').round(2)) logging.info(f"pure features wtau \n{res1.calccorr(method='wtau').myround2()}") except Exception as e: print(f"wtau calcorr exception{str(e)}") logging.info(f"runselector complete res=\n{res.round(2)*100} {list(res.index)}") return res
To run the feature selection on the scaled features one can use the same function and pipe as following:
model=Pipeline([("pipe0",StandardScaler()),("pipe1",xgb.XGBClassifier(objective ='binary:logistic', colsample_bytree = 1, learning_rate = 1,max_depth = 10, alpha = 1, n_estimators = 5))])