"""
Logistic Regression
"""
from typing import Optional
import numpy as np
import torch as pt
import torch.nn as nn
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
from sklearn.utils.multiclass import unique_labels
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from sklearn.exceptions import DataConversionWarning
from scipy.special import expit
from skwdro.base.problems import EmpiricalDistributionWithLabels
from skwdro.base.losses_torch.logistic import BiDiffSoftMarginLoss
from skwdro.solvers.optim_cond import OptCondTorch
from skwdro.base.cost_decoder import cost_from_str
import skwdro.solvers.specific_solvers as spS
import skwdro.solvers.entropic_dual_torch as entTorch
from skwdro.solvers.utils import Steps
from skwdro.wrap_problem import dualize_primal_loss
[docs]
class LogisticRegression(BaseEstimator, ClassifierMixin):
r""" A Wasserstein Distributionally Robust logistic regression classifier.
The cost function is
.. math::
\ell(\theta,\xi=(x,y)) = \log(1+e^{-y\langle \theta,x \rangle})
The WDRO problem solved at fitting is
.. math::
\min_{\theta} \max_{\mathbb{Q} | W(\mathbb{P}^n,\mathbb{Q}) \le\rho} \mathbb{E}_{\zeta\sim\mathbb{Q}} \ell(\theta,\zeta=(x,y))
Parameters
----------
rho: float, default=1e-2
Robustness radius
l2_reg: float, default=None
l2 regularization
fit_intercept: boolean, default=True
Determines if an intercept is fit or not
cost: str, default="n-NC-1-2"
Tiret-separated code to define the transport cost:
"<engine>-<cost id>-<k-norm type>-<power>"
for :math:`c(x, y):=\|x-y\|_k^p`
solver: str, default='entropic_torch'
Solver to be used: 'entropic', 'entropic_torch'
(_pre or _post) or 'dedicated'
solver_reg: float | None, default=None
regularization value for the entropic solver, has
a default heuristic
sampler_reg: float | None, default=None
standard deviation of the regularization distribution :math:`\pi_0`, has
a default heuristic
learning_rate: float | None, default=None
if not set, use a default value depending on the problem, else
specifies the stepsize of the gradient descent algorithm
n_zeta_samples: int, default=10
number of adversarial samples to draw
opt_cond: Optional[OptCondTorch]
optimality condition, see :py:class:`OptCondTorch`
Attributes
----------
coef_ : array, shape (n_features,)
parameter vector (:math:`w` in the cost function formula)
intercept_ : float
constant term in decision function.
Examples
--------
>>> import numpy as np
>>> from skwdro.linear_models import LogisticRegression
>>> from sklearn.datasets import make_blobs
>>> from sklearn.model_selection import train_test_split
>>> X, y = make_blobs(n_samples=100, centers=2, n_features=2, random_state=0)
>>> y = np.sign(y-0.5)
>>> X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)
>>> estimator = LogisticRegression()
>>> estimator.fit(X_train,y_train)
LogisticRegression()
>>> estimator.predict(X_test)
array([-1., -1., -1., 1., -1., 1., 1., -1., -1., 1., 1., 1., -1.,
1., 1., 1., 1., 1., -1., -1., -1., 1., 1., -1., -1., 1.,
-1., 1., 1., 1., 1., 1., -1.])
>>> estimator.score(X_test,y_test)
0.9393939393939394
"""
DEFAULT_OCOND = OptCondTorch(2, 1e-4, 0.)
def __init__(self,
rho: float = 1e-2,
l2_reg: float = 0.,
fit_intercept: bool = True,
cost: str = "t-NLC-2-2",
solver="entropic_torch",
solver_reg: Optional[float] = None,
sampler_reg: Optional[float] = None,
learning_rate: Optional[float] = None,
n_zeta_samples: int = 10,
random_state: int = 0,
n_iter: Optional[Steps] = None,
opt_cond: Optional[OptCondTorch] = DEFAULT_OCOND
):
if rho < 0:
raise ValueError(
' '.join([
"The uncertainty radius rho should be",
"non-negative, received",
f"{rho}"
])
)
self.rho = rho
self.l2_reg = l2_reg
self.cost = cost
self.fit_intercept = fit_intercept
self.solver = solver
self.solver_reg = solver_reg
self.sampler_reg = sampler_reg # sigma
self.learning_rate = learning_rate
self.n_iter = n_iter
self.opt_cond = opt_cond
self.n_zeta_samples = n_zeta_samples
self.random_state = random_state
[docs]
def fit(self, X, y):
"""Fits the WDRO classifier.
Parameters
----------
X : array-like, shape (n_samples, n_features)
The training input samples.
y : array-like, shape (n_samples,)
The target values. An array of int. Only -1 or +1 are currently
supported
Returns
-------
self : LogisticRegression
Returns self.
"""
# Check that X and y have correct shape
X, y = check_X_y(X, y)
X = np.array(X)
y = np.array(y)
# Type checking for rho
if not isinstance(self.rho, float):
try:
self.rho = float(self.rho)
except BaseException:
raise TypeError(
' '.join([
"The uncertainty radius rho should be",
f"numeric, received {type(self.rho)}"
])
)
if len(y.shape) != 1:
y = y.ravel()
raise DataConversionWarning(
f"Given y is {y.shape}, while expected shape is (n_sample,)")
# Store the classes seen during fit
self.classes_ = unique_labels(y)
# Encode the labels
if len(self.classes_) == 2:
# Binary classification
self.le_ = LabelEncoder()
y = self.le_.fit_transform(y)
if y is None:
raise ValueError(
"Problem with labels, none out of label encoder"
)
else:
y = np.array(y, dtype=X.dtype)
y[y == 0.] = -1.
elif len(self.classes_) > 2:
# Multiclass classification
self.le_ = OneHotEncoder(sparse_output=False)
y = self.le_.fit_transform(y[:, None])
if y is None:
raise ValueError(
"Problem with labels, none out of label encoder"
)
else:
y = np.array(y, dtype=X.dtype)
else:
raise ValueError(
' '.join([
f"Found {len(self.classes_)} classes,",
"while (at least) 2 are expected."
])
)
# Check type
# if not np.issubdtype(X.dtype, np.number):
# raise ValueError(f"Input X has dtype {X.dtype}")
# Store data
self.X_ = X
self.y_ = y
# Setup problem parameters ################
m, d = np.shape(X)
self.n_features_in_ = d
samples_y = y[:, None] if len(self.classes_) == 2 else y
emp = EmpiricalDistributionWithLabels(
m=m,
samples_x=X,
samples_y=samples_y
)
# Define cleanly the hyperparameters of the problem.
self.cost_ = cost_from_str(self.cost)
# #########################################
if self.solver == "entropic":
raise DeprecationWarning(
"The entropic (numpy) solver is now deprecated"
)
elif self.solver == "dedicated":
if len(self.classes_) > 2:
raise NotImplementedError(
' '.join([
"Multiclass classification is not implemented",
"for dedicated solver.",
f"({len(self.classes_)} classes were found:",
f"{self.classes_})"
])
)
# The logistic regression has a dedicated MP problem-description
# (solved using cvxopt)
# One may use it by specifying this option
(
self.coef_,
self.intercept_,
self.dual_var_,
self.result_
) = spS.WDROLogisticSpecificSolver(
rho=self.rho,
kappa=1000,
X=X,
y=y,
fit_intercept=self.fit_intercept
)
elif "torch" in self.solver:
_post_sample = (
self.solver in ("entropic_torch", "entropic_torch_post")
)
if self.opt_cond is None:
self.opt_cond = OptCondTorch(2)
_bilat = len(self.classes_) == 2
module_out = 1 if _bilat else len(self.classes_)
loss = (
BiDiffSoftMarginLoss if _bilat else nn.CrossEntropyLoss
)(reduction='none')
self.wdro_loss_ = dualize_primal_loss(
loss,
nn.Linear(
self.n_features_in_,
module_out,
bias=self.fit_intercept
),
pt.tensor(self.rho),
pt.Tensor(emp.samples_x),
pt.Tensor(emp.samples_y),
_post_sample,
self.cost,
self.n_zeta_samples,
self.random_state,
learning_rate=self.learning_rate,
n_iter=self.n_iter,
sigma=self.sampler_reg,
epsilon=self.solver_reg,
imp_samp=_post_sample, # hard set
adapt="prodigy" if self.learning_rate is None else None,
l2reg=self.l2_reg,
# Important: if using multiclass, tell the interfaces that the
# loss batch will not have a trailing dimension to reduce
loss_reduces_spatial_dims=not _bilat
)
# The problem is solved with the new "dual loss"
self.coef_, self.intercept_, self.dual_var_, self.robust_loss_ = entTorch.solve_dual_wdro(
self.wdro_loss_,
emp,
self.opt_cond # type: ignore
)
assert isinstance(self.wdro_loss_.primal_loss.transform, nn.Linear)
self.coef_ = (
self
.wdro_loss_
.primal_loss
.transform
.weight
.detach()
.cpu()
.squeeze()
.numpy()
)
if self.fit_intercept:
self.intercept_ = (
self
.wdro_loss_
.primal_loss
.transform
.bias.detach()
.cpu()
.squeeze()
.numpy()
)
else:
self.intercept_ = None
# Unknown solver
else:
raise NotImplementedError("Designation for solver not recognized")
self.is_fitted_ = True
# Return the classifier
return self
[docs]
def predict_proba_2Class(self, X):
""" Robust prediction probability for class +1.
Parameters
----------
X : array-like, shape (n_samples, n_features)
The input samples.
Returns
-------
p : ndarray, shape (n_samples,)
The probability of class +1 for each of the samples.
"""
# Check is fit had been called
check_is_fitted(self, ['X_', 'y_'])
# Input validation
X = check_array(X)
if self.intercept_ is not None:
p = expit(X.dot(self.coef_) + self.intercept_)
else:
p = expit(X.dot(self.coef_))
# p = 1 / (1 + np.exp(-(X.dot(self.coef_)+self.intercept_)))
return p
[docs]
def predict_proba(self, X):
""" Robust prediction probability for classes -1 and +1.
Parameters
----------
X : array-like, shape (n_samples, n_features)
The input samples.
Returns
-------
p : ndarray, shape (n_samples,2)
The probability of each class for each of the samples.
"""
# Check is fit had been called
check_is_fitted(self, ['X_', 'y_'])
# Input validation
X = check_array(X)
if not hasattr(self, "wdro_loss_") or len(self.classes_) <= 2:
p = expit(X.dot(self.coef_) + self.intercept_)
return np.vstack((1 - p, p)).T
else:
return self.wdro_loss_.primal_loss.transform(
pt.from_numpy(X).to(self.wdro_loss_.rho)
).detach().cpu().numpy()
[docs]
def predict(self, X):
""" Robust prediction.
Parameters
----------
X : array-like, shape (n_samples, n_features)
The input samples.
Returns
-------
y : ndarray, shape (n_samples,)
The label for each sample is the label of the closest sample
seen during fit.
"""
# Check is fit had been called
check_is_fitted(self, ['X_', 'y_'])
# Input validation
X = check_array(X)
X = np.array(X)
if isinstance(self.le_, LabelEncoder):
proba = self.predict_proba_2Class(X)
out = self.le_.inverse_transform((proba >= 0.5).astype('uint8'))
elif isinstance(self.le_, OneHotEncoder):
proba = self.predict_proba(X)
_p = proba >= np.max(proba, axis=1, keepdims=True)
out = self.le_.inverse_transform(
_p.astype('uint8')
)
else:
raise ValueError(
"The label encoder type {type(self.le_)} is not supported."
)
return out
def _more_tags(self):
return {'poor_score': True, # In order to pass with any rho...
'binary_only': True, # Only binary classification
# 'non_deterministic': True # For stochastic methods
}