bitcoin and ethereum futures spread dynamics

Here we will download and display calendar futures spread on btc and eth from binance.
We will use the following code to get the data via http API. We will look into september / december 2020 calendar spread for coin futures (delivered in coin).

import requests
from datetime import datetime
def ts2dt(x):
    return (datetime.utcfromtimestamp(int(x)/1000.))

def getqf(pair='ETHUSD',interval='1d',q=0):
    contractType={1:'CURRENT_QUARTER',0:'NEXT_QUARTER'}[q]
    r =requests.get(f'https://www.binance.com/dapi/v1/continuousKlines?pair={pair}&interval={interval}&contractType={contractType}&limit=800').json()    
    df=DF(r)
    df[0]=df[0].apply(ts2dt)
    df[6]=df[6].apply(ts2dt)
    df=df.set_index(0)
    return df.apply(pd.to_numeric)

ethdf1=getqf(pair='ETHUSD',interval='2h',q=0)#.set_index(0)
ethdf2=getqf(pair='ETHUSD',interval='2h',q=1)#.set_index(0)
eth=(ethdf1-ethdf2) #ethereum spread
btcdf1=getqf(pair='BTCUSD',interval='2h',q=0)#.set_index(0)
btcdf2=getqf(pair='BTCUSD',interval='2h',q=1)#.set_index(0)
btc=(btcdf1-btcdf2) #bitcoin spread

(btcdf1).join(50*btc,rsuffix='btc')[['1','1btc']].plot() #btc spread and btc price

BTC price (blue) and september – december calendar BTCUSD spread (x50) :

And the same for Ethereum futures and calendar spread:

and now both eth and btc spread after rank transform to show that they move in tandem:

Posted in crypto Tagged with: ,

how to quickly get new crypto api points for new products

When new products are introduced on crypto exchanges, the python api’s and docuementation sometime is not complete, and it’s difficult to find exact symbol names and other paramters.To quickly find out symbol names and other paramters for api calls, we can use chrome.
In this example we will find out symbol names and api paramters for new binance coin futures (delivered in coin).

1. Open chrome and choose product of interest, in this case ETH USD quarterly future.

2. Press Ctrl-SHift-I this will open developer tools.
3. Choose “Network” tab and reload the webpage
4. Scroll through https calls and find data you are interested in i.e. Book Depth, Klines for candlesticks, etc

5. Now click on the data we are interested in (Klines in this case) and full API url will be shown

Now you can use it in jupyter notebook to get the data:

import requests
r =requests.get('https://www.binance.com/dapi/v1/continuousKlines?pair=ETHUSD&interval=15m&contractType=CURRENT_QUARTER&limit=800').json()
pd.DataFrame(r)

Which results in:

To convert timestamps we can use the following code:

from datetime import datetime
def ts2dt(x):
    return (datetime.utcfromtimestamp(int(x)/1000.))
df[0]=df[0].apply(ts2dt)
df[6]=df[6].apply(ts2dt)
Posted in crypto Tagged with: , ,

How to save order book and trades data for crypto futures

To save data in text format for crypto futures order book and trades from binance we can use the following python snippet:
(if you are interested to have -10% on binance trading fees you can use the following code: WFH7DYED )

import os
import sys
import re
from binance.client import Client
binance = Client(<YOUR API KEY>,<YOUR API SECRET>)
from twisted.internet import task, reactor
from datetime import timezone, datetime

timeout = 60*1 # Sixty seconds
def get_valid_filename(s):
    s = str(s).strip().replace(' ', '_')
    return re.sub(r'(?u)[^-\w.]', '', s)

def doWork():
    print(datetime.now())
    for currency in ["ETHUSDT"]:#,"BTC/USDT"]:
        fname='F'+get_valid_filename(currency)+'_bapi_'
        print(binance.futures_time()['serverTime'],'|',int(datetime.now(tz=timezone.utc).timestamp() * 1000),'| ',binance.futures_aggregate_trades(symbol=currency,limit=limit,startTime=int(int(datetime.now(tz=timezone.utc).timestamp() * 1000)-timeout*1000)),'|',int(datetime.now(tz=timezone.utc).timestamp() * 1000),file=open(fname+"trades.txt", "a"))
        print(binance.futures_time()['serverTime'],'|',int(datetime.now(tz=timezone.utc).timestamp() * 1000),'| ',binance.futures_order_book(symbol=currency, limit=50),'|',int(datetime.now(tz=timezone.utc).timestamp() * 1000),file=open(fname+"orderbook.txt", "a")) #fetch_order_book

l = task.LoopingCall(doWork)
l.start(timeout) # call every sixty seconds

reactor.run()

This program will run loop to save the Ethereum perpetual futures data for trades and order book to files every minute.
We also log the times as the local time would be different from the binance server time.

Posted in crypto Tagged with: ,

Python structure for machine learning experiments

Here we will present the setup for single machine to run time consuming machine learning experiments like feature selection using different machine learning models.
First we will create python program which runs single experiment.
We will use argparse library to be able to specify experiment parameters such as target variable, machine learning models to run and number of hyper parameter optimisation iterations, among others.
We will use logging module to log everything into one large text file.

The data for training will be read from the pickle files, as this is the fastest way to read the data.
The data will be created by an external program and then dataframe pickled to disk.

The structure of this python the following ,with example variables to include:

from sklearn.pipeline import Pipeline
from ci.fs import StandardScaler

