skwdro.solvers package

Submodules

skwdro.solvers.entropic_dual_torch module

skwdro.solvers.entropic_dual_torch.deprecated(message)[source]
skwdro.solvers.entropic_dual_torch.extract_data(dist: Distribution)[source]

Get torch tensors out of empirical distribution.

Parameters:
dist: Distribution

Empirical distribution of data and optionally labels

Returns:
xi: pt.Tensor

data tensor

xi_labels: Optional[pt.Tensor]

label tensor if the distribution yields them, else None

skwdro.solvers.entropic_dual_torch.optim_postsample(optimizer: Optimizer, xi: Tensor, xi_labels: Tensor | None, loss: _DualLoss, opt_cond: OptCondTorch) -> List[float] [source]

Optimize the dual loss by resampling the \zeta values at each gradient descent step.

Parameters:
n_iter: int

number of gradient descent iterations to perform

optimizer: pt.optim.Optimizer

loss-dependent optimizer, can be customized if needed

xi: pt.Tensor

data tensor

xi_labels: Optional[pt.Tensor]

target tensor

loss: _DualLoss

dual loss instance

Returns:
List[float]
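
As a rough illustration of the resampling strategy described above, the following pure-Python sketch minimizes a toy quadratic objective while drawing fresh noise samples at every step. It is not skwdro code: `noisy_grad`, `optimize_postsample`, the objective, the step size and the recorded loss values are all hypothetical choices made for the illustration.

```python
import random


def noisy_grad(theta, zetas):
    # Gradient of the Monte Carlo estimate of E[(theta - zeta)^2] / 2,
    # i.e. theta minus the sample mean of the zetas.
    return theta - sum(zetas) / len(zetas)


def optimize_postsample(theta0, n_iter=200, n_samples=32, lr=0.1, seed=0):
    rng = random.Random(seed)
    theta = theta0
    history = []
    for _ in range(n_iter):
        # Fresh zeta samples are drawn at every iteration (post-sampling),
        # so each step follows a stochastic gradient.
        zetas = [rng.gauss(1.0, 0.5) for _ in range(n_samples)]
        theta -= lr * noisy_grad(theta, zetas)
        # Record the sampled loss value as a float (assumed content of the list).
        history.append(sum((theta - z) ** 2 for z in zetas) / (2 * n_samples))
    return theta, history


theta, losses = optimize_postsample(theta0=5.0)
```

Because the samples change at every step, the iterates keep fluctuating around the minimizer, which is why stochastic optimizers are the natural fit for this mode.
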
skwdro.solvers.entropic_dual_torch.optim_presample(optimizer: Optimizer, xi: Tensor, xi_labels: Tensor | None, loss: _DualLoss, opt_cond: OptCondTorch) -> List[float] [source]

Optimize the dual loss by sampling the \zeta values once at the beginning of the optimization, then performing a deterministic gradient descent (e.g. BFGS style algorithm).

Parameters:
optimizer: pt.optim.Optimizer

loss-dependent optimizer, can be customized if needed

xi: pt.Tensor

data tensor

xi_labels: Optional[pt.Tensor]

target tensor

loss: _DualLoss

dual loss instance

Returns:
List[float]
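
To contrast with resampling at every step, here is a minimal pure-Python sketch of the pre-sampling idea: the noise samples are drawn once, after which the objective is deterministic. The function name `optimize_presample`, the toy quadratic objective and the plain gradient step (standing in for a BFGS-style method) are assumptions made for the example, not skwdro's implementation.

```python
import random


def optimize_presample(theta0, n_iter=100, n_samples=32, lr=0.5, seed=0):
    rng = random.Random(seed)
    # The zeta samples are drawn once, before the optimization starts.
    zetas = [rng.gauss(1.0, 0.5) for _ in range(n_samples)]
    mean_zeta = sum(zetas) / n_samples
    theta = theta0
    history = []
    for _ in range(n_iter):
        # With fixed samples the objective is deterministic, so an exact
        # gradient step (or a quasi-Newton update) can be applied.
        theta -= lr * (theta - mean_zeta)
        history.append(sum((theta - z) ** 2 for z in zetas) / (2 * n_samples))
    return theta, mean_zeta, history


theta, mean_zeta, losses = optimize_presample(theta0=5.0)
```

Here the loss values decrease monotonically and the iterate converges to the sample mean of the fixed draws, at the price of a bias that depends on those particular samples.
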
skwdro.solvers.entropic_dual_torch.solve_dual_wdro(loss: _DualLoss, p_hat: Distribution, opt: OptCondTorch)[source]

Solve the dual problem with the loss-dependent gradient descent algorithm.

Parameters:
loss: _DualLoss

Dual loss

p_hat: Distribution

Empirical distribution

opt: OptCondTorch

Optimality conditions

Returns:
theta: np.ndarray

Concatenated array of the parameters of the model, except the intercept if there is one

intercept: Optional[np.ndarray]

If the model specifically has an intercept as one of its parameters, it is stacked in this output tensor

lambd: Union[np.ndarray, float]

Dual variable \lambda of the problem

skwdro.solvers.entropic_dual_torch_epsilon module

skwdro.solvers.entropic_dual_torch_epsilon.extract_data(dist: Distribution)[source]

Get torch tensors out of empirical distribution.

Parameters:
dist: Distribution

Empirical distribution of data and optionally labels

Returns:
xi: pt.Tensor

data tensor

xi_labels: Optional[pt.Tensor]

label tensor if the distribution yields them, else None

skwdro.solvers.entropic_dual_torch_epsilon.optim_postsample(optimizer: Optimizer, xi: Tensor, xi_labels: Tensor | None, loss: _DualLoss, opt_cond: OptCondTorch) -> List[Tensor] [source]

Optimize the dual loss by resampling the \zeta values at each gradient descent step.

Parameters:
n_iter: int

number of gradient descent iterations to perform

optimizer: pt.optim.Optimizer

loss-dependent optimizer, can be customized if needed

xi: pt.Tensor

data tensor

xi_labels: Optional[pt.Tensor]

target tensor

loss: _DualLoss

dual loss instance

Returns:
List[float]

skwdro.solvers.entropic_dual_torch_epsilon.optim_presample(optimizer: Optimizer, xi: Tensor, xi_labels: Tensor | None, loss: _DualLoss, opt_cond: OptCondTorch) -> List[float] [source]

Optimize the dual loss by sampling the \zeta values once at the beginning of the optimization, then performing a deterministic gradient descent (e.g. BFGS style algorithm).

Parameters:
optimizer: pt.optim.Optimizer

loss-dependent optimizer, can be customized if needed

xi: pt.Tensor

data tensor

xi_labels: Optional[pt.Tensor]

target tensor

loss: _DualLoss

dual loss instance

Returns:
List[float]
skwdro.solvers.entropic_dual_torch_epsilon.solve_dual_wdro(loss: _DualLoss, p_hat: Distribution, opt: OptCondTorch)[source]

Solve the dual problem with the loss-dependent gradient descent algorithm.

Parameters:
loss: _DualLoss

Dual loss

p_hat: Distribution

Empirical distribution

opt: OptCondTorch

Optimality conditions

Returns:
theta: np.ndarray

Concatenated array of the parameters of the model, except the intercept if there is one

intercept: Optional[np.ndarray]

If the model specifically has an intercept as one of its parameters, it is stacked in this output tensor

lambd: Union[np.ndarray, float]

Dual variable \lambda of the problem

skwdro.solvers.hybrid_opt module

class skwdro.solvers.hybrid_opt.HybridAdam(*args, **kwargs)[source]

Bases: HybridOpt, Adam

class skwdro.solvers.hybrid_opt.HybridOpt(params, **kwargs)[source]

Bases: object

step(*args, **kwargs)[source]
class skwdro.solvers.hybrid_opt.HybridSGD(*args, **kwargs)[source]

Bases: HybridOpt, SGD

skwdro.solvers.hybrid_opt.postrule(name)[source]
skwdro.solvers.hybrid_opt.postrule_mwu(p)[source]
skwdro.solvers.hybrid_opt.postrule_mwu_simplex(p)[source]
skwdro.solvers.hybrid_opt.postrule_non_neg(p)[source]
skwdro.solvers.hybrid_opt.prerule(name)[source]
skwdro.solvers.hybrid_opt.prerule_bound(p)[source]
skwdro.solvers.hybrid_opt.prerule_mwu(p)[source]
skwdro.solvers.hybrid_opt.prerule_mwu_simplex(p)[source]
skwdro.solvers.hybrid_opt.rule_max(p)[source]

skwdro.solvers.optim_cond module

class skwdro.solvers.optim_cond.OptCondTorch(order: int | str, tol_theta: float = 1e-08, tol_lambda: float = 1e-08, *, monitoring: str = 'theta', mode: str = 'rel', metric: str = 'grad', verbose: bool = False)[source]

Bases: object

Callable object representing some optimality conditions

May track two different expressions of the error:

  • the relative error: \|u_n\| < tol \|u_0\|

  • the absolute error: \|u_n\| < tol

Those equations are evaluated for the following possible metrics u_n:

  • the progress in the gradient of the dual loss with respect to the parameter of interest \nabla_{\theta ,\lambda} J_{\theta_n}(\zeta_n)

  • the progress of the parameters themselves (\theta_n-\theta_{n-1} , \lambda_n-\lambda_{n-1})

To evaluate the above metrics, one may choose to monitor the convergence in:

  • only \theta

  • only \lambda

  • both

  • or either
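
The criteria above can be sketched in a few lines of plain Python. This is only an illustration under stated assumptions: `p_norm`, `converged` and `should_stop` are hypothetical helpers, vectors are plain lists rather than tensors, and the option strings ("theta", "lambda", "both", "either", "inf") are placeholders rather than the values the library actually accepts.

```python
def p_norm(vec, order=2):
    # Vector p-norm; `order` is a positive number or the string "inf".
    if order == "inf":
        return max(abs(v) for v in vec)
    return sum(abs(v) ** order for v in vec) ** (1.0 / order)


def converged(u_n, u_0, tol, mode="rel", order=2):
    if mode == "rel":
        # Relative error: ||u_n|| < tol * ||u_0||
        return p_norm(u_n, order) < tol * p_norm(u_0, order)
    if mode == "abs":
        # Absolute error: ||u_n|| < tol
        return p_norm(u_n, order) < tol
    raise ValueError(f"unknown mode: {mode!r}")


def should_stop(theta_ok, lambda_ok, monitoring):
    # Placeholder names for the four monitoring choices listed above.
    rules = {
        "theta": theta_ok,
        "lambda": lambda_ok,
        "both": theta_ok and lambda_ok,
        "either": theta_ok or lambda_ok,
    }
    return rules[monitoring]


grad_0 = [3.0, 4.0]      # norm 5
grad_n = [3e-4, 4e-4]    # norm about 5e-4
theta_ok = converged(grad_n, grad_0, tol=1e-3, mode="rel")
lambda_ok = converged([0.2], [1.0], tol=1e-3, mode="abs")
```

With these numbers the \theta check passes and the \lambda check fails, so monitoring "either" would allow the algorithm to stop while monitoring "both" would not.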

Parameters:
order: int|str

norm type to use

tol_theta: float

if positive, the tolerance (relative or absolute) allowed for the parameters; ignored if <= 0

tol_lambda: float

if positive, the tolerance (relative or absolute) allowed for the dual parameter; ignored if <= 0

monitoring: str

the allowed options are given by the global variables L_OR_T (stop when either \theta or \lambda has converged), L_AND_T (stop only when both have converged), JUST_L (monitor only \lambda) and JUST_T (monitor only \theta)

mode: str

either "rel" for relative progress or "abs" for absolute progress. Not checked if the metric is the gradient value

metric: str

either "grad" for gradient improvement/change over time, or "param" for parameter-space improvement/change over time

check_all_params(lam: Callable[[], Tensor], lamgrad: Callable[[], Tensor], flattheta: Callable[[], Tensor], flatgrad: Callable[[], Tensor]) -> Tuple[bool, float] [source]

Checks the dual and primal parameters for convergence by using functional monads on the tensors, see check_t() and check_l().

Returns:
cond: bool

green light to stop algorithm
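
One plausible benefit of passing zero-argument callables instead of tensors is that only the quantity actually needed gets computed. The sketch below illustrates that idea in plain Python; `make_lazy` and `select_observation` are hypothetical names, floats stand in for tensors, and nothing here is taken from the skwdro source.

```python
from typing import Callable, List

calls: List[str] = []


def make_lazy(name: str, value: float) -> Callable[[], float]:
    # Wrap a value in a zero-argument callable and log each evaluation.
    def thunk() -> float:
        calls.append(name)
        return value
    return thunk


def select_observation(metric: str,
                       param: Callable[[], float],
                       grad: Callable[[], float]) -> float:
    # Only the callable selected by `metric` is evaluated; the other one
    # is never called, so its computation is skipped entirely.
    return grad() if metric == "grad" else param()


lam = make_lazy("lam", 0.7)
lam_grad = make_lazy("lam_grad", 1e-9)
obs = select_observation("grad", lam, lam_grad)
```

After this call, `calls` records a single evaluation of the gradient callable, and the parameter callable has not been touched.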

check_iter(it_number: int) -> bool [source]

Checks if the maximum number of iterations has been exceeded

Returns:
cond: bool

green light to stop algorithm

check_l(lam: Callable[[], Tensor], lam_grad: Callable[[], Tensor]) -> Tuple[bool, float] [source]

Check the convergence of the \lambda dual parameter, either in gradient or in parameter value. The arguments are LazyTensor objects, which means that they must be called as functions to be evaluated.

Returns:
cond: bool

green light to stop algorithm

check_metric(new_obs: Tensor, memory: Tensor, tol: float) -> Tuple[bool, float] [source]

Helper function to get the tolerance check in both the relative and absolute error cases.

Parameters:
new_obs: pt.Tensor

current step metric

memory: pt.Tensor

same metric at the previous step; initialized to None, so a check must be performed before calling this function

tol: float

the positive tolerance rate allowed (same for absolute and relative tolerance)

Returns:
cond: bool

green light to stop algorithm

check_t(flat_theta: Callable[[], Tensor], flat_theta_grad: Callable[[], Tensor]) -> Tuple[bool, float] [source]

Check the convergence of the theta parameter, either in gradient or in parameter value. The arguments are LazyTensor objects, which means that they must be called as functions to be evaluated.

Returns:
cond: bool

green light to stop algorithm

classmethod get_flat_grad(module: Module) -> Tensor [source]

Helper function to get a flat vector containing all the gradients of the primal model.

classmethod get_flat_param(module: Module) -> Tensor [source]

Helper function to get a flat vector containing all the primal parameters.

skwdro.solvers.optim_cond.combine_intersect(a: Tuple[bool, float], b: Tuple[bool, float]) -> Tuple[bool, float] [source]
skwdro.solvers.optim_cond.combine_union(a: Tuple[bool, float], b: Tuple[bool, float]) -> Tuple[bool, float] [source]
skwdro.solvers.optim_cond.wrap(b: bool) -> Tuple[bool, float] [source]

skwdro.solvers.oracle_torch module

class skwdro.solvers.oracle_torch.CompositeOptimizer(params, lbd, n_iter, optimizer)[source]

Bases: Optimizer

load_state_dict(state_dict)[source]

Load the optimizer state.

Args:
state_dict (dict): optimizer state. Should be an object returned from a call to state_dict().

reset_lbd_state()[source]
state_dict()[source]

Return the state of the optimizer as a dict.

It contains two entries:

  • state: a Dict holding current optimization state. Its content differs between optimizer classes, but some common characteristics hold. For example, state is saved per parameter, and the parameter itself is NOT saved. state is a Dictionary mapping parameter ids to a Dict with state corresponding to each parameter.

  • param_groups: a List containing all parameter groups where each parameter group is a Dict. Each parameter group contains metadata specific to the optimizer, such as learning rate and weight decay, as well as a List of parameter IDs of the parameters in the group.

NOTE: The parameter IDs may look like indices but they are just IDs associating state with param_group. When loading from a state_dict, the optimizer will zip the param_group params (int IDs) and the optimizer param_groups (actual nn.Parameters) in order to match state WITHOUT additional verification.

A returned state dict might look something like:

{
    'state': {
        0: {'momentum_buffer': tensor(...), ...},
        1: {'momentum_buffer': tensor(...), ...},
        2: {'momentum_buffer': tensor(...), ...},
        3: {'momentum_buffer': tensor(...), ...}
    },
    'param_groups': [
        {
            'lr': 0.01,
            'weight_decay': 0,
            ...
            'params': [0]
        },
        {
            'lr': 0.001,
            'weight_decay': 0.5,
            ...
            'params': [1, 2, 3]
        }
    ]
}
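
The positional matching mentioned in the NOTE can be pictured with plain dictionaries. This is a simplified illustration, not the torch.optim code: strings stand in for the live nn.Parameter objects, and `saved`, `live_groups`, `id_to_param` and `restored_state` are names chosen for the example.

```python
# A saved state dict with integer parameter IDs, shaped like the example above.
saved = {
    "state": {
        0: {"momentum_buffer": [0.1]},
        1: {"momentum_buffer": [0.2]},
    },
    "param_groups": [
        {"lr": 0.01, "params": [0]},
        {"lr": 0.001, "params": [1]},
    ],
}

# Stand-ins for the live parameters currently held by the optimizer.
live_groups = [
    {"lr": 0.5, "params": ["weight"]},
    {"lr": 0.5, "params": ["bias"]},
]

id_to_param = {}
for saved_group, live_group in zip(saved["param_groups"], live_groups):
    # IDs and live parameters are paired purely by position, with no
    # check that shapes or names agree.
    for pid, param in zip(saved_group["params"], live_group["params"]):
        id_to_param[pid] = param

# Re-key the saved per-parameter state by the live parameters.
restored_state = {id_to_param[pid]: st for pid, st in saved["state"].items()}
```

Since the pairing relies on order alone, loading a state dict into an optimizer whose parameters are listed in a different order would silently attach state to the wrong parameters.
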
step(closure: None = None) -> None [source]
step(closure: Callable) -> float

Perform a single optimization step to update parameter.

Args:
closure (Callable): A closure that reevaluates the model and returns the loss. Optional for most optimizers.

Note

Unless otherwise specified, this function should not modify the .grad field of the parameters.

zero_grad(*args, **kwargs)[source]

Reset the gradients of all optimized torch.Tensors.

Args:
set_to_none (bool): instead of setting to zero, set the grads to None. This will in general have lower memory footprint, and can modestly improve performance. However, it changes certain behaviors. For example:

  1. When the user tries to access a gradient and perform manual ops on it, a None attribute or a Tensor full of 0s will behave differently.

  2. If the user requests zero_grad(set_to_none=True) followed by a backward pass, .grads are guaranteed to be None for params that did not receive a gradient.

  3. torch.optim optimizers have a different behavior if the gradient is 0 or None (in one case it does the step with a gradient of 0 and in the other it skips the step altogether).

skwdro.solvers.oracle_torch.DualLoss

alias of DualPostSampledLoss

class skwdro.solvers.oracle_torch.DualPostSampledLoss(loss: Loss, cost: TorchCost, n_samples: int, epsilon_0: Tensor, rho_0: Tensor, n_iter: int | Tuple[int, int] = 10000, gradient_hypertuning: bool = False, *, imp_samp: bool = True, adapt: str | None = 'prodigy')[source]

Bases: _DualLoss

Dual loss implementing a sampling of the \zeta vectors at each forward pass.

Parameters:
loss: Loss

the loss of interest L_\theta

cost: Cost

ground-distance function

n_samples: int

number of \zeta samples to draw at each forward pass
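
To make the role of the \zeta samples concrete, the sketch below estimates a dual objective by Monte Carlo in plain Python. It assumes the commonly used entropy-regularized form \lambda\rho + \epsilon E_\xi \log E_\zeta \exp((L_\theta(\zeta) - \lambda c(\xi, \zeta)) / \epsilon), with Gaussian samples around each data point; the exact expression used by skwdro (for instance with importance sampling, cf. imp_samp) may differ, and `logsumexp`, `dual_loss_estimate` and `sigma` are hypothetical names.

```python
import math
import random


def logsumexp(values):
    # Numerically stable log(sum(exp(v))) for a list of floats.
    m = max(values)
    return m + math.log(sum(math.exp(v - m) for v in values))


def dual_loss_estimate(xis, loss, cost, lam, rho, eps, sigma, n_samples, rng):
    total = 0.0
    for xi in xis:
        # New zeta samples are drawn around each data point at every call,
        # which is the "post-sampled" behaviour described above.
        zetas = [rng.gauss(xi, sigma) for _ in range(n_samples)]
        scores = [(loss(z) - lam * cost(xi, z)) / eps for z in zetas]
        # eps * log of the sample mean of exp(score).
        total += eps * (logsumexp(scores) - math.log(n_samples))
    return lam * rho + total / len(xis)


rng = random.Random(0)
xis = [0.0, 1.0, 2.0]
sq_cost = lambda xi, z: (xi - z) ** 2
first = dual_loss_estimate(xis, lambda z: z ** 2, sq_cost, 1.0, 0.1, 0.5, 0.3, 16, rng)
second = dual_loss_estimate(xis, lambda z: z ** 2, sq_cost, 1.0, 0.1, 0.5, 0.3, 16, rng)
```

Two consecutive calls give different values because the samples are redrawn each time, so gradients of this estimate are stochastic.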

forward(xi: Tensor, xi_labels: Tensor | None = None, zeta: None = None, zeta_labels: None = None, reset_sampler: bool = False) -> Tensor [source]
forward(xi: Tensor, xi_labels: Tensor | None, zeta: Tensor, zeta_labels: Tensor | None = None, reset_sampler: bool = False) -> Tensor

Forward pass for the dual loss, with the sampling of the adversarial samples

Parameters:
xi: pt.Tensor

data batch

xi_labels: Optional[pt.Tensor]

labels batch

reset_sampler: bool

defaults to False, if set resets the batch saved in the sampler

Returns:
dl: pt.Tensor

property presample

True for DualPreSampledLoss, False for DualPostSampledLoss.

Returns:
bool

reset_sampler_mean(xi: Tensor, xi_labels: Tensor | None = None)[source]

Prepare the sampler for a new batch of xi data.

Parameters:
xi: pt.Tensor

new data batch

xi_labels: Optional[pt.Tensor]

new labels batch

class skwdro.solvers.oracle_torch.DualPreSampledLoss(loss: Loss, cost: TorchCost, n_samples: int, epsilon_0: Tensor, rho_0: Tensor, n_iter: int | Tuple[int, int] = 50, gradient_hypertuning: bool = False, *, imp_samp: bool = True, adapt: str | None = 'prodigy')[source]

Bases: _DualLoss

Dual loss implementing a forward pass without resampling the \zeta vectors.

Parameters:
loss: Loss

the loss of interest L_\theta

cost: Cost

ground-distance function

n_samples: int

number of \zeta samples to draw before the gradient descent begins (can be changed if needed between inferences).

property current_samples: Tuple[Tensor | None, Tensor | None]

forward(xi: Tensor, xi_labels: Tensor | None = None, zeta: None = None, zeta_labels: None = None, reset_sampler: bool = False) -> Tensor [source]
forward(xi: Tensor, xi_labels: Tensor | None, zeta: Tensor, zeta_labels: Tensor | None = None, reset_sampler: bool = False)

Forward pass for the dual loss, wrt the already sampled \zeta values

Parameters:
xi: pt.Tensor

data batch

xi_labels: Optional[pt.Tensor]

labels batch

zeta: Optional[pt.Tensor]

data batch

zeta_labels: Optional[pt.Tensor]

labels batch

Returns:
dl: pt.Tensor

property presample

True for DualPreSampledLoss, False for DualPostSampledLoss.

Returns:
bool

zeta: Tensor | None
zeta_labels: Tensor | None

skwdro.solvers.result module

class skwdro.solvers.result.SolverResult(coef=None, intercept=None, dual_var=None, robust_loss=None, _iter_attrs=['coef', 'intercept', 'dual_var'], **kwargs)[source]

Bases: Iterable

skwdro.solvers.result.wrap_solver_result(solver_func)[source]

Decorator to wrap the return of a legacy solver
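
A decorator of this kind could look roughly like the sketch below. It assumes that a legacy solver returns a (coef, intercept, dual_var) tuple and that the result object iterates over those three fields, as the default _iter_attrs suggests; `ResultSketch`, `wrap_legacy` and `legacy_solver` are hypothetical stand-ins, not the library's classes.

```python
from functools import wraps


class ResultSketch:
    # Hypothetical stand-in for SolverResult: stores named fields and
    # iterates over a chosen subset so that tuple unpacking still works.
    def __init__(self, coef=None, intercept=None, dual_var=None, robust_loss=None):
        self.coef = coef
        self.intercept = intercept
        self.dual_var = dual_var
        self.robust_loss = robust_loss
        self._iter_attrs = ("coef", "intercept", "dual_var")

    def __iter__(self):
        return (getattr(self, name) for name in self._iter_attrs)


def wrap_legacy(solver_func):
    # Turn a solver returning a (coef, intercept, dual_var) tuple into one
    # returning a ResultSketch, keeping the original name and docstring.
    @wraps(solver_func)
    def wrapper(*args, **kwargs):
        coef, intercept, dual_var = solver_func(*args, **kwargs)
        return ResultSketch(coef=coef, intercept=intercept, dual_var=dual_var)
    return wrapper


@wrap_legacy
def legacy_solver(scale):
    return [1.0 * scale, 2.0 * scale], 0.5, 3.0


result = legacy_solver(2.0)
coef, intercept, dual_var = result  # legacy-style unpacking still works
```

Making the result iterable lets older call sites keep unpacking three values while newer code reads named attributes such as `result.coef`.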

skwdro.solvers.specific_solvers module

skwdro.solvers.specific_solvers.SAANewsvendorSpecificSolver(k=5.0, u=7.0, samples=None)[source]
skwdro.solvers.specific_solvers.SAANewsvendorSpecificSolver2(k=5.0, u=7.0, samples=None)[source]
skwdro.solvers.specific_solvers.WDROLinRegSpecificSolver(rho: float = 1.0, X: ndarray = array(None, dtype=object), y: ndarray = array(None, dtype=object), fit_intercept: bool = False)[source]
skwdro.solvers.specific_solvers.WDROLogisticSpecificSolver(rho=1.0, kappa=1000, X=None, y=None, fit_intercept=False)[source]
skwdro.solvers.specific_solvers.WDRONewsvendorSpecificSolver(k=5.0, u=7.0, rho=1.0, samples=None)[source]
skwdro.solvers.specific_solvers.WDROPortfolioSpecificSolver(C, d, m, p, eta=0.0, alpha=0.95, rho=1.0, samples=None, fit_intercept=None)[source]

Solver for the dual program linked to Mean-Risk portfolio problem (Kuhn 2017).

skwdro.solvers.utils module

exception skwdro.solvers.utils.NoneGradError[source]

Bases: ValueError

skwdro.solvers.utils.check_tensor_validity(tensor: Tensor) -> bool [source]
skwdro.solvers.utils.detach_tensor(tensor: Tensor) -> ndarray [source]
skwdro.solvers.utils.diff_opt_tensor(tensor: Tensor | None, us_dim: int | None = 0) -> Tensor | None [source]
skwdro.solvers.utils.diff_tensor(tensor: Tensor, us_dim: int | None = 0) -> Tensor [source]
skwdro.solvers.utils.interpret_steps_struct(steps_spec: int | Tuple[int, int], default_split: float = 0.3) -> Tuple[int, int] [source]
skwdro.solvers.utils.maybe_detach_tensor(tensor: Tensor | None) -> ndarray | None [source]
skwdro.solvers.utils.maybe_flatten_grad_else_raise(tensor: Tensor) -> Tensor [source]
skwdro.solvers.utils.maybe_unsqueeze(tensor: Tensor | None, dim: int = 0) -> Tensor | None [source]
skwdro.solvers.utils.normalize_just_vects(tensor: Tensor, threshold: float = 1.0, scaling: float = 1.0, dim: int = 0) -> Tensor [source]
skwdro.solvers.utils.normalize_maybe_vects(tensor: Tensor | None, threshold: float = 1.0, scaling: float = 1.0, dim: int = 0) -> Tensor | None [source]

Module contents

skwdro.solvers.BaseDualLoss

alias of _DualLoss

skwdro.solvers.DualLoss

alias of DualPostSampledLoss

class skwdro.solvers.DualPostSampledLoss(loss: Loss, cost: TorchCost, n_samples: int, epsilon_0: Tensor, rho_0: Tensor, n_iter: int | Tuple[int, int] = 10000, gradient_hypertuning: bool = False, *, imp_samp: bool = True, adapt: str | None = 'prodigy')[source]

Bases: _DualLoss

Dual loss implementing a sampling of the \zeta vectors at each forward pass.

Parameters:
loss: Loss

the loss of interest L_\theta

cost: Cost

ground-distance function

n_samples: int

number of \zeta samples to draw at each forward pass

forward(xi: Tensor, xi_labels: Tensor | None = None, zeta: None = None, zeta_labels: None = None, reset_sampler: bool = False) -> Tensor [source]
forward(xi: Tensor, xi_labels: Tensor | None, zeta: Tensor, zeta_labels: Tensor | None = None, reset_sampler: bool = False) -> Tensor

Forward pass for the dual loss, with the sampling of the adversarial samples

Parameters:
xi: pt.Tensor

data batch

xi_labels: Optional[pt.Tensor]

labels batch

reset_sampler: bool

defaults to False, if set resets the batch saved in the sampler

Returns:
dl: pt.Tensor

property presample

True for DualPreSampledLoss, False for DualPostSampledLoss.

Returns:
bool

reset_sampler_mean(xi: Tensor, xi_labels: Tensor | None = None)[source]

Prepare the sampler for a new batch of xi data.

Parameters:
xi: pt.Tensor

new data batch

xi_labels: Optional[pt.Tensor]

new labels batch

class skwdro.solvers.DualPreSampledLoss(loss: Loss, cost: TorchCost, n_samples: int, epsilon_0: Tensor, rho_0: Tensor, n_iter: int | Tuple[int, int] = 50, gradient_hypertuning: bool = False, *, imp_samp: bool = True, adapt: str | None = 'prodigy')[source]

Bases: _DualLoss

Dual loss implementing a forward pass without resampling the \zeta vectors.

Parameters:
loss: Loss

the loss of interest L_\theta

cost: Cost

ground-distance function

n_samples: int

number of \zeta samples to draw before the gradient descent begins (can be changed if needed between inferences).

property current_samples: Tuple[Tensor | None, Tensor | None]
erm_mode: bool

forward(xi: Tensor, xi_labels: Tensor | None = None, zeta: None = None, zeta_labels: None = None, reset_sampler: bool = False) -> Tensor [source]
forward(xi: Tensor, xi_labels: Tensor | None, zeta: Tensor, zeta_labels: Tensor | None = None, reset_sampler: bool = False)

Forward pass for the dual loss, wrt the already sampled \zeta values

Parameters:
xi: pt.Tensor

data batch

xi_labels: Optional[pt.Tensor]

labels batch

zeta: Optional[pt.Tensor]

data batch

zeta_labels: Optional[pt.Tensor]

labels batch

Returns:
dl: pt.Tensor

property presample

True for DualPreSampledLoss, False for DualPostSampledLoss.

Returns:
bool

training: bool
zeta: Tensor | None
zeta_labels: Tensor | None

exception skwdro.solvers.NoneGradError[source]

Bases: ValueError

skwdro.solvers.detach_tensor(tensor: Tensor) -> ndarray [source]
skwdro.solvers.diff_opt_tensor(tensor: Tensor | None, us_dim: int | None = 0) -> Tensor | None [source]
skwdro.solvers.diff_tensor(tensor: Tensor, us_dim: int | None = 0) -> Tensor [source]
skwdro.solvers.maybe_flatten_grad_else_raise(tensor: Tensor) -> Tensor [source]
skwdro.solvers.maybe_unsqueeze(tensor: Tensor | None, dim: int = 0) -> Tensor | None [source]
skwdro.solvers.normalize_just_vects(tensor: Tensor, threshold: float = 1.0, scaling: float = 1.0, dim: int = 0) -> Tensor [source]
skwdro.solvers.normalize_maybe_vects(tensor: Tensor | None, threshold: float = 1.0, scaling: float = 1.0, dim: int = 0) -> Tensor | None [source]