skwdro.solvers package#

Submodules#

skwdro.solvers.entropic_dual_torch module#

skwdro.solvers.entropic_dual_torch.extract_data(dist: Distribution)[source]#

Get torch tensors out of empirical distribution.

Parameters:
dist: Distribution

Empirical distribution of data and optionally labels

Returns:
xi: pt.Tensor

data tensor

xi_labels: Optional[pt.Tensor]

label tensor if the distribution yields them, else None

Shapes of input/output tensors.

  • xi: (m, d)

  • xi_labels: None or (m, d’)

skwdro.solvers.entropic_dual_torch.optim_postsample(optimizer: Optimizer, xi: Tensor, xi_labels: Tensor | None, loss: _DualFormulation, opt_cond: OptCondTorch) List[float][source]#

Optimize the dual loss by resampling the \(\zeta\) values at each gradient descent step. Note that the descent is performed fullbatch on \(\xi\).

Parameters:
n_iter: int

number of gradient descent iterations to perform

optimizer: pt.optim.Optimizer

loss-dependent optimizer, can be customized if needed

xi: pt.Tensor

data tensor

xi_labels: Optional[pt.Tensor]

target tensor

loss: _DualLoss

dual loss instance

Returns:
List[float]

Shapes of input/output tensors.

  • xi: (m, d)

  • xi_labels: (m, d’)

skwdro.solvers.entropic_dual_torch.optim_presample(optimizer: Optimizer, xi: Tensor, xi_labels: Tensor | None, loss: _DualFormulation, opt_cond: OptCondTorch) List[float][source]#

Optimize the dual loss by sampling the \(\zeta\) values once at the beginning of the optimization, then performing a deterministic gradient descent (e.g. BFGS style algorithm).

Parameters:
optimizer: pt.optim.Optimizer

loss-dependent optimizer, can be customized if needed

xi: pt.Tensor

data tensor

xi_labels: Optional[pt.Tensor]

target tensor

loss: _DualLoss

dual loss instance

Returns:
losses: list[float]

Shapes of input/output tensors.

  • xi: (m, d)

  • xi_labels: (m, d’)
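The difference between the two strategies can be illustrated on a toy problem. The following is a minimal sketch in plain Python, not the library's implementation: the quadratic objective, the Gaussian sampler and the helper names (`draw`, `descend_presample`, `descend_postsample`) are all assumptions made for illustration.

```python
import random


def grad(theta, zetas):
    # Gradient w.r.t. theta of the Monte Carlo estimate of E[(theta - zeta)^2].
    return 2.0 * (theta - sum(zetas) / len(zetas))


def draw(rng, n_samples):
    # Toy sampler (hypothetical): zeta ~ N(1, 0.5^2).
    return [rng.gauss(1.0, 0.5) for _ in range(n_samples)]


def descend_presample(zetas, n_iter=200, lr=0.1):
    # The samples are fixed up front, so every step sees the same objective
    # and the descent is deterministic.
    theta = 0.0
    for _ in range(n_iter):
        theta -= lr * grad(theta, zetas)
    return theta


def descend_postsample(rng, n_samples=64, n_iter=200, lr=0.1):
    # The samples are redrawn at every step, so the gradient is stochastic.
    theta = 0.0
    for _ in range(n_iter):
        theta -= lr * grad(theta, draw(rng, n_samples))
    return theta


rng = random.Random(0)
fixed = draw(rng, 64)
theta_pre = descend_presample(fixed)   # converges to the mean of `fixed`
theta_post = descend_postsample(rng)   # fluctuates around the true mean 1.0
```

With fixed samples, the iterate converges to the minimizer of the sampled objective, which is why a deterministic method is a natural fit; with resampling, the iterate only hovers around the minimizer of the expected objective.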

skwdro.solvers.entropic_dual_torch.solve_dual_wdro(loss: _DualFormulation, p_hat: Distribution, opt: OptCondTorch)[source]#

Solve the dual problem with the loss-dependent gradient descent algorithm.

Parameters:
loss: _DualLoss

Dual loss

p_hat: Distribution

Empirical distribution

opt: OptCond

Optimality conditions

Returns:
theta: np.ndarray

Concatenated array of the parameters of the model, except the intercept if there is one

intercept: Optional[np.ndarray]

If the model specifically has an intercept as one of its parameters, it is stacked in this output tensor

lambd: Union[np.ndarray, float]

Dual variable \(\lambda\) of the problem

Shapes of input/output tensors.

  • theta: (n_params,)

  • intercept: (n_intercepts,) or None

  • lambd: (1,)

skwdro.solvers.hybrid_opt module#

class skwdro.solvers.hybrid_opt.HybridAdam(*args, **kwargs)[source]#

Bases: HybridOpt, Adam

class skwdro.solvers.hybrid_opt.HybridOpt(params, **kwargs)[source]#

Bases: object

step(*args, **kwargs)[source]#
class skwdro.solvers.hybrid_opt.HybridSGD(*args, **kwargs)[source]#

Bases: HybridOpt, SGD

skwdro.solvers.hybrid_opt.postrule(name)[source]#
skwdro.solvers.hybrid_opt.postrule_mwu(p)[source]#
skwdro.solvers.hybrid_opt.postrule_mwu_simplex(p)[source]#
skwdro.solvers.hybrid_opt.postrule_non_neg(p)[source]#
skwdro.solvers.hybrid_opt.prerule(name)[source]#
skwdro.solvers.hybrid_opt.prerule_bound(p)[source]#
skwdro.solvers.hybrid_opt.prerule_mwu(p)[source]#
skwdro.solvers.hybrid_opt.prerule_mwu_simplex(p)[source]#
skwdro.solvers.hybrid_opt.rule_max(p)[source]#

skwdro.solvers.optim_cond module#

class skwdro.solvers.optim_cond.OptCondTorch(order: int | str, tol_theta: float = 1e-08, tol_lambda: float = 1e-08, max_iter: int | None = None, *, monitoring: str = 'theta', mode: str = 'rel', metric: str = 'grad', verbose: bool = False)[source]#

Bases: object

Callable object representing some optimality conditions

May track two different expressions of the error:
  • the relative error: \(\|u_n\| < tol \|u_0\|\)

  • the absolute error: \(\|u_n\| < tol\).

Those equations are evaluated for two possible metrics \(u_n\):
  • the progress in the gradient of the dual loss with respect to the parameters of interest \(\nabla_{\theta ,\lambda} J_{\theta_n}(\zeta_n)\)

  • the progress of the parameters themselves \((\theta_n-\theta_{n-1} , \lambda_n-\lambda_{n-1})\)

To evaluate the above metrics, one may choose to monitor the convergence in:

  • only \(\theta\)

  • only \(\lambda\)

  • both

  • or either.
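The four monitoring choices above amount to different ways of combining two per-variable convergence flags. A minimal sketch follows, assuming plain booleans and hypothetical string labels; these labels are not the library's constants, which are documented under the monitoring parameter.

```python
def should_stop(theta_ok: bool, lambda_ok: bool, monitoring: str) -> bool:
    # `monitoring` values are hypothetical labels chosen for this sketch.
    if monitoring == "theta":
        return theta_ok
    if monitoring == "lambda":
        return lambda_ok
    if monitoring == "both":
        # Joint convergence: both variables must have converged.
        return theta_ok and lambda_ok
    if monitoring == "either":
        # Convergence of any one variable is enough to stop.
        return theta_ok or lambda_ok
    raise ValueError(f"unknown monitoring mode: {monitoring!r}")
```

For instance, `should_stop(True, False, "either")` allows the algorithm to stop, whereas `should_stop(True, False, "both")` does not.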

Warning

If setting metric='grad', verify that your backward pass does populate the .grad attribute of the parameter tensors of your model. If it does not, check for NaN values or for a disconnected compute graph in the forward pass. This helper class does not perform those verifications.

Parameters:
order: int|str

norm type to use

tol_theta: float

if positive, the tolerance (relative or absolute) to allow for the parameters, if <=0 ignores it

tol_lambda: float

if positive, the tolerance (relative or absolute) to allow for the dual parameter, if <=0 ignores it

monitoring: str

see the global variables L_OR_T (for either convergence to allow stop), L_AND_T (for joint convergence to allow stop), JUST_L (for only \(\lambda\)), JUST_T (for only \(\theta\)) to have the allowed options

mode: str

either "rel" for relative progress or "abs" for absolute progress. Not checked if the metric is the gradient value

metric: str

either "grad" for gradient improvement/change over time, or "param" for parameter-space improvement/change over time

check_all_params(lam: Callable[[], Tensor], lamgrad: Callable[[], Tensor], flattheta: Callable[[], Tensor], flatgrad: Callable[[], Tensor]) Tuple[bool, float][source]#

Checks the dual and primal parameters for convergence by using functional monads on the tensors, see check_t() and check_l().

Parameters:
lam: LazyTensor

the dual multiplier

lam_grad: LazyTensor

its scalar gradient

flat_theta: LazyTensor

the flattened concatenation of all the optimizable parameters of the primal model

flat_theta_grad: LazyTensor

the flattened concatenation of the gradients of those parameters

Returns:
cond: bool

green light to stop algorithm
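The lazy arguments can be pictured as zero-argument callables that are only evaluated when the chosen metric needs them. The sketch below uses plain Python lists in place of tensors; the helpers `lazy` and `max_abs` are hypothetical and only illustrate the calling convention.

```python
from typing import Callable, List

evaluated: List[str] = []  # records which lazy values were actually computed


def lazy(name: str, values: List[float]) -> Callable[[], List[float]]:
    # Wrap a value in a zero-argument callable, evaluated only on demand.
    def thunk() -> List[float]:
        evaluated.append(name)
        return values
    return thunk


def max_abs(metric: str,
            value: Callable[[], List[float]],
            grad: Callable[[], List[float]]) -> float:
    # Only the callable matching the chosen metric is ever invoked.
    chosen = grad if metric == "grad" else value
    return max(abs(v) for v in chosen())


lam = lazy("lam", [0.7])
lam_grad = lazy("lam_grad", [-1e-3])
result = max_abs("grad", lam, lam_grad)  # `lam` itself is never evaluated
```

Deferring evaluation this way avoids computing or flattening values that the selected metric does not use.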

check_iter(it_number: int) bool[source]#

Checks if the maximum number of iterations has been crossed

Returns:
cond: bool

green light to stop algorithm

check_l(lam: Callable[[], Tensor], lam_grad: Callable[[], Tensor]) Tuple[bool, float][source]#

Check the convergence of the dual parameter \(\lambda\), either in gradient or in parameter value. The parameters are LazyTensors, which means that they must be called as functions to be evaluated.

Parameters:
lam: LazyTensor

the dual multiplier

lam_grad: LazyTensor

its scalar gradient

Returns:
cond: bool

green light to stop algorithm

check_metric(new_obs: Tensor, memory: Tensor, tol: float) Tuple[bool, float][source]#

Helper function to get the tolerance check in both the relative and absolute error cases.

Parameters:
new_obs: pt.Tensor

current step metric

memory: pt.Tensor

same metric at last step – initialized at None, so a check must be performed before call to this function

tol: float

the positive tolerance rate allowed (same for absolute and relative tolerance)

Returns:
cond: bool

green light to stop algorithm
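The relative and absolute tests from the class description can be sketched as follows. This is an illustration on plain Python lists with an L2 norm, not the method's actual code; the name `tolerance_check`, the reference vector `u_0` and the meaning of the returned float (here, the norm of the current metric) are assumptions.

```python
from typing import List, Tuple


def l2_norm(vec: List[float]) -> float:
    return sum(v * v for v in vec) ** 0.5


def tolerance_check(u_n: List[float], u_0: List[float],
                    tol: float, mode: str = "rel") -> Tuple[bool, float]:
    value = l2_norm(u_n)
    if mode == "rel":
        # Relative error: ||u_n|| < tol * ||u_0||
        threshold = tol * l2_norm(u_0)
    elif mode == "abs":
        # Absolute error: ||u_n|| < tol
        threshold = tol
    else:
        raise ValueError(f"unknown mode: {mode!r}")
    return value < threshold, value
```

The same tolerance value serves both modes; only the threshold it is turned into differs.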

check_t(flat_theta: Callable[[], Tensor], flat_theta_grad: Callable[[], Tensor]) Tuple[bool, float][source]#

Check the convergence of the theta parameter, either in gradient or in parameter value. The parameters are LazyTensors which means that they must be called as functions to be evaluated.

Parameters:
flat_theta: LazyTensor

the flattened concatenation of all the optimizable parameters of the primal model

flat_theta_grad: LazyTensor

the flattened concatenation of the gradients of those parameters

Returns:
cond: bool

green light to stop algorithm

classmethod get_flat_grad(module: Module) Tensor[source]#

Helper function to get a flat vector containing all the gradients of the primal model.

classmethod get_flat_param(module: Module) Tensor[source]#

Helper function to get a flat vector containing all the primal parameters.

skwdro.solvers.optim_cond.combine_intersect(a: Tuple[bool, float], b: Tuple[bool, float]) Tuple[bool, float][source]#
skwdro.solvers.optim_cond.combine_union(a: Tuple[bool, float], b: Tuple[bool, float]) Tuple[bool, float][source]#
skwdro.solvers.optim_cond.wrap(b: bool) Tuple[bool, float][source]#

skwdro.solvers.oracle_torch module#

class skwdro.solvers.oracle_torch.CompositeOptimizer(params, lbd, n_iter, optimizer)[source]#

Bases: Optimizer

load_state_dict(state_dict)[source]#

Load the optimizer state.

Args:
state_dict (dict): optimizer state. Should be an object returned from a call to state_dict().

Warning

Make sure this method is called after initializing torch.optim.lr_scheduler.LRScheduler, as calling it beforehand will overwrite the loaded learning rates.

Note

The names of the parameters (if they exist under the “param_names” key of each param group in state_dict()) will not affect the loading process. To use the parameters’ names for custom cases (such as when the parameters in the loaded state dict differ from those initialized in the optimizer), a custom register_load_state_dict_pre_hook should be implemented to adapt the loaded dict accordingly. If param_names exist in loaded state dict param_groups they will be saved and override the current names, if present, in the optimizer state. If they do not exist in loaded state dict, the optimizer param_names will remain unchanged.

Example:
>>> # xdoctest: +SKIP
>>> model = torch.nn.Linear(10, 10)
>>> optim = torch.optim.SGD(model.parameters(), lr=3e-4)
>>> scheduler1 = torch.optim.lr_scheduler.LinearLR(
...     optim,
...     start_factor=0.1,
...     end_factor=1,
...     total_iters=20,
... )
>>> scheduler2 = torch.optim.lr_scheduler.CosineAnnealingLR(
...     optim,
...     T_max=80,
...     eta_min=3e-5,
... )
>>> lr = torch.optim.lr_scheduler.SequentialLR(
...     optim,
...     schedulers=[scheduler1, scheduler2],
...     milestones=[20],
... )
>>> lr.load_state_dict(torch.load("./save_seq.pt"))
>>> # now load the optimizer checkpoint after loading the LRScheduler
>>> optim.load_state_dict(torch.load("./save_optim.pt"))
reset_lbd_state()[source]#
state_dict()[source]#

Return the state of the optimizer as a dict.

It contains two entries:

  • state: a Dict holding current optimization state. Its content differs between optimizer classes, but some common characteristics hold. For example, state is saved per parameter, and the parameter itself is NOT saved. state is a Dictionary mapping parameter ids to a Dict with state corresponding to each parameter.

  • param_groups: a List containing all parameter groups where each parameter group is a Dict. Each parameter group contains metadata specific to the optimizer, such as learning rate and weight decay, as well as a List of parameter IDs of the parameters in the group. If a param group was initialized with named_parameters() the names content will also be saved in the state dict.

NOTE: The parameter IDs may look like indices but they are just IDs associating state with param_group. When loading from a state_dict, the optimizer will zip the param_group params (int IDs) and the optimizer param_groups (actual nn.Parameter s) in order to match state WITHOUT additional verification.

A returned state dict might look something like:

{
    'state': {
        0: {'momentum_buffer': tensor(...), ...},
        1: {'momentum_buffer': tensor(...), ...},
        2: {'momentum_buffer': tensor(...), ...},
        3: {'momentum_buffer': tensor(...), ...}
    },
    'param_groups': [
        {
            'lr': 0.01,
            'weight_decay': 0,
            ...
            'params': [0]
            'param_names': ['param0']  (optional)
        },
        {
            'lr': 0.001,
            'weight_decay': 0.5,
            ...
            'params': [1, 2, 3]
            'param_names': ['param1', 'layer.weight', 'layer.bias'] (optional)
        }
    ]
}
step(closure: None = None) None[source]#
step(closure: Callable) float

Perform a single optimization step to update parameter.

Args:
closure (Callable): A closure that reevaluates the model and returns the loss. Optional for most optimizers.

zero_grad(*args, **kwargs)[source]#

Reset the gradients of all optimized torch.Tensor s.

Args:

set_to_none (bool, optional): Instead of setting to zero, set the grads to None. Default: True

This will in general have lower memory footprint, and can modestly improve performance. However, it changes certain behaviors. For example:

  1. When the user tries to access a gradient and perform manual ops on it, a None attribute or a Tensor full of 0s will behave differently.

  2. If the user requests zero_grad(set_to_none=True) followed by a backward pass, .grads are guaranteed to be None for params that did not receive a gradient.

  3. torch.optim optimizers have a different behavior if the gradient is 0 or None (in one case it does the step with a gradient of 0 and in the other it skips the step altogether).

skwdro.solvers.oracle_torch.DualLoss#

alias of DualPostSampledLoss

class skwdro.solvers.oracle_torch.DualPostSampledLoss(loss: Loss, cost: TorchCost, n_samples: int, epsilon_0: Tensor, rho_0: Tensor, n_iter: int | Tuple[int, int] = 10000, *, reduction: str | None = None, gradient_hypertuning: bool = False, learning_rate: float | None = None, imp_samp: bool = True, adapt: str | None = 'prodigy')[source]#

Bases: _DualFormulation

Dual loss implementing a sampling of the \(\zeta\) vectors at each forward pass.

Parameters:
loss: Loss

the (primal) loss of interest \(L_\theta\)

cost: Cost

ground-distance function

n_samples: int

number of \(\zeta\) samples to draw at each forward pass

epsilon_0: torch.Tensor

scalar tensor containing the \(\varepsilon\) regularization hyperparameter

rho_0: torch.Tensor

scalar tensor containing the \(\rho\) (regularized) Wasserstein radius hyperparameter

n_iter: Steps

either a tuple (number of ERM iterations, number of DRO iterations), of type (int, int), or an integer for the number of DRO iterations

reduction: str | None

specifies the reduction to apply to the outer expectation of the SkWDRO formula applied: 'none' | 'mean' | 'sum'.

  • 'none': no reduction will be applied

  • 'mean': the sum of the output will be divided by the number of elements in the output

  • 'sum': the output will be summed.

Default: None which translates to 'mean'

gradient_hypertuning: bool

set to True to accumulate gradients in rho and epsilon

Tip

Should almost always be kept to False.

learning_rate: Optional[float]

set the stepsize of the torch.optim.AdamW algorithm. Defaults to None which will be parsed as 5e-2

imp_samp: bool

set to True to enable importance sampling

Warning

Unlike the skwdro.torch.robustify() interface, there is no protection against mistakes here. So please do not attempt to set importance sampling for now if:

  • your target is categorical

  • your model is non-differentiable

  • your model includes parts that use the regular .backward() torch interface for inner autodiff utilities instead of the functional API

  • your cost functional does not implement the right functions (see appropriate tutorials).

adapt: Optional[str]

set to either:

  • None to use torch.optim.AdamW.

    Tip

    Set the learning rate with the above parameter learning_rate.

  • "prodigy" or "mechanic" to get automatic learning rate tuning
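The three reduction options described for the reduction parameter behave like the usual loss reductions. The following plain-Python sketch shows the intended semantics on a list of per-sample values; the helper `reduce_outer` is hypothetical and does not mirror the class internals.

```python
from typing import List, Optional, Union


def reduce_outer(per_sample: List[float],
                 reduction: Optional[str] = None) -> Union[float, List[float]]:
    # None is treated as 'mean', as stated in the parameter description.
    mode = "mean" if reduction is None else reduction
    if mode == "none":
        return list(per_sample)          # one value per sample, untouched
    if mode == "sum":
        return sum(per_sample)
    if mode == "mean":
        return sum(per_sample) / len(per_sample)
    raise ValueError(f"unknown reduction: {mode!r}")
```

For example, `reduce_outer([1.0, 2.0, 3.0])` averages the values, while `reduce_outer([1.0, 2.0, 3.0], "none")` returns them unchanged.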

forward(xi: Tensor, xi_labels: Tensor | None = None, zeta: None = None, zeta_labels: None = None, reset_sampler: bool = True) Tensor[source]#
forward(xi: Tensor, xi_labels: Tensor | None, zeta: Tensor, zeta_labels: Tensor | None = None, reset_sampler: bool = True) Tensor

Forward pass for the dual loss, with the sampling of the adversarial samples

Shapes of input/output tensors.

  • xi : (m, d)

  • xi_labels : (m, d’)

  • dl : (1,)

Parameters:
xi: pt.Tensor

data batch

xi_labels: Optional[pt.Tensor]

labels batch

reset_sampler: bool

defaults to True, if set resets the batch saved in the sampler

Warning

Must be set to True for any flavor of SGD, otherwise the samples will never be redrawn

Returns:
dl: pt.Tensor
property presample#

True for DualPreSampledLoss, False for DualPostSampledLoss.

Returns:
bool
reset_sampler_mean(xi: Tensor, xi_labels: Tensor | None = None)[source]#

Prepare the sampler for a new batch of \(\xi\) data.

Parameters:
xi: pt.Tensor

new data batch

xi_labels: Optional[pt.Tensor]

new labels batch

class skwdro.solvers.oracle_torch.DualPreSampledLoss(loss: Loss, cost: TorchCost, n_samples: int, epsilon_0: Tensor, rho_0: Tensor, n_iter: int | Tuple[int, int] = 50, gradient_hypertuning: bool = False, *, reduction: str | None = None, imp_samp: bool = True, learning_rate: float | None = None, adapt: str | None = 'prodigy')[source]#

Bases: _DualFormulation

Dual loss implementing a forward pass without resampling the \(\zeta\) vectors.

Parameters:
loss: Loss

the loss of interest \(L_\theta\)

cost: Cost

ground-distance function

n_samples: int

number of \(\zeta\) samples to draw before the gradient descent begins (can be changed if needed between inferences).

epsilon_0: torch.Tensor

scalar tensor containing the \(\varepsilon\) regularization hyperparameter

rho_0: torch.Tensor

scalar tensor containing the \(\rho\) (regularized) Wasserstein radius hyperparameter

n_iter: Steps

either a tuple (number of ERM iterations, number of DRO iterations), of type (int, int), or an integer for the number of DRO iterations

reduction: str | None

specifies the reduction to apply to the outer expectation of the SkWDRO formula applied: 'none' | 'mean' | 'sum'.

  • 'none': no reduction will be applied

  • 'mean': the sum of the output will be divided by the number of elements in the output

  • 'sum': the output will be summed.

Default: None which translates to 'mean'

gradient_hypertuning: bool

set to True to accumulate gradients in rho and epsilon

Tip

Should almost always be kept to False.

learning_rate: Optional[float]

set the stepsize of the torch.optim.AdamW algorithm. Defaults to None which will be parsed as 5e-2

imp_samp: bool

set to True to enable importance sampling

Warning

Unlike the skwdro.torch.robustify() interface, there is no protection against mistakes here. So please do not attempt to set importance sampling for now if:

  • your target is categorical

  • your model is non-differentiable

  • your model includes parts that use the regular .backward() torch interface for inner autodiff utilities instead of the functional API

  • your cost functional does not implement the right functions (see appropriate tutorials)

  • the reduction for the outer expectation is set to none.

adapt: Optional[str]

set to either:

  • None to use torch.optim.LBFGS

    Tip

    Set the learning rate with the above parameter learning_rate.

  • "prodigy" or "mechanic" to get automatic learning rate tuning

Attributes:
zeta: Optional[torch.Tensor]

the set batch of inputs \(\zeta\). Set to None at initialization but will be dynamically overridden at the first forward pass

zeta_labels: Optional[torch.Tensor]

the set batch of targets \(\zeta_y\). Set to None at initialization but will be dynamically overridden at the first forward pass if the problem is of either classification or regression type

property current_samples: Tuple[Tensor | None, Tensor | None]#
forward(xi: Tensor, xi_labels: Tensor | None = None, zeta: None = None, zeta_labels: None = None, reset_sampler: bool = True) Tensor[source]#
forward(xi: Tensor, xi_labels: Tensor | None, zeta: Tensor, zeta_labels: Tensor | None = None, reset_sampler: bool = True)

Forward pass for the dual loss, wrt the already sampled \(\zeta\) values

Parameters:
xi: pt.Tensor

data batch

xi_labels: Optional[pt.Tensor]

labels batch

zeta: Optional[pt.Tensor]

data batch

zeta_labels: Optional[pt.Tensor]

labels batch

reset_sampler: bool

This parameter plays no role for this class and can be left at True, since the sampler is never reset.

Returns:
dl: pt.Tensor

Shapes of input/output tensors.

  • xi : (m, d)

  • xi_labels : (m, d’)

  • dl : (1,)

property presample#

True for DualPreSampledLoss, False for DualPostSampledLoss.

Returns:
bool
zeta: Tensor | None#
zeta_labels: Tensor | None#

skwdro.solvers.result module#

class skwdro.solvers.result.SolverResult(coef=None, intercept=None, dual_var=None, robust_loss=None, _iter_attrs=['coef', 'intercept', 'dual_var'], **kwargs)[source]#

Bases: Iterable

skwdro.solvers.result.wrap_solver_result(solver_func)[source]#

Decorator to wrap the return of a legacy solver
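As an illustration of the idea, a decorator of this kind can convert a tuple returned by an older solver into an iterable result object, so that both attribute access and tuple unpacking keep working. The sketch below is not the library's code: the `Result` class, the `wrap_result` decorator and the assumption that a legacy solver returns a `(coef, intercept, dual_var)` tuple are hypothetical.

```python
import functools
from collections.abc import Iterable


class Result(Iterable):
    # Hypothetical stand-in for a result container with three fields.
    def __init__(self, coef=None, intercept=None, dual_var=None):
        self.coef = coef
        self.intercept = intercept
        self.dual_var = dual_var

    def __iter__(self):
        # Iterating yields the fields in a fixed order, like the old tuple.
        return iter((self.coef, self.intercept, self.dual_var))


def wrap_result(solver_func):
    # Assumes `solver_func` returns a 3-tuple (coef, intercept, dual_var).
    @functools.wraps(solver_func)
    def wrapper(*args, **kwargs):
        coef, intercept, dual_var = solver_func(*args, **kwargs)
        return Result(coef, intercept, dual_var)
    return wrapper


@wrap_result
def legacy_solver(scale):
    # Toy legacy solver returning a bare tuple.
    return [scale * 1.0, scale * 2.0], 0.5, 3.0


res = legacy_solver(2)
coef, intercept, dual_var = res  # tuple-style unpacking still works
```

Keeping the object iterable means existing call sites that unpack the tuple do not need to change.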

skwdro.solvers.specific_solvers module#

skwdro.solvers.specific_solvers.SAANewsvendorSpecificSolver(k=5.0, u=7.0, samples=None)[source]#
skwdro.solvers.specific_solvers.SAANewsvendorSpecificSolver2(k=5.0, u=7.0, samples=None)[source]#
skwdro.solvers.specific_solvers.WDROLinRegSpecificSolver(rho: float = 1.0, X: ndarray = array(None, dtype=object), y: ndarray = array(None, dtype=object), fit_intercept: bool = False)[source]#
skwdro.solvers.specific_solvers.WDROLogisticSpecificSolver(rho=1.0, kappa=1000, X=None, y=None, fit_intercept=False)[source]#
skwdro.solvers.specific_solvers.WDRONewsvendorSpecificSolver(k=5.0, u=7.0, rho=1.0, samples=None)[source]#
skwdro.solvers.specific_solvers.WDROPortfolioSpecificSolver(C, d, m, p, eta=0.0, alpha=0.95, rho=1.0, samples=None, fit_intercept=None)[source]#

Solver for the dual program linked to Mean-Risk portfolio problem (Kuhn 2017).

skwdro.solvers.utils module#

exception skwdro.solvers.utils.NoneGradError[source]#

Bases: ValueError

skwdro.solvers.utils.check_tensor_validity(tensor: Tensor) bool[source]#
skwdro.solvers.utils.detach_tensor(tensor: Tensor) ndarray[source]#
skwdro.solvers.utils.diff_opt_tensor(tensor: Tensor | None, us_dim: int | None = 0) Tensor | None[source]#
skwdro.solvers.utils.diff_tensor(tensor: Tensor, us_dim: int | None = 0) Tensor[source]#
skwdro.solvers.utils.interpret_steps_struct(steps_spec: int | Tuple[int, int], default_split: float = 0.3) Tuple[int, int][source]#
skwdro.solvers.utils.maybe_detach_tensor(tensor: Tensor | None) ndarray | None[source]#
skwdro.solvers.utils.maybe_flatten_grad_else_raise(tensor: Tensor) Tensor[source]#
skwdro.solvers.utils.maybe_unsqueeze(tensor: Tensor | None, dim: int = 0) Tensor | None[source]#
skwdro.solvers.utils.normalize_just_vects(tensor: Tensor, threshold: float = 1.0, scaling: float = 1.0, dim: int = 0) Tensor[source]#
skwdro.solvers.utils.normalize_maybe_vects(tensor: Tensor | None, threshold: float = 1.0, scaling: float = 1.0, dim: int = 0) Tensor | None[source]#

Module contents#

skwdro.solvers.BaseDualLoss#

alias of _DualFormulation

skwdro.solvers.DualLoss#

alias of DualPostSampledLoss

class skwdro.solvers.DualPostSampledLoss(loss: Loss, cost: TorchCost, n_samples: int, epsilon_0: Tensor, rho_0: Tensor, n_iter: int | Tuple[int, int] = 10000, *, reduction: str | None = None, gradient_hypertuning: bool = False, learning_rate: float | None = None, imp_samp: bool = True, adapt: str | None = 'prodigy')[source]#

Bases: _DualFormulation

Dual loss implementing a sampling of the \(\zeta\) vectors at each forward pass.

Parameters:
loss: Loss

the (primal) loss of interest \(L_\theta\)

cost: Cost

ground-distance function

n_samples: int

number of \(\zeta\) samples to draw at each forward pass

epsilon_0: torch.Tensor

scalar tensor containing the \(\varepsilon\) regularization hyperparameter

rho_0: torch.Tensor

scalar tensor containing the \(\rho\) (regularized) Wasserstein radius hyperparameter

n_iter: Steps

either a tuple (number of ERM iterations, number of DRO iterations), of type (int, int), or an integer for the number of DRO iterations

reduction: str | None

specifies the reduction to apply to the outer expectation of the SkWDRO formula applied: 'none' | 'mean' | 'sum'.

  • 'none': no reduction will be applied

  • 'mean': the sum of the output will be divided by the number of elements in the output

  • 'sum': the output will be summed.

Default: None which translates to 'mean'

gradient_hypertuning: bool

set to True to accumulate gradients in rho and epsilon

Tip

Should almost always be kept to False.

learning_rate: Optional[float]

set the stepsize of the torch.optim.AdamW algorithm. Defaults to None which will be parsed as 5e-2

imp_samp: bool

set to True to enable importance sampling

Warning

Unlike the skwdro.torch.robustify() interface, there is no protection against mistakes here. So please do not attempt to set importance sampling for now if:

  • your target is categorical

  • your model is non-differentiable

  • your model includes parts that use the regular .backward() torch interface for inner autodiff utilities instead of the functional API

  • your cost functional does not implement the right functions (see appropriate tutorials).

adapt: Optional[str]

set to either:

  • None to use torch.optim.AdamW.

    Tip

    Set the learning rate with the above parameter learning_rate.

  • "prodigy" or "mechanic" to get automatic learning rate tuning

forward(xi: Tensor, xi_labels: Tensor | None = None, zeta: None = None, zeta_labels: None = None, reset_sampler: bool = True) Tensor[source]#
forward(xi: Tensor, xi_labels: Tensor | None, zeta: Tensor, zeta_labels: Tensor | None = None, reset_sampler: bool = True) Tensor

Forward pass for the dual loss, with the sampling of the adversarial samples

Shapes of input/output tensors.

  • xi : (m, d)

  • xi_labels : (m, d’)

  • dl : (1,)

Parameters:
xi: pt.Tensor

data batch

xi_labels: Optional[pt.Tensor]

labels batch

reset_sampler: bool

defaults to True, if set resets the batch saved in the sampler

Warning

Must be set to True for any flavor of SGD, otherwise the samples will never be redrawn

Returns:
dl: pt.Tensor
property presample#

True for DualPreSampledLoss, False for DualPostSampledLoss.

Returns:
bool
reset_sampler_mean(xi: Tensor, xi_labels: Tensor | None = None)[source]#

Prepare the sampler for a new batch of \(\xi\) data.

Parameters:
xi: pt.Tensor

new data batch

xi_labels: Optional[pt.Tensor]

new labels batch

class skwdro.solvers.DualPreSampledLoss(loss: Loss, cost: TorchCost, n_samples: int, epsilon_0: Tensor, rho_0: Tensor, n_iter: int | Tuple[int, int] = 50, gradient_hypertuning: bool = False, *, reduction: str | None = None, imp_samp: bool = True, learning_rate: float | None = None, adapt: str | None = 'prodigy')[source]#

Bases: _DualFormulation

Dual loss implementing a forward pass without resampling the \(\zeta\) vectors.

Parameters:
loss: Loss

the loss of interest \(L_\theta\)

cost: Cost

ground-distance function

n_samples: int

number of \(\zeta\) samples to draw before the gradient descent begins (can be changed if needed between inferences).

epsilon_0: torch.Tensor

scalar tensor containing the \(\varepsilon\) regularization hyperparameter

rho_0: torch.Tensor

scalar tensor containing the \(\rho\) (regularized) Wasserstein radius hyperparameter

n_iter: Steps

either a tuple (number of ERM iterations, number of DRO iterations), of type (int, int), or an integer for the number of DRO iterations

reduction: str | None

specifies the reduction to apply to the outer expectation of the SkWDRO formula applied: 'none' | 'mean' | 'sum'.

  • 'none': no reduction will be applied

  • 'mean': the sum of the output will be divided by the number of elements in the output

  • 'sum': the output will be summed.

Default: None which translates to 'mean'

gradient_hypertuning: bool

set to True to accumulate gradients in rho and epsilon

Tip

Should almost always be kept to False.

learning_rate: Optional[float]

set the stepsize of the torch.optim.AdamW algorithm. Defaults to None which will be parsed as 5e-2

imp_samp: bool

set to True to enable importance sampling

Warning

Unlike the skwdro.torch.robustify() interface, there is no protection against mistakes here. So please do not attempt to set importance sampling for now if:

  • your target is categorical

  • your model is non-differentiable

  • your model includes parts that use the regular .backward() torch interface for inner autodiff utilities instead of the functional API

  • your cost functional does not implement the right functions (see appropriate tutorials)

  • the reduction for the outer expectation is set to none.

adapt: Optional[str]

set to either:

  • None to use torch.optim.LBFGS

    Tip

    Set the learning rate with the above parameter learning_rate.

  • "prodigy" or "mechanic" to get automatic learning rate tuning

Attributes:
zeta: Optional[torch.Tensor]

the set batch of inputs \(\zeta\). Set to None at initialization but will be dynamically overridden at the first forward pass

zeta_labels: Optional[torch.Tensor]

the set batch of targets \(\zeta_y\). Set to None at initialization but will be dynamically overridden at the first forward pass if the problem is of either classification or regression type

property current_samples: Tuple[Tensor | None, Tensor | None]#
forward(xi: Tensor, xi_labels: Tensor | None = None, zeta: None = None, zeta_labels: None = None, reset_sampler: bool = True) Tensor[source]#
forward(xi: Tensor, xi_labels: Tensor | None, zeta: Tensor, zeta_labels: Tensor | None = None, reset_sampler: bool = True)

Forward pass for the dual loss, wrt the already sampled \(\zeta\) values

Parameters:
xi: pt.Tensor

data batch

xi_labels: Optional[pt.Tensor]

labels batch

zeta: Optional[pt.Tensor]

data batch

zeta_labels: Optional[pt.Tensor]

labels batch

reset_sampler: bool

This parameter plays no role for this class and can be left at True, since the sampler is never reset.

Returns:
dl: pt.Tensor

Shapes of input/output tensors.

  • xi : (m, d)

  • xi_labels : (m, d’)

  • dl : (1,)

property presample#

True for DualPreSampledLoss, False for DualPostSampledLoss.

Returns:
bool
zeta: Tensor | None#
zeta_labels: Tensor | None#
exception skwdro.solvers.NoneGradError[source]#

Bases: ValueError

skwdro.solvers.detach_tensor(tensor: Tensor) ndarray[source]#
skwdro.solvers.diff_opt_tensor(tensor: Tensor | None, us_dim: int | None = 0) Tensor | None[source]#
skwdro.solvers.diff_tensor(tensor: Tensor, us_dim: int | None = 0) Tensor[source]#
skwdro.solvers.maybe_flatten_grad_else_raise(tensor: Tensor) Tensor[source]#
skwdro.solvers.maybe_unsqueeze(tensor: Tensor | None, dim: int = 0) Tensor | None[source]#
skwdro.solvers.normalize_just_vects(tensor: Tensor, threshold: float = 1.0, scaling: float = 1.0, dim: int = 0) Tensor[source]#
skwdro.solvers.normalize_maybe_vects(tensor: Tensor | None, threshold: float = 1.0, scaling: float = 1.0, dim: int = 0) Tensor | None[source]#