import argparse
my_parser = argparse.ArgumentParser(description='run classification models on df with features')
my_parser.add_argument('-resample','-r',metavar='resample',type=str,help='resample 1Min 30S 5Min', dest="resample", default='30S')
my_parser.add_argument('-y',metavar='y',type=str,help='y = ybs ybb ycb ycs',dest="y", default='ycb')
my_parser.add_argument('-xs',metavar='xs',type=str,help='xs = all raw q3 q95 filename ',dest="xs", default='all')
my_parser.add_argument('-xsraw',metavar='xsraw',nargs='+',help='xsraw features',dest="xsraw", default=['m','amb','nbp','dpinsell','qbp','dpin','qsp','qimb','signimb','hml','vimb1','vimb5','cimb1','cimb5','vimb1000','cimb1000','vwap1','r'])


my_parser.add_argument('-models','--m', nargs='+', help='models to run = log lin xgb nn plog plin pn',dest="models", default=['log'])
my_parser.add_argument('-optiter',metavar='o',type=int,help='n_iter in CVrandsearch for all models',dest="optiter", default=100)

my_parser.add_argument('-test','--t',dest="test", default=False,action='store_true')
my_parser.add_argument('-addlog',dest="addlog", default=False,action='store_true')
my_parser.add_argument('-interact','--i',help='True is use interact features',dest="interact", default=False,action='store_true')
my_parser.add_argument('-scale',help='True to scale via pipeline ',dest="scale", default=False,action='store_true')


my_parser.add_argument('-gap',metavar='o',type=int,help='cv gap',dest="gap", default=100)
my_parser.add_argument('-max_train_size',metavar='o',type=int,help='cv max train size',dest="max_train_size", default=2000)

args = my_parser.parse_args()
logging.info(f"START - args={args}")
resample = args.resample
optiter=args.optiter
models=args.models
y = args.y
xs=args.xs
xsraw=args.xsraw
interact=args.interact
addlog=args.addlog
scale=args.scale

dffilename='dfbt'+resample+'f.pkl'#'dfbt30sf.pkl' dfbt30Sfandinter.pkl
pd.set_option("display.precision", 3)

df=pd.read_pickle(dffilename).ffill()
print(f"dfcols={list(df.columns)}")
#keep only float columns,  not time
#ipdb.set_trace()
df=df.select_dtypes(include=[np.float,np.int,np.int64,np.int32])
print(f"only floats={list(df.columns)}")

#linear regression ys
yrs=getfeaturenames('y',df.columns)
yrs=[yr for yr in yrs if df[yr].nunique()>10]
print(f"yrs={yrs}") 

xsraw=getfeaturenames('raw',df.columns,xsraw=xsraw)
xsq3=getfeaturenames('q3',df.columns,xsraw=xsraw)
xsq95=getfeaturenames('q95',df.columns,xsraw=xsraw)


xsall=getfeaturenames('all',df.columns)
xsinteract=getfeaturenames('interact',df.columns)

xs={'all':xsall,'raw':xsraw,'q3':xsq3,'q95':xsq95}[xs]

if addlog:
    _,lognames=df.addlog(cols=xs,inplace=True,retfnames=True)
    xs+=lognames

if interact:
    xs.extend(xsinteract)

if 'xgbc' in models or 'pxgbc' in models:#False:

    params={'scale_pos_weight': 100, 'n_estimators': 30, 'max_depth': 5, 'max_delta_step': 10, 'learning_rate': 0.1, 'colsample_bytree': 0.8, 'base_score': 0.1, 'alpha': 1}
    fixedparams=dict(objective ='binary:logistic')
    model=xgb.XGBClassifier(**fixedparams,**params)
    params = {
            'n_estimators':[10,100,200],
            'colsample_bytree': [ 0.8, 1.0],
            'max_depth': [5,10],
            'learning_rate':[0.01,0.1,1],
            'alpha':[1,10,100],
            'scale_pos_weight':[1,10,100],
            'base_score':[0.1,0.9],
            'max_delta_step':[0,1,10]
            }
    randcv = RandomizedSearchCV(model, param_distributions=params, n_iter=n_iter, scoring='f1', n_jobs=1, cv=cv, verbose=0, random_state=1).fit(dftrain[xs], dftrain[y])
    logging.info(f"rscv {model.__class__.__name__} fixedparams={fixedparams} bestscore={randcv.best_score_} bestparams={randcv.best_params_} \n{DF(randcv.cv_results_).sort_values(by='mean_test_score',ascending=False)[['mean_test_score' ,'std_test_score', 'params']]}")
    logging.debug(f"rscv \n{DF(randcv.cv_results_).sort_values(by='mean_test_score',ascending=False)}")
    xgbc=xgb.XGBClassifier(**fixedparams,**randcv.best_params_)


def getmodel(modelname):
    if modelname[0]=='p':
        return Pipeline([("pipe0",StandardScaler()),("pipe1",eval(modelname[1:]))])
    else:
        return eval(modelname)

for mstr in models:
    runselector(dftrain,y=y,xs=xs,model=getmodel(mstr),nansy='.fillna(0)',nansx=None,verbose=2,methods=['sfsb','sfsf','rfe','abscoef'],dftest=dftest,scoring='f1',eval_metric='f1',cv=cv)  #,

where we would use runselector function from the feature selection post.

We would then run this python file using windows bat files as following:


call python runexp.py -resample 30S -y ybb -xs raw  -models plog pxgbc
call python runexp.py -resample 30S -y ybs -xs raw  -models plog pxgbc
call python runexp.py -resample 30S -y ycb -xs raw  -models plog pxgbc

Posted in machine learning Tagged with: ,

Feature selection

Feature selection in low signal-to-noise environments like finance.

In the following we will create a feature selection function which would work on XGBoost models as well as Tensorflow and simple sklearn models.
We will use univariate as well as other state of the art selection methods such as boruta,sequential feature elimination and shap values.

