Source code for pysarpu.propensity

import numpy as np
import scipy.stats as scs
import statsmodels.api as sm
from .likelihood_models import LogProbitSig, Gumbel

class Propensity:
    '''
    Base class for propensity models used in a PU classification model.
    '''
    def __init__(self):
        self.params = None
    
    def __str__(self):
        return str(self.params)
    
    def __repr__(self):
        return self.__str__()
    
    def initialization(self, Xe):
        # random starting point: one parameter per feature plus one extra
        self.params = np.random.rand(Xe.shape[1] + 1)
    
    def e(self, Xe):
        # base-class default: constant propensity e(x) = 0.5
        return 0.5*np.ones(Xe.shape[0])
    
    def loge(self, Xe):
        # log of the constant propensity, kept consistent with e()
        return np.log(0.5)*np.ones(Xe.shape[0])
    
    def predict_proba(self, Xe):
        return self.e(Xe)

    def predict_logproba(self, Xe):
        return self.loge(Xe)
    
    def predict(self, Xe, threshold=0.5):
        return (self.predict_proba(Xe) >= threshold).astype(int)

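# Usage sketch (illustrative, not part of the original module): the base
# class exposes the generic prediction API with a constant propensity.
# `Xe` below is an assumed synthetic design matrix.
#
#   Xe = np.abs(np.random.randn(5, 3))
#   prop = Propensity()
#   prop.initialization(Xe)           # placeholder random parameters
#   prop.predict_proba(Xe)            # constant 0.5 in the base class
#   prop.predict(Xe)                  # thresholded at 0.5, hence all ones
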
class LogProbitPropensity(Propensity):
    '''
    Log-probit propensity model: e(x) = norm.cdf(log(x . Phi), scale=sigma),
    fitted through the custom LogProbitSig likelihood model.
    '''
    def __init__(self):
        super(LogProbitPropensity, self).__init__()
    
    def __str__(self):
        if self.params is not None:
            s = 'Phi = {}'.format(self.params[:-1])
            s += '\n'
            s += 'sigma = {}'.format(self.params[-1])
            return s
        else:
            return ''
    
    def __repr__(self):
        return self.__str__()
    
    def initialization(self, Xe, w=1.):
        # one coefficient per feature plus the scale parameter sigma
        self.params = np.random.rand(Xe.shape[1] + 1)
    
    def e(self, Xe):
        if self.params is None:
            print('Please initialise parameters first')
            return
        # floor the linear predictor at 1e-30 so that log() stays finite
        pred = np.maximum(np.dot(Xe, self.params[:-1]), 1e-30)
        return scs.norm.cdf(np.log(pred), scale=self.params[-1])
    
    def loge(self, Xe):
        if self.params is None:
            print('Please initialise parameters first')
            return
        pred = np.maximum(np.dot(Xe, self.params[:-1]), 1e-30)
        # clip at -1e99 to keep the log-propensity finite
        return np.maximum(scs.norm.logcdf(np.log(pred), scale=self.params[-1]), -1e99)
    
    def log1e(self, Xe):
        if self.params is None:
            print('Please initialise parameters first')
            return
        pred = np.maximum(np.dot(Xe, self.params[:-1]), 1e-30)
        return scs.norm.logsf(np.log(pred), scale=self.params[-1])
    
    def fit(self, Xe, gamma, Y, w=1., warm_start=True, balance=False):
        if self.params is None:
            self.initialization(Xe, w)
        if not warm_start:
            start_params = None
        else:
            # the likelihood model works with log-parameters
            start_params = np.log(self.params.copy())
        if balance:
            # reweight labelled (Y=1) and unlabelled (Y=0) examples so that
            # both groups contribute equally to the weighted likelihood
            q1, q2 = sum(Y)/sum(gamma), 1 - sum(Y)/sum(gamma)
            c1, c2 = 1/q1/(1/q1 + 1/q2), 1/q2/(1/q1 + 1/q2)
            weights = c1*Y + c2*(1 - Y)
        else:
            weights = 1
        self.model = LogProbitSig(Y, Xe, weights*gamma)
        self.res = self.model.fit(start_params=start_params, disp=0)
        self.params = np.exp(self.res.params.copy())
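
# Usage sketch (illustrative, not part of the original module): evaluating
# a randomly initialised log-probit propensity. Non-negative features keep
# the linear predictor x . Phi positive; `gamma` and `Y`, required by fit(),
# would be the per-example weights and observed labels in the PU setting.
#
#   Xe = np.abs(np.random.randn(100, 3))
#   prop = LogProbitPropensity()
#   prop.initialization(Xe)
#   probs = prop.e(Xe)      # norm.cdf(log(x . Phi), scale=sigma)
#   logp = prop.loge(Xe)    # numerically clipped log-propensity
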
class LogisticPropensity(Propensity):
    '''
    Logistic propensity model: an intercept column is prepended to the
    feature vector Xe internally before applying the logistic link.
    '''
    def __init__(self):
        super(LogisticPropensity, self).__init__()
    
    def __str__(self):
        if self.params is not None:
            return 'phi = {}'.format(self.params)
        else:
            return ''
    
    def __repr__(self):
        return self.__str__()
    
    def initialization(self, Xe, w=1.):
        # one coefficient per feature plus the intercept
        self.params = np.random.randn(Xe.shape[1] + 1)
    
    def e(self, Xe):
        # prepend the intercept column
        Xe = np.concatenate([np.ones(Xe.shape[:-1] + (1,)), Xe], axis=-1)
        if self.params is None:
            print('Please initialise parameters first')
            return
        pred = np.dot(Xe, self.params)
        return scs.logistic.cdf(pred)
    
    def loge(self, Xe):
        # prepend the intercept column
        Xe = np.concatenate([np.ones(Xe.shape[:-1] + (1,)), Xe], axis=-1)
        if self.params is None:
            print('Please initialise parameters first')
            return
        pred = np.dot(Xe, self.params)
        return scs.logistic.logcdf(pred)
    
    def log1e(self, Xe):
        # prepend the intercept column
        Xe = np.concatenate([np.ones(Xe.shape[:-1] + (1,)), Xe], axis=-1)
        if self.params is None:
            print('Please initialise parameters first')
            return
        pred = np.dot(Xe, self.params)
        return scs.logistic.logsf(pred)
    
    def fit(self, Xe, gamma, Y, w=1., warm_start=True, balance=False):
        if self.params is None:
            self.initialization(Xe, w)
        if not warm_start:
            start_params = None
        else:
            start_params = self.params.copy()
        if balance:
            # reweight labelled (Y=1) and unlabelled (Y=0) examples so that
            # both groups contribute equally to the weighted likelihood
            q1, q2 = sum(Y)/sum(gamma), 1 - sum(Y)/sum(gamma)
            c1, c2 = 1/q1/(1/q1 + 1/q2), 1/q2/(1/q1 + 1/q2)
            weights = c1*Y + c2*(1 - Y)
        else:
            weights = 1
        # prepend the intercept column
        Xe = np.concatenate([np.ones(Xe.shape[:-1] + (1,)), Xe], axis=-1)
        self.model = sm.GLM(Y, Xe, freq_weights=weights*gamma,
                            family=sm.families.Binomial())
        self.res = self.model.fit(start_params=start_params, disp=0)
        self.params = self.res.params.copy()
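
# Usage sketch (illustrative, not part of the original module): fitting the
# logistic propensity on synthetic labels through the statsmodels GLM
# backend; the uniform `gamma` weights are an assumption for the example.
#
#   Xe = np.random.randn(200, 2)
#   Y = (np.random.rand(200) < scs.logistic.cdf(Xe @ np.array([1., -1.]))).astype(int)
#   prop = LogisticPropensity()
#   prop.fit(Xe, gamma=np.ones(200), Y=Y, warm_start=False)
#   probs = prop.predict_proba(Xe)    # intercept column added internally
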
class GumbelPropensity(Propensity):
    '''
    Gumbel propensity model: e(x) is the Weibull CDF of the linear
    predictor x . Phi with shape parameter k, fitted through the custom
    Gumbel likelihood model.
    '''
    def __init__(self):
        super(GumbelPropensity, self).__init__()
    
    def __str__(self):
        if self.params is not None:
            s = 'Phi = {}'.format(self.params[:-1])
            s += '\n'
            s += 'k = {}'.format(self.params[-1])
            return s
        else:
            return ''
    
    def __repr__(self):
        return self.__str__()
    
    def initialization(self, Xe, w=1.):
        # one coefficient per feature plus the shape parameter k
        self.params = np.random.rand(Xe.shape[1] + 1)
    
    def e(self, Xe):
        if self.params is None:
            print('Please initialise parameters first')
            return
        # floor the linear predictor at 1e-30 to stay inside the Weibull support
        pred = np.maximum(np.dot(Xe, self.params[:-1]), 1e-30)
        return scs.weibull_min.cdf(pred, self.params[-1])
    
    def loge(self, Xe):
        if self.params is None:
            print('Please initialise parameters first')
            return
        pred = np.maximum(np.dot(Xe, self.params[:-1]), 1e-30)
        # clip at -1e99 to keep the log-propensity finite
        return np.maximum(scs.weibull_min.logcdf(pred, self.params[-1]), -1e99)
    
    def log1e(self, Xe):
        if self.params is None:
            print('Please initialise parameters first')
            return
        pred = np.maximum(np.dot(Xe, self.params[:-1]), 1e-30)
        return scs.weibull_min.logsf(pred, self.params[-1])
    
    def fit(self, Xe, gamma, Y, w=1., warm_start=True, balance=False):
        if self.params is None:
            self.initialization(Xe, w)
        if not warm_start:
            start_params = None
        else:
            # the likelihood model works with log-parameters
            start_params = np.log(self.params.copy())
        if balance:
            # reweight labelled (Y=1) and unlabelled (Y=0) examples so that
            # both groups contribute equally to the weighted likelihood
            q1, q2 = sum(Y)/sum(gamma), 1 - sum(Y)/sum(gamma)
            c1, c2 = 1/q1/(1/q1 + 1/q2), 1/q2/(1/q1 + 1/q2)
            weights = c1*Y + c2*(1 - Y)
        else:
            weights = 1
        self.model = Gumbel(Y, Xe, weights*gamma)
        self.res = self.model.fit(start_params=start_params, disp=0)
        self.params = np.exp(self.res.params.copy())
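
# Usage sketch (illustrative, not part of the original module): the Gumbel
# variant follows the same interface, with the Weibull shape parameter k
# stored as the last entry of `params`.
#
#   Xe = np.abs(np.random.randn(100, 3))
#   prop = GumbelPropensity()
#   prop.initialization(Xe)
#   probs = prop.e(Xe)      # weibull_min.cdf(x . Phi, k)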