Source code for skwdro.linear_models._logistic_regression

"""
Logistic Regression
"""
from typing import Optional
import numpy as np
import torch as pt
import torch.nn as nn
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
from sklearn.utils.multiclass import unique_labels
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from sklearn.exceptions import DataConversionWarning
from scipy.special import expit


from skwdro.base.problems import EmpiricalDistributionWithLabels
from skwdro.base.losses_torch.logistic import BiDiffSoftMarginLoss
from skwdro.solvers.optim_cond import OptCondTorch
from skwdro.base.cost_decoder import cost_from_str

import skwdro.solvers.specific_solvers as spS
import skwdro.solvers.entropic_dual_torch as entTorch
from skwdro.solvers.utils import Steps
from skwdro.wrap_problem import dualize_primal_loss


[docs] class LogisticRegression(BaseEstimator, ClassifierMixin): r""" A Wasserstein Distributionally Robust logistic regression classifier. The cost function is .. math:: \ell(\theta,\xi=(x,y)) = \log(1+e^{-y\langle \theta,x \rangle}) The WDRO problem solved at fitting is .. math:: \min_{\theta} \max_{\mathbb{Q} | W(\mathbb{P}^n,\mathbb{Q}) \le\rho} \mathbb{E}_{\zeta\sim\mathbb{Q}} \ell(\theta,\zeta=(x,y)) Parameters ---------- rho: float, default=1e-2 Robustness radius l2_reg: float, default=None l2 regularization fit_intercept: boolean, default=True Determines if an intercept is fit or not cost: str, default="n-NC-1-2" Tiret-separated code to define the transport cost: "<engine>-<cost id>-<k-norm type>-<power>" for :math:`c(x, y):=\|x-y\|_k^p` solver: str, default='entropic_torch' Solver to be used: 'entropic', 'entropic_torch' (_pre or _post) or 'dedicated' solver_reg: float | None, default=None regularization value for the entropic solver, has a default heuristic sampler_reg: float | None, default=None standard deviation of the regularization distribution :math:`\pi_0`, has a default heuristic learning_rate: float | None, default=None if not set, use a default value depending on the problem, else specifies the stepsize of the gradient descent algorithm n_zeta_samples: int, default=10 number of adversarial samples to draw opt_cond: Optional[OptCondTorch] optimality condition, see :py:class:`OptCondTorch` Attributes ---------- coef_ : array, shape (n_features,) parameter vector (:math:`w` in the cost function formula) intercept_ : float constant term in decision function. Examples -------- >>> import numpy as np >>> from skwdro.linear_models import LogisticRegression >>> from sklearn.datasets import make_blobs >>> from sklearn.model_selection import train_test_split >>> X, y = make_blobs(n_samples=100, centers=2, n_features=2, random_state=0) >>> y = np.sign(y-0.5) >>> X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42) >>> estimator = LogisticRegression() >>> estimator.fit(X_train,y_train) LogisticRegression() >>> estimator.predict(X_test) array([-1., -1., -1., 1., -1., 1., 1., -1., -1., 1., 1., 1., -1., 1., 1., 1., 1., 1., -1., -1., -1., 1., 1., -1., -1., 1., -1., 1., 1., 1., 1., 1., -1.]) >>> estimator.score(X_test,y_test) 0.9393939393939394 """ DEFAULT_OCOND = OptCondTorch(2, 1e-4, 0.) def __init__(self, rho: float = 1e-2, l2_reg: float = 0., fit_intercept: bool = True, cost: str = "t-NLC-2-2", solver="entropic_torch", solver_reg: Optional[float] = None, sampler_reg: Optional[float] = None, learning_rate: Optional[float] = None, n_zeta_samples: int = 10, random_state: int = 0, n_iter: Optional[Steps] = None, opt_cond: Optional[OptCondTorch] = DEFAULT_OCOND ): if rho < 0: raise ValueError( ' '.join([ "The uncertainty radius rho should be", "non-negative, received", f"{rho}" ]) ) self.rho = rho self.l2_reg = l2_reg self.cost = cost self.fit_intercept = fit_intercept self.solver = solver self.solver_reg = solver_reg self.sampler_reg = sampler_reg # sigma self.learning_rate = learning_rate self.n_iter = n_iter self.opt_cond = opt_cond self.n_zeta_samples = n_zeta_samples self.random_state = random_state
[docs] def fit(self, X, y): """Fits the WDRO classifier. Parameters ---------- X : array-like, shape (n_samples, n_features) The training input samples. y : array-like, shape (n_samples,) The target values. An array of int. Only -1 or +1 are currently supported Returns ------- self : LogisticRegression Returns self. """ # Check that X and y have correct shape X, y = check_X_y(X, y) X = np.array(X) y = np.array(y) # Type checking for rho if not isinstance(self.rho, float): try: self.rho = float(self.rho) except BaseException: raise TypeError( ' '.join([ "The uncertainty radius rho should be", f"numeric, received {type(self.rho)}" ]) ) if len(y.shape) != 1: y = y.ravel() raise DataConversionWarning( f"Given y is {y.shape}, while expected shape is (n_sample,)") # Store the classes seen during fit self.classes_ = unique_labels(y) # Encode the labels if len(self.classes_) == 2: # Binary classification self.le_ = LabelEncoder() y = self.le_.fit_transform(y) if y is None: raise ValueError( "Problem with labels, none out of label encoder" ) else: y = np.array(y, dtype=X.dtype) y[y == 0.] = -1. elif len(self.classes_) > 2: # Multiclass classification self.le_ = OneHotEncoder(sparse_output=False) y = self.le_.fit_transform(y[:, None]) if y is None: raise ValueError( "Problem with labels, none out of label encoder" ) else: y = np.array(y, dtype=X.dtype) else: raise ValueError( ' '.join([ f"Found {len(self.classes_)} classes,", "while (at least) 2 are expected." ]) ) # Check type # if not np.issubdtype(X.dtype, np.number): # raise ValueError(f"Input X has dtype {X.dtype}") # Store data self.X_ = X self.y_ = y # Setup problem parameters ################ m, d = np.shape(X) self.n_features_in_ = d samples_y = y[:, None] if len(self.classes_) == 2 else y emp = EmpiricalDistributionWithLabels( m=m, samples_x=X, samples_y=samples_y ) # Define cleanly the hyperparameters of the problem. self.cost_ = cost_from_str(self.cost) # ######################################### if self.solver == "entropic": raise DeprecationWarning( "The entropic (numpy) solver is now deprecated" ) elif self.solver == "dedicated": if len(self.classes_) > 2: raise NotImplementedError( ' '.join([ "Multiclass classification is not implemented", "for dedicated solver.", f"({len(self.classes_)} classes were found:", f"{self.classes_})" ]) ) # The logistic regression has a dedicated MP problem-description # (solved using cvxopt) # One may use it by specifying this option ( self.coef_, self.intercept_, self.dual_var_, self.result_ ) = spS.WDROLogisticSpecificSolver( rho=self.rho, kappa=1000, X=X, y=y, fit_intercept=self.fit_intercept ) elif "torch" in self.solver: _post_sample = ( self.solver in ("entropic_torch", "entropic_torch_post") ) if self.opt_cond is None: self.opt_cond = OptCondTorch(2) _bilat = len(self.classes_) == 2 module_out = 1 if _bilat else len(self.classes_) loss = ( BiDiffSoftMarginLoss if _bilat else nn.CrossEntropyLoss )(reduction='none') self.wdro_loss_ = dualize_primal_loss( loss, nn.Linear( self.n_features_in_, module_out, bias=self.fit_intercept ), pt.tensor(self.rho), pt.Tensor(emp.samples_x), pt.Tensor(emp.samples_y), _post_sample, self.cost, self.n_zeta_samples, self.random_state, learning_rate=self.learning_rate, n_iter=self.n_iter, sigma=self.sampler_reg, epsilon=self.solver_reg, imp_samp=_post_sample, # hard set adapt="prodigy" if self.learning_rate is None else None, l2reg=self.l2_reg, # Important: if using multiclass, tell the interfaces that the # loss batch will not have a trailing dimension to reduce loss_reduces_spatial_dims=not _bilat ) # The problem is solved with the new "dual loss" self.coef_, self.intercept_, self.dual_var_, self.robust_loss_ = entTorch.solve_dual_wdro( self.wdro_loss_, emp, self.opt_cond # type: ignore ) assert isinstance(self.wdro_loss_.primal_loss.transform, nn.Linear) self.coef_ = ( self .wdro_loss_ .primal_loss .transform .weight .detach() .cpu() .squeeze() .numpy() ) if self.fit_intercept: self.intercept_ = ( self .wdro_loss_ .primal_loss .transform .bias.detach() .cpu() .squeeze() .numpy() ) else: self.intercept_ = None # Unknown solver else: raise NotImplementedError("Designation for solver not recognized") self.is_fitted_ = True # Return the classifier return self
[docs] def predict_proba_2Class(self, X): """ Robust prediction probability for class +1. Parameters ---------- X : array-like, shape (n_samples, n_features) The input samples. Returns ------- p : ndarray, shape (n_samples,) The probability of class +1 for each of the samples. """ # Check is fit had been called check_is_fitted(self, ['X_', 'y_']) # Input validation X = check_array(X) if self.intercept_ is not None: p = expit(X.dot(self.coef_) + self.intercept_) else: p = expit(X.dot(self.coef_)) # p = 1 / (1 + np.exp(-(X.dot(self.coef_)+self.intercept_))) return p
[docs] def predict_proba(self, X): """ Robust prediction probability for classes -1 and +1. Parameters ---------- X : array-like, shape (n_samples, n_features) The input samples. Returns ------- p : ndarray, shape (n_samples,2) The probability of each class for each of the samples. """ # Check is fit had been called check_is_fitted(self, ['X_', 'y_']) # Input validation X = check_array(X) if not hasattr(self, "wdro_loss_") or len(self.classes_) <= 2: p = expit(X.dot(self.coef_) + self.intercept_) return np.vstack((1 - p, p)).T else: return self.wdro_loss_.primal_loss.transform( pt.from_numpy(X).to(self.wdro_loss_.rho) ).detach().cpu().numpy()
[docs] def predict(self, X): """ Robust prediction. Parameters ---------- X : array-like, shape (n_samples, n_features) The input samples. Returns ------- y : ndarray, shape (n_samples,) The label for each sample is the label of the closest sample seen during fit. """ # Check is fit had been called check_is_fitted(self, ['X_', 'y_']) # Input validation X = check_array(X) X = np.array(X) if isinstance(self.le_, LabelEncoder): proba = self.predict_proba_2Class(X) out = self.le_.inverse_transform((proba >= 0.5).astype('uint8')) elif isinstance(self.le_, OneHotEncoder): proba = self.predict_proba(X) _p = proba >= np.max(proba, axis=1, keepdims=True) out = self.le_.inverse_transform( _p.astype('uint8') ) else: raise ValueError( "The label encoder type {type(self.le_)} is not supported." ) return out
def _more_tags(self): return {'poor_score': True, # In order to pass with any rho... 'binary_only': True, # Only binary classification # 'non_deterministic': True # For stochastic methods }