It’s important ot notice that in noisy environments different feature selection methods (and even same method, run twice) will not usually produce same sets of features.
Thus we will measure weighted tau rank correlation between sets of features produced by different methods and same methods , but on train and test sets.
We will use weighted and not simple tau correlation to emphasize top ranked features.

Feature selection is usually the most time-consuming step in machine learning applications, thus we will be logging the progress to file using python logging module.

Another point to mention is that it’s useful to add non-informative “noise” features into the set of actual features, and then look into rank position of these noise features to measure the performance of the the feature selection algorithm.

For univariate feature selection we would recommend just using distance correlation (to measure non-linear dependence effects) and pearson correlation (for linear dependence) , as other methods, such as Fisher F-test, Chi2 and tau and spearman correlations would give similar results.

We will use the following python packages:

 pip install xgboost
 pip install logging
 pip install mlextend
 pip install eli5
 pip install pyentrp
 
 

The function signature will be the following:

 def runselector(df,y,xs,model,nansy,nansx,methods=None,scoring=None,eval_metric=None,verbose=0,cv=None,eliniter=5,dftest=None):
 

where df is a dataframe (train set)
y is column name for y variable (we predict y ~ xs )
xs is list of feature column names.
nansy = NaN’s substitution rule for y
nansx NaN’s substitution rule for features
methods = list of feature selection methods
scoring = scoring function (i.e. f1)
cv = cross validation iterator
eliniter = number of iterations for eli method
dftest = test dataset (optional)

We will also make this function to work with sklearn’s pipelines, useful when features need scaling, and to avoid information leak from scaling whole dataset.

To run univariate feature selection inside feature selector we will use help function fs1().

def fs1(res,y,xs=None):
    if xs is None:
        xs=res.columns
    xs=list(xs)
    xs=list(set(xs)-set([y]))
    df=res[xs+[y]]
    dcors={}
    pearsons={}
    fctests={}
    frtests={}
    chi2abs={}
    mir={}
    mic={}
    kendalls={}
    for col in xs:
        dfdropna=df[[col,y]].replace([np.inf, -np.inf], np.nan).dropna()
        if dfdropna.shape==(0,2):
            continue
        try:
            fctests[col]=f_classif(dfdropna[[col]],dfdropna[y])[0]
        except:
            pass

        dcors[col]=dcor(dfdropna[col],dfdropna[y])
        pearsons[col]=dfdropna[[col,y]].corr().iloc[1,0]

    corrs=DF(pearsons,index=['pearsonabs']).abs().T.join(DF(dcors,index=['dcor']).T).join(DF(fctests,index=['fc']).T)
    res=pd.concat([corrs,corrs.rank(pct=True).add_prefix('rk.')],axis=1).sort_values(by='rk.dcor',ascending=False) 
    logging.info(f"fs1 rk.dcor index {list(res['rk.dcor'].index)}")
    return res

Example of usage on dummy classification problem with target variable y and features x1,x2,x3 and additional feaature q.x3 (quantile of x3 noise feature) :

from sklearn.ensemble import RandomForestClassifier
import xgboost as xgb
import numpy as np
from dcor import distance_correlation as dcor
from eli5.sklearn import PermutationImportance
from mlxtend.feature_selection import SequentialFeatureSelector
from pyentrp.entropy import permutation_entropy as pentropy
import shap
from scipy.stats import weightedtau as wtau
npa=np.array
DF=pd.DataFrame
import logging

modelxgb=xgb.XGBClassifier(objective ='binary:logistic', colsample_bytree = 1, learning_rate = 1,max_depth = 10, alpha = 1, n_estimators = 5)
x1=npa([-1,-2,-3,-4,-5,6,7,8,9,10,11,12,13,14,15])
np.random.shuffle(x1)
x2=x1+np.random.randn(len(x1))
x3=np.random.randn(len(x1))
y=(x1>0).astype(int)
xs=['x1','x2','x3','q.x3']
dfunittest=DF({'x1':x1,'x2':x2,'x3':x3,'q.x3':pd.qcut(x3,3,labels=False),'y':y})
display(fs1(dfunittest,'y',xs))
runselector(dfunittest.iloc[:10],y='y',xs=xs,model=modelxgb,nansy='.fillna(0)',nansx=None,verbose=10,methods=['sfsb','sfsf','eli','shap'],dftest=dfunittest.iloc[-10:],scoring='f1',eval_metric='auc',cv=2)

output:

Full code of the feature selection and helper functions:

from sklearn.ensemble import RandomForestClassifier
import xgboost as xgb
import numpy as np
from dcor import distance_correlation as dcor
from eli5.sklearn import PermutationImportance
from mlxtend.feature_selection import SequentialFeatureSelector
from pyentrp.entropy import permutation_entropy as pentropy
import shap
from scipy.stats import weightedtau as wtau
npa=np.array
DF=pd.DataFrame
import logging


def calccorr(df,method='dcor',**kwargs):
    #ipdb.set_trace()
    if method.replace('abs','') in ['pearson','kendall','spearman']:
        dfres=df.corr(method=method.replace('abs','')).rename_axis(method)
        if 'abs' in method:
            return dfres.abs()
        else:
            return dfres
    cols=df.columns
    resd=DF(np.zeros((len(cols),len(cols))),index=cols,columns=cols)
    for icol1 in range(len(cols)):
        for icol2 in range(len(cols)):
            if icol1!=icol2:
                dfdropna=df.iloc[:,[icol1,icol2]].dropna()
                res=eval(method)(dfdropna.iloc[:,0],dfdropna.iloc[:,1],**kwargs)
                if method in ['wtau']:
                    res=res[0]
                if method in ['np.corrcoef']:
                    res=res[1,0]
                resd.iloc[icol1,icol2]=res
            else: #calc pentropy
                #ipdb.set_trace()
                resd.iloc[icol1,icol2]=pentropy(df.iloc[:,[icol1]].dropna().values.flatten(),order=3,delay=1,normalize=True)                
    return resd.rename_axis(method)

