Source code for skwdro.linear_models._logistic_regression

"""
Logistic Regression
"""
from typing import Optional
import numpy as np
import torch as pt
import torch.nn as nn
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
from sklearn.utils.multiclass import unique_labels
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from sklearn.exceptions import DataConversionWarning
from scipy.special import expit


from skwdro.base.problems import EmpiricalDistributionWithLabels
from skwdro.base.losses_torch.logistic import BiDiffSoftMarginLoss
from skwdro.solvers.optim_cond import OptCondTorch
from skwdro.base.cost_decoder import cost_from_str

import skwdro.solvers.specific_solvers as spS
import skwdro.solvers.entropic_dual_torch as entTorch
from skwdro.wrap_problem import dualize_primal_loss


[docs] class LogisticRegression(BaseEstimator, ClassifierMixin): r""" A Wasserstein Distributionally Robust logistic regression classifier. The cost function is XXX Uncertainty is XXX Parameters ---------- rho: float, default=1e-2 Robustness radius l2_reg: float, default=None l2 regularization fit_intercept: boolean, default=True Determines if an intercept is fit or not cost: str, default="n-NC-1-2" Tiret-separated code to define the transport cost: "<engine>-<cost id>-<k-norm type>-<power>" for :math:`c(x, y):=\|x-y\|_k^p` solver: str, default='entropic_torch' Solver to be used: 'entropic', 'entropic_torch' (_pre or _post) or 'dedicated' solver_reg: float, default=1e-2 regularization value for the entropic solver n_zeta_samples: int, default=10 number of adversarial samples to draw opt_cond: Optional[OptCondTorch] optimality condition, see :py:class:`OptCondTorch` Attributes ---------- coef_ : array, shape (n_features,) parameter vector (:math:`w` in the cost function formula) intercept_ : float constant term in decision function. Examples -------- >>> import numpy as np >>> from skwdro.linear_models import LogisticRegression >>> from sklearn.datasets import make_blobs >>> from sklearn.model_selection import train_test_split >>> X, y = make_blobs(n_samples=100, centers=2, n_features=2, random_state=0) >>> y = np.sign(y-0.5) >>> X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42) >>> estimator = LogisticRegression() >>> estimator.fit(X_train,y_train) LogisticRegression() >>> estimator.predict(X_test) array([-1., -1., -1., 1., -1., 1., 1., -1., -1., 1., 1., 1., -1., 1., 1., 1., 1., 1., -1., -1., -1., 1., 1., -1., -1., 1., -1., 1., 1., 1., 1., 1., -1.]) >>> estimator.score(X_test,y_test) 0.9393939393939394 """ def __init__(self, rho: float = 1e-2, l2_reg: float = 0., fit_intercept: bool = True, cost: str = "t-NLC-2-2", solver="entropic_torch", solver_reg: Optional[float] = None, sampler_reg: Optional[float] = None, n_zeta_samples: int = 10, random_state: int = 0, opt_cond: Optional[OptCondTorch] = OptCondTorch(2, 1e-4, 0.) ): if rho < 0: raise ValueError( f"The uncertainty radius rho should be non-negative, received {rho}") self.rho = rho self.l2_reg = l2_reg self.cost = cost self.fit_intercept = fit_intercept self.solver = solver self.solver_reg = solver_reg self.sampler_reg = sampler_reg # sigma self.opt_cond = opt_cond self.n_zeta_samples = n_zeta_samples self.random_state = random_state
[docs] def fit(self, X, y): """Fits the WDRO classifier. Parameters ---------- X : array-like, shape (n_samples, n_features) The training input samples. y : array-like, shape (n_samples,) The target values. An array of int. Only -1 or +1 are currently supported Returns ------- self : LogisticRegression Returns self. """ # Check that X and y have correct shape X, y = check_X_y(X, y) X = np.array(X) y = np.array(y) # Type checking for rho if self.rho is not float: try: self.rho = float(self.rho) except BaseException: raise TypeError( f"The uncertainty radius rho should be numeric, received {type(self.rho)}") if len(y.shape) != 1: y = y.ravel() raise DataConversionWarning( f"Given y is {y.shape}, while expected shape is (n_sample,)") # Store the classes seen during fit self.classes_ = unique_labels(y) # Encode the labels if len(self.classes_) == 2: # Binary classification self.le_ = LabelEncoder() y = self.le_.fit_transform(y) if y is None: raise ValueError("Problem with labels, none out of label encoder") else: y = np.array(y, dtype=X.dtype) y[y == 0.] = -1. elif len(self.classes_) > 2: # Multiclass classification self.le_ = OneHotEncoder(sparse_output=False) y = self.le_.fit_transform(y[:, None]) if y is None: raise ValueError("Problem with labels, none out of label encoder") else: y = np.array(y, dtype=X.dtype) else: raise ValueError(f"Found {len(self.classes_)} classes, while 2 are expected.") # Check type if not np.issubdtype(X.dtype, np.number): raise ValueError(f"Input X has dtype {X.dtype}") # Store data self.X_ = X self.y_ = y # Setup problem parameters ################ m, d = np.shape(X) self.n_features_in_ = d samples_y = y[:, None] if len(self.classes_) == 2 else y emp = EmpiricalDistributionWithLabels(m=m, samples_x=X, samples_y=samples_y) # Define cleanly the hyperparameters of the problem. self.cost_ = cost_from_str(self.cost) # ######################################### if self.solver == "entropic": raise (DeprecationWarning( "The entropic (numpy) solver is now deprecated")) elif self.solver == "dedicated": if len(self.classes_) > 2: raise NotImplementedError( f"Multiclass classification is not implemented for dedicated solver. ({len(self.classes_)} classes were found : {self.classes_})") # The logistic regression has a dedicated MP problem-description (solved using cvxopt) # One may use it by specifying this option self.coef_, self.intercept_, self.dual_var_, self.result_ = spS.WDROLogisticSpecificSolver( rho=self.rho, kappa=1000, X=X, y=y, fit_intercept=self.fit_intercept ) elif "torch" in self.solver: _post_sample = self.solver == "entropic_torch" or self.solver == "entropic_torch_post" module_out = 1 if len(self.classes_) == 2 else len(self.classes_) loss = BiDiffSoftMarginLoss(reduction='none') if len(self.classes_) == 2 else nn.CrossEntropyLoss(reduction='none') self.wdro_loss_ = dualize_primal_loss( loss, nn.Linear(self.n_features_in_, module_out, bias=self.fit_intercept), pt.tensor(self.rho), pt.Tensor(emp.samples_x), pt.Tensor(emp.samples_y), _post_sample, self.cost, self.n_zeta_samples, self.random_state, sigma=self.sampler_reg, epsilon=self.solver_reg, imp_samp=_post_sample, # hard set adapt="prodigy", l2reg=self.l2_reg ) # The problem is solved with the new "dual loss" self.coef_, self.intercept_, self.dual_var_, self.robust_loss_ = entTorch.solve_dual_wdro( self.wdro_loss_, emp, self.opt_cond # type: ignore ) self.coef_ = self.wdro_loss_.primal_loss.transform.weight.detach().cpu().squeeze().numpy() if self.fit_intercept: self.intercept_ = self.wdro_loss_.primal_loss.transform.bias.detach().cpu().squeeze().numpy() else: self.intercept_ = None # # TODO: deprecate ? # # Stock the robust loss result # Problems w/ dtypes (f32->f64 for some reason) # To be fixed later # ============================================= # Stock the robust loss result # if self.solver == "entropic_torch_pre": # #self.result_ = _wdro_loss.forward(xi=self.X_, xi_labels=self.y_, zeta=?, zeta_labels=?) # #raise NotImplementedError("Result for pre_sample not available") # pass # elif self.solver == "entropic_torch" or self.solver == "entropic_torch_post": # self.result_ = _wdro_loss.forward(xi=pt.from_numpy(emp.samples_x), xi_labels=pt.from_numpy(emp.samples_y)).item() else: raise NotImplementedError() self.is_fitted_ = True # Return the classifier return self
[docs] def predict_proba_2Class(self, X): """ Robust prediction probability for class +1. Parameters ---------- X : array-like, shape (n_samples, n_features) The input samples. Returns ------- p : ndarray, shape (n_samples,) The probability of class +1 for each of the samples. """ # Check is fit had been called check_is_fitted(self, ['X_', 'y_']) # Input validation X = check_array(X) if self.intercept_ is not None: p = expit(X.dot(self.coef_) + self.intercept_) else: p = expit(X.dot(self.coef_)) # p = 1 / (1 + np.exp(-(X.dot(self.coef_)+self.intercept_))) return p
[docs] def predict_proba(self, X): """ Robust prediction probability for classes -1 and +1. Parameters ---------- X : array-like, shape (n_samples, n_features) The input samples. Returns ------- p : ndarray, shape (n_samples,2) The probability of each class for each of the samples. """ # Check is fit had been called check_is_fitted(self, ['X_', 'y_']) # Input validation X = check_array(X) p = expit(X.dot(self.coef_) + self.intercept_) return np.vstack((1 - p, p)).T
[docs] def predict(self, X): """ Robust prediction. Parameters ---------- X : array-like, shape (n_samples, n_features) The input samples. Returns ------- y : ndarray, shape (n_samples,) The label for each sample is the label of the closest sample seen during fit. """ # Check is fit had been called check_is_fitted(self, ['X_', 'y_']) # Input validation X = check_array(X) X = np.array(X) proba = self.predict_proba_2Class(X) out = self.le_.inverse_transform((proba >= 0.5).astype('uint8')) return out
def _more_tags(self): return {'poor_score': True, # In order to pass with any rho... 'binary_only': True, # Only binary classification # 'non_deterministic': True # For stochastic methods }