Centralized Federated Learning Algorithm#
- class CentralFLAlgorithm(data_manager, metric_logger, num_clients, sample_scheme, sample_rate, model_def, epochs, criterion_def, optimizer_def=functools.partial(torch.optim.SGD, lr=1.0), local_optimizer_def=functools.partial(torch.optim.SGD, lr=0.1), lr_scheduler_def=None, local_lr_scheduler_def=None, r2r_local_lr_scheduler_def=None, batch_size=32, test_batch_size=64, device='cpu', *args, **kwargs)[source]#
Base class for centralized FL algorithm.
- Parameters
data_manager (distributed.data_management.DataManager) -- data manager
metric_logger (logall.Logger) -- metric logger for tracking
num_clients (int) -- number of clients
sample_scheme (str) -- client sampling mode; options are 'uniform' and 'sequential'
sample_rate (float) -- client sampling rate
model_def (torch.Module) -- definition for constructing the model
epochs (int) -- number of local epochs
criterion_def (Callable) -- loss function defining the local objective
optimizer_def (Callable) -- definition of the server optimizer
local_optimizer_def (Callable) -- definition of the local optimizer
lr_scheduler_def (Callable) -- definition of the lr scheduler of the server optimizer
local_lr_scheduler_def (Callable) -- definition of the lr scheduler of the local optimizer
r2r_local_lr_scheduler_def (Callable) -- definition of a scheduler for the lr delivered to the clients at each round (determines the initial lr of the client optimizer)
batch_size (int) -- batch size for local training
test_batch_size (int) -- inference-time batch size
device (str) -- 'cpu', 'cuda', or a GPU number
Note
The definitions may be any of the following:
- learning rate schedulers: any scheduler defined in torch.optim.lr_scheduler, or any other class that implements the step and get_last_lr methods.
- optimizers: any torch.optim.Optimizer.
- model: any torch.Module.
- criterion: any of fedsim.losses.
Architecture:
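The `*_def` constructor arguments follow a common "definition" pattern: a callable with everything except late-bound arguments (such as model parameters) baked in, typically via functools.partial. The sketch below illustrates the idea with a toy optimizer class; the names here are hypothetical stand-ins, not part of fedsim.

```python
import functools

# Toy stand-in for an optimizer class (hypothetical; fedsim actually
# accepts any torch.optim.Optimizer here).
class ToySGD:
    def __init__(self, params, lr=0.1):
        self.params = params
        self.lr = lr

# A "definition" bakes in hyperparameters with functools.partial, leaving
# the late-bound arguments (e.g., model parameters) to be supplied later.
local_optimizer_def = functools.partial(ToySGD, lr=0.05)

# The algorithm later instantiates it per client with actual parameters.
opt = local_optimizer_def(params=[1.0, 2.0])
```

The same pattern applies to `model_def`, `criterion_def`, and the scheduler definitions: pass the class (or a partial of it), not an instance.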
- at_round_end(score_aggregator: fedsim.utils.aggregators.AppendixAggregator) None [source]#
Hook to inject code at the end of each round of the training loop.
- Parameters
server_storage (Storage) -- server storage object.
score_aggregator (AppendixAggregator) -- contains the aggregated scores
- at_round_start() None [source]#
Hook to inject code at the beginning of each round of the training loop.
- Parameters
server_storage (Storage) -- server storage object.
- deploy() Optional[Mapping[Hashable, Any]] [source]#
Returns a mapping of name -> parameter set used to test the model.
- Parameters
server_storage (Storage) -- server storage object.
- get_global_loader_split(split_name) Iterable [source]#
To get the data loader for a specific global split.
- Parameters
split_name (Hashable) -- split name.
- Returns
Iterable -- data loader for global split <split_name>
- get_global_scores() Dict[str, Any] [source]#
To instantiate and get global scores that have to be measured in the current round (log frequencies are matched).
- Returns
Dict[str, Any] -- mapping of name:score
- get_global_split_scores(split_name) Dict[str, Any] [source]#
To instantiate and get global scores that have to be measured in the current round (log frequencies are matched) for a specific data split.
- Parameters
split_name (Hashable) -- name of the global data split
- Returns
Dict[str, Any] -- mapping of name:score. If no score is listed for the given split, None is returned.
- get_local_scores() Dict[str, Any] [source]#
To instantiate and get local scores that have to be measured in the current round (log frequencies are matched).
- Returns
Dict[str, Any] -- mapping of name:score. If no score is listed for the given split, None is returned.
- get_local_split_scores(split_name) Dict[str, Any] [source]#
To instantiate and get local scores that have to be measured in the current round (log frequencies are matched) for a specific data split.
- Parameters
split_name (Hashable) -- name of the global data split
- Returns
Dict[str, Any] -- mapping of name:score
- get_model_def()[source]#
To get the definition of the model so that one can instantiate it by calling it.
- Returns
Callable -- definition of the model. To instantiate, call the returned value with parentheses.
- get_round_number()[source]#
To get the current round number, starting from zero.
- Returns
int -- current round number, starting from zero.
- get_server_storage()[source]#
To access the public configs of the server.
- Returns
Storage -- public server storage.
- get_train_split_name()[source]#
To get the name of the split used to perform local training.
- Returns
Hashable -- name of the split used for local training.
- hook_global_score(score_def, score_name, split_name) None [source]#
To hook a score measurement on global data.
- Parameters
score_def (Callable) -- definition of the score, used to make new instances. The list of existing scores can be found under fedsim.scores.
score_name (Hashable) -- name of the score to show in the logs.
split_name (Hashable) -- name of the data split to apply the measurement on.
- hook_local_score(score_def, score_name, split_name) None [source]#
To hook a score measurement on local data.
- Parameters
score_def (Callable) -- definition of the score, used to make new instances. The list of existing scores can be found under fedsim.scores.
score_name (Hashable) -- name of the score to show in the logs.
split_name (Hashable) -- name of the data split to apply the measurement on.
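What hooking a score amounts to can be sketched in pure Python: store a score definition under (split_name, score_name), then make a fresh instance from each definition every round. The registry, helper, and Accuracy class below are toy stand-ins, not fedsim's actual implementation.

```python
# Toy score class (hypothetical; fedsim provides real scores under
# fedsim.scores). It accumulates over calls and reports with get_score.
class Accuracy:
    def __init__(self):
        self.correct, self.total = 0, 0
    def __call__(self, preds, targets):
        self.correct += sum(p == t for p, t in zip(preds, targets))
        self.total += len(targets)
    def get_score(self):
        return self.correct / self.total

local_score_defs = {}  # {split_name: {score_name: score_def}}

def hook_local_score(score_def, score_name, split_name):
    local_score_defs.setdefault(split_name, {})[score_name] = score_def

hook_local_score(Accuracy, "accuracy", "test")

# Each round, fresh score instances are made from the stored definitions.
round_scores = {split: {name: sdef() for name, sdef in defs.items()}
                for split, defs in local_score_defs.items()}
round_scores["test"]["accuracy"]([1, 0, 1], [1, 1, 1])
```

Storing definitions rather than instances is what lets the framework reset the accumulators between rounds.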
- init(*args, **kwargs) None [source]#
This method is executed only once, at the time of instantiating the algorithm object. Define your model and anything else needed during training here. Remember to write the outcome of your processing to server_storage so it can be accessed in other methods.
Note
*args and **kwargs are directly passed through from the algorithm constructor.
- Parameters
server_storage (Storage) -- server storage object
- optimize(serial_aggregator: fedsim.utils.aggregators.SerialAggregator, appendix_aggregator: fedsim.utils.aggregators.AppendixAggregator) Mapping[Hashable, Any] [source]#
Optimize the server model(s) and return scores to be reported.
- Parameters
server_storage (Storage) -- server storage object.
serial_aggregator (SerialAggregator) -- serial aggregator instance of current round.
appendix_aggregator (AppendixAggregator) -- appendix aggregator instance of current round.
- Raises
NotImplementedError -- abstract class to be implemented by child
- Returns
Mapping[Hashable, Any] -- context to be reported
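The serial aggregator handed to optimize collects weighted contributions from clients one at a time. A minimal stand-in for that behavior might look as follows; fedsim's actual SerialAggregator API may differ, so treat the class and method names as assumptions.

```python
# Toy streaming weighted aggregator (hypothetical stand-in for
# fedsim.utils.aggregators.SerialAggregator).
class ToySerialAggregator:
    def __init__(self):
        self._sums, self._weights = {}, {}

    def add(self, key, value, weight=1):
        # accumulate weight * value so get() yields the weighted average
        self._sums[key] = self._sums.get(key, 0.0) + weight * value
        self._weights[key] = self._weights.get(key, 0) + weight

    def get(self, key):
        return self._sums[key] / self._weights[key]

agg = ToySerialAggregator()
# Two clients report their local loss, weighted by sample count.
agg.add("loss", 0.8, weight=100)
agg.add("loss", 0.4, weight=300)
avg_loss = agg.get("loss")  # (0.8*100 + 0.4*300) / 400 = 0.5
```

Weighting by sample count is what makes the server-side average match the global data distribution.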
- receive_from_client(client_id: int, client_msg: Mapping[Hashable, Any], train_split_name: str, serial_aggregator: fedsim.utils.aggregators.SerialAggregator, appendix_aggregator: fedsim.utils.aggregators.AppendixAggregator) bool [source]#
Receive and aggregate info from the selected clients.
- Parameters
server_storage (Storage) -- server storage object.
client_id (int) -- id of the sender (client)
client_msg (Mapping[Hashable, Any]) -- client context that is sent.
train_split_name (str) -- name of the training split on clients.
serial_aggregator (SerialAggregator) -- serial aggregator instance to collect info.
appendix_aggregator (AppendixAggregator) -- appendix aggregator instance to collect info.
- Returns
bool -- success of the aggregation.
- Raises
NotImplementedError -- abstract class to be implemented by child
- report(dataloaders: Dict[str, Any], round_scores: Dict[str, Dict[str, Any]], metric_logger: Optional[Any], device: str, optimize_reports: Mapping[Hashable, Any], deployment_points: Optional[Mapping[Hashable, torch.Tensor]] = None) Dict[str, Union[int, float]] [source]#
Test on global data and report info. If a flattened dict of str: Union[int, float] is returned from this function, its content is automatically logged using the metric logger (e.g., logall.TensorboardLogger). metric_logger is also passed as an input argument for extra (non-scalar) logging operations.
- Parameters
server_storage (Storage) -- server storage object.
dataloaders (Any) -- dict of data loaders to test the global model(s)
round_scores (Dict[str, Dict[str, fedsim.scores.Score]]) -- dictionary of form {'split_name':{'score_name': score_def}} for global scores to evaluate at the current round.
metric_logger (Any, optional) -- the logging object (e.g., logall.TensorboardLogger)
device (str) -- 'cuda', 'cpu' or gpu number
optimize_reports (Mapping[Hashable, Any]) -- dict returned by the optimizer
deployment_points (Mapping[Hashable, torch.Tensor], optional) -- output of deploy method
- Raises
NotImplementedError -- abstract class to be implemented by child
- send_to_client(client_id: int) Mapping[Hashable, Any] [source]#
Returns the context to send to the client corresponding to client_id.
Warning
Do not send shared objects (e.g., the server model) without deepcopying them first.
- Parameters
server_storage (Storage) -- server storage object.
client_id (int) -- id of the receiving client
- Raises
NotImplementedError -- abstract class to be implemented by child
- Returns
Mapping[Hashable, Any] -- the context to be sent in form of a Mapping
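The warning above exists because sending a shared mutable object lets a client's in-place updates leak back into the server's state. A toy illustration with plain dicts (no fedsim objects involved):

```python
import copy

server_model = {"w": [1.0, 2.0]}

shared = server_model          # "sent" without deepcopy
shared["w"][0] = 99.0          # client-side mutation...
# ...corrupts the server state: server_model["w"][0] is now 99.0

server_model = {"w": [1.0, 2.0]}
safe = copy.deepcopy(server_model)  # "sent" after deepcopy
safe["w"][0] = 99.0
# server state intact: server_model["w"][0] is still 1.0
```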
- send_to_server(rounds: int, storage: Dict[Hashable, Any], datasets: Dict[str, Iterable], train_split_name: str, scores: Dict[str, Dict[str, Any]], epochs: int, criterion: torch.nn.modules.module.Module, train_batch_size: int, inference_batch_size: int, optimizer_def: Callable, lr_scheduler_def: Optional[Callable] = None, device: Union[int, str] = 'cuda', ctx: Optional[Dict[Hashable, Any]] = None, *args, **kwargs) Mapping[str, Any] [source]#
Client operation on the received information.
- Parameters
id (int) -- id of the client
rounds (int) -- global round number
storage (Storage) -- storage object of the client
datasets (Dict[str, Iterable]) -- this comes from Data Manager
train_split_name (str) -- string containing name of the training split
scores (Dict[str, Dict[str, Score]]) -- dictionary of the form {'split_name': {'score_name': Score}} for global scores to evaluate at the current round.
epochs (int) -- number of epochs to train
criterion (Score) -- criterion; should be a differentiable fedsim.scores.Score
train_batch_size (int) -- training batch_size
inference_batch_size (int) -- inference batch_size
optimizer_def (Callable) -- class for constructing the local optimizer
lr_scheduler_def (Callable) -- class for constructing the local lr scheduler
device (Union[int, str], optional) -- Defaults to 'cuda'.
ctx (Optional[Dict[Hashable, Any]], optional) -- context received from the server.
- Returns
Mapping[str, Any] -- client context to be sent to the server
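The core of send_to_server is a local training loop over the given number of epochs, after which the client packages its result (and the sample count the server needs for weighted aggregation) into the returned context. The sketch below uses a one-parameter quadratic loss in pure Python; function and key names are hypothetical, not fedsim's.

```python
# Toy local training: SGD on loss (w - x)^2 per sample.
def local_train(w, data, epochs, lr):
    for _ in range(epochs):
        for x in data:
            grad = 2.0 * (w - x)  # d/dw of (w - x)^2
            w -= lr * grad
    return w

w0 = 0.0
data = [1.0, 1.0, 1.0]
w = local_train(w0, data, epochs=50, lr=0.1)

# Client -> server context (hypothetical keys): trained parameter plus the
# sample count used for weighted averaging on the server.
msg = {"weights": w, "num_samples": len(data)}
```

With all samples equal to 1.0, the parameter converges to 1.0, so the returned context carries the client's local optimum.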
- train(rounds: int, num_score_report_point: Optional[int] = None, train_split_name='train') Optional[Dict[str, Optional[float]]] [source]#
Loop over the learning pipeline of the distributed algorithm for the given number of rounds.
Note
Client metrics are reported in the form of clients.{metric_name}.
Server metrics (score results) are reported in the form of server.{deployment_point}.{metric_name}.
- Parameters
rounds (int) -- number of rounds to train.
num_score_report_point (int) -- limits the number of points to return in reports.
train_split_name (str) -- local split name to perform training on. Defaults to 'train'.
- Returns
Optional[Dict[str, Optional[float]]] -- collected score metrics.
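The key layout described in the Note above (clients.{metric_name} and server.{deployment_point}.{metric_name}) can be reproduced with a simple flattening step; the metric names and values below are made up for illustration.

```python
# Hypothetical round results.
client_metrics = {"avg_loss": 0.42}
server_metrics = {"avg": {"accuracy": 0.91}}  # deployment point -> scores

# Flatten into the dotted key layout used in the reports.
report = {}
for name, value in client_metrics.items():
    report[f"clients.{name}"] = value
for point, scores in server_metrics.items():
    for name, value in scores.items():
        report[f"server.{point}.{name}"] = value
```

The flat str -> float mapping is what scalar loggers such as TensorBoard-style backends expect.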