pd.core.frame.DataFrame.calccorr=calccorr 


def myround2(ts):
        try:
            return int(np.round(float(ts),2)*100)
        except: pass
        try:
            return (np.round(ts.astype(float),2)*100).astype(int)
        except: pass
        try:
            return {k:(np.round(v,2)*100).astype(int) for k,v in ts.items() }
        except Exception as e:
            pass
        return ts

pd.core.frame.DataFrame.myround2=lambda df:df.applymap(myround2)
pd.core.series.Series.myround2=lambda df:df.apply(myround2)

def fs1(res,y,xs=None):
    if xs is None:
        xs=res.columns
    xs=list(xs)
    xs=list(set(xs)-set([y]))
    df=res[xs+[y]]
    dcors={}
    pearsons={}
    fctests={}
    frtests={}
    chi2abs={}
    mir={}
    mic={}
    kendalls={}
    for col in xs:
        dfdropna=df[[col,y]].replace([np.inf, -np.inf], np.nan).dropna()
        if dfdropna.shape==(0,2):
            continue
        try:
            fctests[col]=f_classif(dfdropna[[col]],dfdropna[y])[0]
        except:
            pass

        dcors[col]=dcor(dfdropna[col],dfdropna[y])
        pearsons[col]=dfdropna[[col,y]].corr().iloc[1,0]

    corrs=DF(pearsons,index=['pearsonabs']).abs().T.join(DF(dcors,index=['dcor']).T).join(DF(fctests,index=['fc']).T)
    res=pd.concat([corrs,corrs.rank(pct=True).add_prefix('rk.')],axis=1).sort_values(by='rk.dcor',ascending=False) 
    logging.info(f"fs1 rk.dcor index {list(res['rk.dcor'].index)}")
    return res

def runselector(df,y,xs,model,nansy,nansx,methods=None,scoring=None,eval_metric=None,verbose=0,cv=None,eliniter=5,dftest=None):
    try:    
        if 'xgb' in model.named_steps['pipe1'].__class__.__name__.lower():
            if eval_metric=='f1':
                eval_metric=minusf1
    except:
        if 'xgb' in 'xgb' in model.__class__.__name__.lower():
            if eval_metric=='f1':
                eval_metric=minusf1
        
    if methods is None:
        methods=['boruta','rfe','sfsb','sfsf','shap','eli']
    
    xs=list(xs)
    if scoring is None:
        if df[y].nunique()<10:
            scoring='balanced_accuracy'
        else:
            scoring='r2'
        print('runselector scoring is None.using {}'.format(scoring))
    
    ytrain=eval('df[y]'+nansy)
    xtrain=eval('df[xs]'+nansx) if nansx is not None else df[xs]
    try:
        inferfreq=pd.infer_freq(df.index)
    except:
        inferfreq=None

    logging.info(f"runselector START df.index.min,max={df.index.min(),df.index.max()} dftest.minmax={dftest.index.min(),dftest.index.max() if dftest is not None else 'None'}  inferfreq={inferfreq} meansecsdiff= {float(np.diff(npa(df.index)).mean())/1e9}secs scoring={scoring} \n modelclass={model.__class__.__name__} modeldict={model.__dict__} xs={xs} \n {df.describe()}")
    
    try:
        model=eval(model)
    except Exception as e:
        print(e)
           
    fs1df=fs1(df,y,xs)
    res=fs1df[fs1df.columns[fs1df.columns.str.contains('rk\\.')]]
    
    if 'boruta' in methods:

        boruta_selector=BorutaPy(model).fit(xtrain,ytrain)#, n_estimators = 10, random_state = 0)
  #      boruta_selector=BorutaPy(model).fit(xtrain.values,ytrain.values)#, n_estimators = 10, random_state = 0)
        boruta=DF({'boruta':boruta_selector.ranking_,'xs':xs}).set_index('xs').rank(ascending=False,pct=True).sort_values(by='boruta',ascending=False)
        logging.info(f'boruta: {boruta.round(2)*100}')
        res=res.join(boruta)
    #boruta_selector = selectormodel
    
    if 'abscoef' in methods:
        try:
#            ipdb.set_trace()
            modelcoef=clone(model)
            modelcoef.fit(xtrain, ytrain)
            rfeselectorranking=np.abs(modelcoef.coef_[0])*xtrain.std() #multiply by feature stddev, ocnditional that feture is centered at 0
            abscoef=DF({'abscoef':rfeselectorranking,'xs':xs}).set_index('xs').rank(ascending=False,pct=True)
            res=res.join(abscoef)
            abscoeflog=DF({'abscoefbystd':rfeselectorranking,'coef':modelcoef.coef_[0],'std':xtrain.std(),'xs':xs}).set_index('xs').sort_values(by='abscoefbystd',ascending=False)
            logging.info(f'abscoef:\n {abscoeflog}')
        except Exception as e:
            print(f"abscoef:{e}")

    if 'rfe' in methods:
        try:
            rfeselector = RFE(model, 1, step=1).fit(xtrain, ytrain)
            rfeselectorranking=rfeselector.ranking_
            rfe=DF({'rfe':rfeselectorranking,'xs':xs}).set_index('xs').rank(ascending=False,pct=True)
            res=res.join(rfe)
        except Exception as e:
            print(f"rfe:{e}")
    if 'sfsf' in methods:
        sfsf = SequentialFeatureSelector(model,k_features=len(xs), forward=True, floating=False,  verbose=0,  scoring=scoring,  cv=cv).fit(xtrain, ytrain,custom_feature_names=xs)
        sfsF=DF(np.unique(DF(sfsf.get_metric_dict()).T['feature_names'].sum(), return_counts=True)).T.set_index(0).rank(pct=True).sort_values(by=1,ascending=False).rename(columns={1:'sfsF'})
        if verbose>1: 
            display(DF(sfsf.get_metric_dict()).T[['avg_score','cv_scores','std_dev','feature_names']])
        res=res.join(sfsF)
        logging.info(f'sfsf:\n {sfsF.round(2)*100}')
    if 'sfsb' in methods:
        sfsb = SequentialFeatureSelector(model,k_features=1, forward=False, floating=False,  verbose=0,  scoring=scoring,  cv=cv).fit(xtrain, ytrain,custom_feature_names=xs)
        sfsB=DF(np.unique(DF(sfsb.get_metric_dict()).T['feature_names'].sum(), return_counts=True)).T.set_index(0).rank(pct=True).sort_values(by=1,ascending=False).rename(columns={1:'sfsB'})

        if verbose>1:
            sfsbd=DF(sfsb.get_metric_dict()).T[['avg_score','cv_scores','std_dev','feature_names']]
            if dftest is not None:
                sfsbd['dftest']=''
                
                for i,row in sfsbd.iterrows():
                    if 'Pipe' in model.__class__.__name__:
                        model.fit(df[list(row['feature_names'])],df[y],pipe1__eval_metric=eval_metric,pipe1__eval_set=[(dftest[list(row['feature_names'])], dftest[y])],pipe1__verbose=0)
                        sfsbd.at[i,'dftest'] = model.named_steps['pipe1'].evals_result()['validation_0']
                    else:
                        model.fit(df[list(row['feature_names'])],df[y],eval_metric=eval_metric,eval_set=[(dftest[list(row['feature_names'])], dftest[y])],verbose=0)
                        sfsbd.at[i,'dftest'] = model.evals_result()['validation_0']
                    
                    print(list(row['feature_names']),eval_metric,sfsbd.at[i,'dftest'])
            logging.info(f"sfsbd=\n{sfsbd.myround2()}")
            display(sfsbd.round(3))
        logging.info(f'sfsb:\n {sfsB.round(2)*100}')
        res=res.join(sfsB)
        
    model.fit(xtrain,ytrain)
    
    if 'eli' in methods:
        if cv is None:
            cv='prefit'
        permuter = PermutationImportance(model, scoring=None, cv=cv, n_iter=eliniter, random_state=42)#instantiate permuter object #'balanced_accuracy'  'prefit'
        elidf=DF({'eli':permuter.fit(xtrain.values,ytrain.values).feature_importances_,'xs':xs}).set_index('xs').rank(ascending=True,pct=True).sort_values(by='eli',ascending=False)
        logging.info(f'eli: {elidf.round(2)*100}')
        res=res.join(elidf)
            
    if 'shap' in methods:
        if 'Pipe' in model.__class__.__name__:
            if 'xgb' in model.named_steps['pipe1'].__class__.__name__.lower() and model.named_steps['pipe1'].get_params()['booster'] in ('gbtree',None):
                explainer=shap.TreeExplainer(model.named_steps['pipe1'],model_output='raw')
            elif 'NN' in model.named_steps['pipe1'].__class__.__name__:
                explainer=shap.DeepExplainer(model.named_steps['pipe1'].model.model, data=xtrain.values)#, session=None, learning_phase_flags=None)
            elif 'Logistic' in model.named_steps['pipe1'].__class__.__name__:
                explainer=shap.KernelExplainer(model.named_steps['pipe1'].predict_proba, data=xtrain.values, link='logit',l1_reg='aic')
            else:
                raise ValueError(f"shap for model {model.named_steps['pipe1'].__class__.__name__} {model.named_steps['pipe1'].get_params()} not imlemented in fs.py")
        else:
            if 'xgb' in model.__class__.__name__.lower() and model.get_params()['booster'] in ('gbtree',None):
                explainer=shap.TreeExplainer(model,model_output='raw')
            elif 'NN' in model.__class__.__name__:
                explainer=shap.DeepExplainer(model.model.model, data=xtrain.values)#, session=None, learning_phase_flags=None)
            elif 'Logistic' in model.__class__.__name__:
                explainer=shap.KernelExplainer(model.predict_proba, data=xtrain.values, link='logit',l1_reg='aic')
            else:
                raise ValueError(f"shap for model {model.__class__.__name__} and params={model.get_params()} not imlemented in fs.py")

        try:

            shap_values = explainer.shap_values(xtrain.values)#, tree_limit=5)
            concat=np.concatenate(shap_values) if type(shap_values)==type([]) else shap_values
            shap_abs = np.abs(concat)
            global_importances = np.nanmean(shap_abs, axis=0)
            indices = np.argsort(global_importances)[::-1]
            features_ranked = []
            for f in range(df[xs].shape[1]):
                features_ranked.append(xs[indices[f]])
            shapdf=DF({'shap':global_importances},index=xs).rank(ascending=True,pct=True)
            res=res.join(shapdf)
            if verbose>2:
                shap.summary_plot(shap_values, df[xs], plot_type="bar",class_names=model.classes_)
                shap.initjs()
                try:
                    for i in range(len(explainer.expected_value)):
                        display(shap.force_plot(explainer.expected_value[i], shap_values[i],df[xs]))
                except Exception as e:
                    print(f"forceplot exception:{e}")
        except Exception as e:
            logging.warning(f"shaps exception = {e}")            
            
    res['mean']=res.mean(axis=1)
    res=res.sort_values(by='mean',ascending=False)
    
    if verbose>1:
        logging.info(f"res.corr.mean=\n{100*res.corr().round(2)}")
        display(100*res.corr().round(2))
        print(f"meancorr=\n{res.corr().mean().round(2)*100}")
        logging.info(f"meancorr=\n{res.corr().mean().myround2()}")
        
    if verbose>1:
        res1=res.copy()
        res1['pfname']=res1.index.str.split('.').str[-1]
        res1=res1.groupby('pfname').mean().sort_values(by='mean',ascending=False)#.set_index('pfname')
        print("pure features mean rank")
        display(res1)
        logging.info(f"pure fs mean rank\n {res1.myround2()}")
        print("pure features wtau")
        try:
            display(100*res1.calccorr(method='wtau').round(2))
            logging.info(f"pure features wtau \n{res1.calccorr(method='wtau').myround2()}")
        except Exception as e:
            print(f"wtau calcorr exception{str(e)}")
     
    logging.info(f"runselector complete res=\n{res.round(2)*100} {list(res.index)}")
    return res

To run the feature selection on the scaled features one can use the same function and pipe as following:


model=Pipeline([("pipe0",StandardScaler()),("pipe1",xgb.XGBClassifier(objective ='binary:logistic', colsample_bytree = 1, learning_rate = 1,max_depth = 10, alpha = 1, n_estimators = 5))])

Posted in machine learning Tagged with: ,

How to display candle stick bars from binance futures in jupyter notebook

In order to download and display binance candlestick bars in jupyter notebook we will need the following packages:

pip install mplfinance
pip install python-binance
pip install plotly

Also you would need to get API keys from binance Binance API management .

We will download and display two candle stick charts for ETH futures, one using mplfinance library, and another using plotly.
We will use 1 minute ETHUSDT futures data.

from binance.client import Client

from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
import cufflinks as cf
cf.go_offline()
init_notebook_mode(connected=True)

import pandas as pd
import datetime as dt
import matplotlib.pyplot as plt
%matplotlib inline
from dateutil import parser
import math
import os.path
import time
import plotly.graph_objects as go
from datetime import datetime
import mplfinance as mpf

binance_api_key = '<YOUR API KEY>'
binance_api_secret = '<YOUR API SECRET>'

binsizes = {"1m": 1, "5m": 5, "1h": 60, "1d": 1440}
batch_size = 750
binance = Client(api_key=binance_api_key, api_secret=binance_api_secret,)

def binanceklines(symbol='ETHUSDT',interval='1m',limit=500,since="1 day ago UTC"):
    klines = binance.futures_klines(symbol='ETHUSDT',interval={'1m':Client.KLINE_INTERVAL_1MINUTE,'5m':Client.KLINE_INTERVAL_5MINUTE}[interval],since=since,limit=limit)
    data = pd.DataFrame(klines, columns = ['ts', 'o', 'h', 'l', 'c', 'v', 'close_time', 'quote_av', 'trades', 'tb_base_av', 'tb_quote_av', 'ignore' ])
    data=data.apply(pd.to_numeric)
    data['ts'] = pd.to_datetime(data['ts'], unit='ms')
    data=data.set_index('ts')
    return data

df=binanceklines(limit=None)
fig = go.Figure(data=[go.Candlestick(x=df.index,open=df['o'],high=df['h'],low=df['l'],close=df['c'])])
fig.show()

plt.rcParams["figure.figsize"] = (10,8)
mpf.plot(df.rename(columns={'o':'Open','h':'High','l':'Low','c':'Close','v':'Volume'}).apply(pd.to_numeric),type='bars',volume=True,mav=(20,40),figscale=3,style='charles')

This results in:

The advantage of plotly chart is that it’s more interactive.

In case your are interested to 10% binance promo code discount on binance trading fees, you can use the following code: WFH7DYED

Posted in crypto Tagged with: , ,

How to check time-series for abnormality

In many time series machine learning problems the with large number of features the raw data might contain

– abnormal / extreme points
– discontinuities
– stale data

To help with determining quickly abnormal or extreme points we can use z-transform of the time series.
To dtermine if time series contain discountinuities we can calculate how much removing one point changes sum of first difference of the time series.
and lastly to determine stale/predictable data we can use permutation-entropy implemented in python package pyentrp.

The code to run all the 3 tests at once is present below:

from scipy.stats import chi2 
from pyentrp.entropy import permutation_entropy as pentropy

def smoothcoef(y):
    y=npa(y).flatten()
    ymax=max(np.abs(y).max(),0.0000001)
    tv=np.zeros(len(y))
    for i in range(len(y)-1):
        try:
            tv[i]=np.abs(np.diff(np.delete(y,i),1)).sum()
        except:
            ipdb.set_trace()
    tv=np.delete(tv,[0,len(tv)-1])
    tv=np.abs(np.diff(tv,1))
    return tv.max()/ymax

def abnormality(ts,thresh,retpoints=False,plot=False): #high values means abnormal
        avg=ts.mean()
        var=ts.var()
        nans=ts.isnull().sum()/len(ts)
        abnormal=(ts-avg)**2/var >chi2.interval(1-thresh, 1)[1]
        if plot:
            if (plot=='abnormal' and abnormal.any()) or plot=='all':                
                plt.figure(figsize = (4, 4))
                plt.clf()
                plt.scatter(ts.index,ts.values,c=abnormal,cmap='bwr',marker='.') 
                plt.show()
        res={}
        if retpoints:
            res['abnpoints']=ts[abnormal]
        return {**res,'abnormal':float(abnormal.any()),'nans':nans,'nunique':1-ts.nunique()/len(ts),'smooth':smoothcoef(ts.dropna()),'pentr':1-pentropy(ts.dropna(),normalize=True)}

Example usage is shown in the following code:

DF=pd.DataFrame
df=DF({'x':np.linspace(1,10,30)})
df['y']=np.sin(df['x'])
df
df['y'].iloc[4]=6
df[['y']].plot()
abnormality(df['y'],0.001,retpoints=False,plot='abnormal')

the result is show on the following figure:

Interpretation of the results is the following:

Higher the number => more probability there is abnormality in the time series.
When plot parameter is specified graph will be shown with abnormal points in red.

Posted in machine learning Tagged with: ,

How to add feature interactions

Sometimes it is preferable to use simple machine learning algorithms such as logistic regression due to speed and explainability.
But usually these simple algorithms do not incorporate interactions of the features (in contrary to , say, neural networks, where sum/difference of the features is incorporated automatically, as each neuron would sum up incoming connections, and using log transform also can add product/division of the features).

Thus here we present simple way to add feature interactions into machine learning pipeline:

def addinteract(df,cols=None,inplace=False,ops=None,retfnames=False):
    df = df if inplace else df.copy()
    fnames=[]
    if cols is None:
        cols=df.columns
    if ops is None:
        ops=['sum','sub','prd']
    def sum(a,b):
        return a+b
    def sub(a,b):
        return a-b
    def prod(a,b):
        return a*b
    for i in range(len(cols)):
        for j in range(i+1,len(cols)):
            for op in ops:
                try:
                    fname=op+'('+cols[i]+','+cols[j]+')'
                    df[fname]=eval(op)(df[cols[i]],df[cols[j]])
                    fnames.append(fname)
                except Exception as e:
                    print(e)
    if retfnames:
        return df,fnames
    return df
pd.core.frame.DataFrame.addinteract=addinteract

We can use it in the following way:

xint=np.random.randint(0,10,20)
DF=pd.DataFrame
DF({'x':xint,'y':xint-1,'z':xint+2}).addinteract()

which results in:

Posted in machine learning Tagged with: ,

time series feature transformations – quantiles

For machine learning algorithms to work well, it’s usually useful to remove noise from features.For time-series this can be achieved in several ways, such as moving averages, applying sign transform, or applying low pass filter. Other, more simple way is to just apply quantilization on the features i.e. break feature values into quantiles. There are several caveats to do it in python however.When there are very few values of the feature the default behaviour of pandas’s qcut function would be to bin the values into unequal buckets. To confront this there are 2 ways:

1) rank feature values first via rank() function using ‘first’ method
2) add random noise to the feature

2nd method is more universal as it will move the features into
different quantile buckets randomly thus keeping symmetry. But in some paplication it may not matter.
To experiment with different ways to quantilize the data we can use the following function:

def addqs(df,nq=5,lags=None,cols=None,inplace=False,method='randn',retfnames=False):
    # if method=='qcut , possible that 0,5%,95%,100%  quantiles a lot of -1 , 1 instead of alot 0, randn should compensate on average
    df = df if inplace else df.copy()
    fnames=[]
    if cols is None:
        cols=df.columns
    for col in cols:
        if lags is None:
            dftemp=df[[col]]
            kwargs={}
            iloc=list(range(len(dftemp)))
        else:
            dftemp=df[col].rolling(lags)
            kwargs={'raw':False}
            iloc=-1
        if type(nq)==type([]):
            norm=(len(nq)-1)//2
        else:
            norm=(nq-1)//2
        if method=='qcut':
            res=dftemp.apply(lambda x: pd.qcut(x, nq, labels=False,duplicates='drop').iloc[iloc]-norm ,**kwargs)#pd.rank,pct=True)
        elif method=='rank':
            res=dftemp.apply(lambda x: pd.qcut(x.rank(method='first'), nq, labels=False,duplicates='drop').iloc[iloc]-norm ,**kwargs)#pd.rank,pct=True)
        elif method=='randn':
            res=dftemp.apply(lambda x: pd.qcut(x+0.0000000001*np.random.randn(len(x)), nq, labels=False,duplicates='drop').iloc[iloc]-norm,**kwargs)#pd.rank,pct=True)
        fname='q'+method+str(nq).replace("[", "").replace("]", "").replace(" ", "")+','+str(lags)+'.'+col
        df[fname]=res
        fnames.append(fname)
    if retfnames:
        return df,fnames
    return df

pd.core.frame.DataFrame.addqs=addqs
DF=pd.DataFrame

After defining this function and adding it as a DataFrame function we can use it like this:

xint=np.random.randint(0,10,20)
DF({'x':xint}).addqs(nq=3,lags=5,method='rank')

In this example we used nq (number of quantiles) = 3 and 5 lags

In the following we can compare all 3 methods of quantilization for lag=5:

We can see that randn method geenrates more equal quantiles than other methods.

and now the same 3 methods for lag=None i.e. quantilization from the start of time-series.

The code to generate previous dataframe and histograms in jupyter is the following:

lagsN=None # or 5
df=DF({'x':xint}).addqs(nq=3,lags=lagsN,method='qcut')
for lags in [lagsN]: #[None,5]
    for nq in [3]: #[0,0.05,0.95,1],
        for method in ['rank','qcut','randn']:
            df=df.join(DF({'x':xint}).addqs(nq=nq,lags=lags,method=method),rsuffix='w')#.drop(columns='x.',errors='ignore')#.set_index('x')
df=df.set_index('x')
df=df.drop(columns=df.columns[list(df.columns.str.contains('w'))])
df.hist()
Posted in machine learning Tagged with: ,

How to run sklearn’s GridSearchCV with Tensorflow keras models.

To find optimal parameters for Neural network one would usually use RandomizedSearchCV or GridSearchCV from sklearn library.

Tensorflow keras models, such as KerasClassifier, when calling fit() function does not permit to have different number of neurons.

GridSearchCV and RandomizedSearchCV call fit() function on each parameter iteration, thus we need to create new subclass of *KerasClassifier* to be able to specify different number of neurons per layer.

To create a keras model we need a function in the global scope which we will call *build_model2*.
It will build a neural network with 2 hidden layers , with dropout after each hidden layer and custom output_bias.
Output_bias is important for problems with a highly unbalanced dataset.

def build_model2(nfirst,nfeatures,nhidden1,nhidden2,dropout,output_bias,lr): 
output_bias = tf.keras.initializers.Constant(np.log([output_bias]))
model = keras.Sequential([keras.layers.Dense(nfirst, activation='relu', input_shape=(nfeatures,)),keras.layers.Dropout(dropout),
keras.layers.Dense(nhidden1, activation='relu'), keras.layers.Dropout(dropout)])
if nhidden2!=0:
model.add(keras.layers.Dense(nhidden2, activation='relu'))
model.add(keras.layers.Dropout(dropout))
model.add(keras.layers.Dense(1, activation='sigmoid',bias_initializer=output_bias))

model.compile(optimizer=keras.optimizers.Adam(lr=lr),loss=keras.losses.BinaryCrossentropy(),metrics= ['loss', 'auc', 'precision', 'recall'])
return model

Now we will create custom sklearn classifier based on keras model, which will support GridSearch with different number of neurons for hidden layers.
To avoid potential memory leaks we also create custom destructor.

from sklearn.model_selection import cross_validate
from sklearn.base import BaseEstimator,ClassifierMixin

class MyNN(BaseEstimator, ClassifierMixin):
    def __init__(self, lr=0.005,nfirst=1,nhidden1=10,nhidden2=0,dropout=0,output_bias=1,batch_size=100,epochs=10,scale_pos_weight=1):

        self.lr=lr
        self.epochs=epochs
        self.batch_size=batch_size
        self.nfirst=nfirst
        self.nhidden1=nhidden1
        self.nhidden2=nhidden2
        self.dropout=dropout
        self.output_bias=output_bias
        self.scale_pos_weight=scale_pos_weight

    def fit(self, X, y,**fit_params): ##{'nhidden': 40,'nfirst': 20,'epochs': 1,'drop2': 0.2,'drop1': 0.2,'batch_size': 100}

        try:
            if X.isnull().values.any() or y.isnull().values.any():
                print("X or y contain nans")
        except:
            pass
        self.classes_ = unique_labels(y)
        if self.scale_pos_weight is not None:
            fit_params['class_weight']={0:1,1:self.scale_pos_weight}
        self.model = KerasClassifier(build_model2,**{'nfeatures':X.shape[-1],'lr':self.lr,'nhidden1': self.nhidden1,'nhidden2': self.nhidden2,'nfirst': self.nfirst,\
                                                     'epochs': self.epochs,'dropout': self.dropout, 'batch_size': self.batch_size,'output_bias':self.output_bias},verbose=0)
        
        fit_paramsnoevalset=fit_params.copy()
        for k in ['eval_metric','eval_set']:#,entriesToRemove:
           fit_paramsnoevalset.pop(k, None)

        if fit_params.get('eval_set') is None: 
            self.history=self.model.fit(X,y,**fit_paramsnoevalset)

        else:
            self.history=self.model.fit(X,y,validation_data=(fit_params['eval_set'][0][0], fit_params['eval_set'][0][1]),**fit_paramsnoevalset) 
            if fit_params['eval_metric'] not in [m._name for m in METRICS]:
                try:
                    scorer=SCORERS[fit_params['eval_metric']]._score_func
                except:#like minusf1 
                    scorer=fit_params['eval_metric']
                self.score=scorer(fit_params['eval_set'][0][1],self.model.predict(fit_params['eval_set'][0][0]))    
            else:
                self.score=self.history.history['val_'+fit_params['eval_metric']]
        return self.model 
         
    def evals_result(self):
          return {'validation_0':self.score}
          
    def predict(self, X):
        return self.model.predict(X)
    
    def predict_proba(self, X):
        return self.model.predict_proba(X)
    
    def __del__(self):
          tf.keras.backend.clear_session()
          gc.collect()
          if hasattr(self, 'model'):
            del self.model

And now we can use MyNN model in GridSearchCV or RandomizedSearch like this (after specifying cross validation iterator via cv, and dftrain as training set, and setting scale_pos_weight for classification problem where only 10% of classes is positive):

randcv = RandomizedSearchCV(estimator=MyNN(lr=0.005,nfirst=10,nhidden1=10,nhidden2=0,dropout=0.2,output_bias=0.1,batch_size=100,epochs=1),\
                            param_distributions=dict( epochs=[ 50,100,200], batch_size=[ 10,100],nhidden1=[2,5,10],nfirst=[10,20],dropout=[0.2],output_bias=[0.1,0.9],scale_pos_weight=[1,10]),\
                            n_iter=30, scoring='f1', n_jobs=1, cv=cv, verbose=1).fit(dftrain[xs], dftrain['y'])

pd.DataFrame(randcv.cv_results_).sort_values(by='mean_test_score',ascending=False)

https://www.tensorflow.org/api_docs/python/tf/keras/wrappers/scikit_learn/KerasClassifier

Posted in machine learning Tagged with: ,