Centralized Federated Learning Algorithm#

class CentralFLAlgorithm(data_manager, metric_logger, num_clients, sample_scheme, sample_rate, model_def, epochs, criterion_def, optimizer_def=functools.partial(torch.optim.SGD, lr=1.0), local_optimizer_def=functools.partial(torch.optim.SGD, lr=0.1), lr_scheduler_def=None, local_lr_scheduler_def=None, r2r_local_lr_scheduler_def=None, batch_size=32, test_batch_size=64, device='cpu', *args, **kwargs)[source]#

Base class for centralized FL algorithm.

Parameters
  • data_manager (distributed.data_management.DataManager) -- data manager

  • metric_logger (logall.Logger) -- metric logger for tracking.

  • num_clients (int) -- number of clients

  • sample_scheme (str) -- mode of sampling clients. Options are 'uniform' and 'sequential'

  • sample_rate (float) -- rate of sampling clients

  • model_def (torch.nn.Module) -- definition for constructing the model

  • epochs (int) -- number of local epochs

  • criterion_def (Callable) -- loss function defining local objective

  • optimizer_def (Callable) -- definition of the server optimizer

  • local_optimizer_def (Callable) -- definition of the local optimizer

  • lr_scheduler_def (Callable) -- definition of lr scheduler of server optimizer.

  • local_lr_scheduler_def (Callable) -- definition of lr scheduler of local optimizer

  • r2r_local_lr_scheduler_def (Callable) -- definition of a scheduler for the lr delivered to the clients at each round (determines the initial lr of the client optimizer)

  • batch_size (int) -- batch size of the local training

  • test_batch_size (int) -- inference time batch size

  • device (str) -- cpu, cuda, or gpu number

Note

The definition of:

  • learning rate schedulers could be any of the ones defined in torch.optim.lr_scheduler, or any other class that implements the step and get_last_lr methods.

  • optimizers could be any torch.optim.Optimizer.

  • the model could be any torch.nn.Module.

  • the criterion could be any of fedsim.losses.
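The optimizer definitions in the signature above follow a common pattern: a functools.partial (or any callable) that constructs the optimizer later, inside the algorithm, once model parameters exist. A minimal sketch of that pattern, with a stub class standing in for torch.optim.SGD so the snippet runs without torch:

```python
import functools

class SGDStub:
    """Stand-in for torch.optim.SGD so the pattern runs without torch."""
    def __init__(self, params, lr):
        self.params = list(params)
        self.lr = lr

# definitions are callables that build the optimizer later,
# when the algorithm hands them the model parameters
server_optimizer_def = functools.partial(SGDStub, lr=1.0)
local_optimizer_def = functools.partial(SGDStub, lr=0.1)

local_opt = local_optimizer_def([0.0, 0.0])
```

The same pattern applies to criterion_def and the lr scheduler definitions: pass a partially-configured callable, not an instance.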

Architecture:

../_images/arch.svg
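The sample_scheme and sample_rate parameters together determine which clients participate in each round. The sketch below is an illustrative re-implementation of how the two schemes could behave, not fedsim's actual code; the exact semantics may differ:

```python
import random

def sample_clients(num_clients, sample_scheme, sample_rate, round_number, rng=None):
    """Pick clients for one round; a guess at 'uniform' vs 'sequential'."""
    k = max(1, int(sample_rate * num_clients))
    if sample_scheme == "uniform":
        # draw k clients at random without replacement
        rng = rng or random.Random(0)
        return rng.sample(range(num_clients), k)
    if sample_scheme == "sequential":
        # walk through the client pool k at a time, wrapping around
        start = (round_number * k) % num_clients
        return [(start + i) % num_clients for i in range(k)]
    raise ValueError(f"unknown sample scheme {sample_scheme!r}")
```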
at_round_end(score_aggregator: fedsim.utils.aggregators.AppendixAggregator) None[source]#

To inject code at the end of each round of the training loop.

Parameters
  • server_storage (Storage) -- server storage object.

  • score_aggregator (AppendixAggregator) -- contains the aggregated scores

at_round_start() None[source]#

To inject code at the beginning of each round of the training loop.

Parameters

server_storage (Storage) -- server storage object.

deploy() Optional[Mapping[Hashable, Any]][source]#

To return a Mapping of name -> parameter set with which to test the model.

Parameters

server_storage (Storage) -- server storage object.

get_device() str[source]#

To get the device name or number

Returns

str -- device name or number

get_global_loader_split(split_name) Iterable[source]#

To get the data loader for a specific global split.

Parameters

split_name (Hashable) -- split name.

Returns

Iterable -- data loader for global split <split_name>

get_global_scores() Dict[str, Any][source]#

To instantiate and get global scores that have to be measured in the current round (log frequencies are matched).

Returns

Dict[str, Any] -- mapping of name:score

get_global_split_scores(split_name) Dict[str, Any][source]#

To instantiate and get global scores that have to be measured in the current round (log frequencies are matched) for a specific data split.

Parameters

split_name (Hashable) -- name of the global data split

Returns

Dict[str, Any] -- mapping of name:score. If no score is listed for the given split, None is returned.

get_local_scores() Dict[str, Any][source]#

To instantiate and get local scores that have to be measured in the current round (log frequencies are matched).

Returns

Dict[str, Any] -- mapping of name:score. If no score is listed for the given split, None is returned.

get_local_split_scores(split_name) Dict[str, Any][source]#

To instantiate and get local scores that have to be measured in the current round (log frequencies are matched) for a specific data split.

Parameters

split_name (Hashable) -- name of the global data split

Returns

Dict[str, Any] -- mapping of name:score

get_model_def()[source]#

To get the definition of the model so that one can instantiate it by calling.

Returns

Callable -- definition of the model. To instantiate, call the returned value with parentheses in front.

get_round_number()[source]#

To get the current round number, starting from zero.

Returns

int -- current round number, starting from zero.

get_server_storage()[source]#

To access the public configs of the server.

Returns

Storage -- public server storage.

get_train_split_name()[source]#

To get the name of the split used to perform local training.

Returns

Hashable -- name of the split used for local training.

hook_global_score(score_def, score_name, split_name) None[source]#

To hook a score measurement on global data.

Parameters
  • score_def (Callable) -- definition of the score, used to make new instances. The list of existing scores can be found under fedsim.scores.

  • score_name (Hashable) -- name of the score to show up in the logs.

  • split_name (Hashable) -- name of the data split to apply the measurement on.

hook_local_score(score_def, score_name, split_name) None[source]#

To hook a score measurement on local data.

Parameters
  • score_def (Callable) -- definition of the score, used to make new instances. The list of existing scores can be found under fedsim.scores.

  • score_name (Hashable) -- name of the score to show up in the logs.

  • split_name (Hashable) -- name of the data split to apply the measurement on.

init(*args, **kwargs) None[source]#

This method is executed only once, at the time of instantiating the algorithm object. Define your model and whatever else is needed during training here. Remember to write the outcome of your processing to server_storage for access in other methods.

Note

*args and **kwargs are directly passed through from algorithm constructor.

Parameters

server_storage (Storage) -- server storage object
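A sketch of what an init override might set up, with a minimal key-value stub standing in for fedsim's Storage (its actual read/write API is an assumption here, as is the "model" key):

```python
class StorageStub:
    """Minimal stand-in for fedsim's Storage: a key-value store."""
    def __init__(self):
        self._data = {}
    def write(self, key, value):
        self._data[key] = value
    def read(self, key):
        return self._data[key]

def init_sketch(server_storage, model_def):
    # instantiate the model once and persist it so that other
    # methods (send_to_client, optimize, ...) can read it back
    server_storage.write("model", model_def())
```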

optimize(serial_aggregator: fedsim.utils.aggregators.SerialAggregator, appendix_aggregator: fedsim.utils.aggregators.AppendixAggregator) Mapping[Hashable, Any][source]#

To optimize the server model(s) and return scores to be reported.

Parameters
  • server_storage (Storage) -- server storage object.

  • serial_aggregator (SerialAggregator) -- serial aggregator instance of current round.

  • appendix_aggregator (AppendixAggregator) -- appendix aggregator instance of current round.

Raises

NotImplementedError -- abstract class to be implemented by child

Returns

Mapping[Hashable, Any] -- context to be reported
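To make the aggregator roles concrete, here is a toy running weighted average in the spirit of what a serial aggregator accumulates across clients; fedsim's actual SerialAggregator API is not reproduced here and its method names are assumptions:

```python
class RunningWeightedAverage:
    """Toy serial aggregator: accumulates weighted entries one
    client at a time and yields their weighted mean on pop."""
    def __init__(self):
        self._sums = {}
        self._weights = {}

    def add(self, key, value, weight=1.0):
        # each client contribution is folded in as it arrives,
        # so the server never stores per-client values
        self._sums[key] = self._sums.get(key, 0.0) + weight * value
        self._weights[key] = self._weights.get(key, 0.0) + weight

    def pop(self, key):
        return self._sums.pop(key) / self._weights.pop(key)
```

For example, aggregating a loss of 2.0 from a client with 10 samples and 4.0 from a client with 30 samples yields a sample-weighted mean of 3.5.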

receive_from_client(client_id: int, client_msg: Mapping[Hashable, Any], train_split_name: str, serial_aggregator: fedsim.utils.aggregators.SerialAggregator, appendix_aggregator: fedsim.utils.aggregators.AppendixAggregator) bool[source]#

To receive and aggregate info from the selected clients.

Parameters
  • server_storage (Storage) -- server storage object.

  • client_id (int) -- id of the sender (client)

  • client_msg (Mapping[Hashable, Any]) -- client context that is sent.

  • train_split_name (str) -- name of the training split on clients.

  • serial_aggregator (SerialAggregator) -- serial aggregator instance to collect info.

  • appendix_aggregator (AppendixAggregator) -- appendix aggregator instance to collect info.

Returns

bool -- success of the aggregation.

Raises

NotImplementedError -- abstract class to be implemented by child

report(dataloaders: Dict[str, Any], round_scores: Dict[str, Dict[str, Any]], metric_logger: Optional[Any], device: str, optimize_reports: Mapping[Hashable, Any], deployment_points: Optional[Mapping[Hashable, torch.Tensor]] = None) Dict[str, Union[int, float]][source]#

To test on global data and report info. If a flattened dict of str: Union[int, float] is returned from this method, its content is automatically logged using the metric logger (e.g., logall.TensorboardLogger). metric_logger is also passed as an input argument for extra (non-scalar) logging operations.

Parameters
  • server_storage (Storage) -- server storage object.

  • dataloaders (Any) -- dict of data loaders to test the global model(s)

  • round_scores (Dict[str, Dict[str, fedsim.scores.Score]]) -- dictionary of form {'split_name':{'score_name': score_def}} for global scores to evaluate at the current round.

  • metric_logger (Any, optional) -- the logging object (e.g., logall.TensorboardLogger)

  • device (str) -- 'cuda', 'cpu' or gpu number

  • optimize_reports (Mapping[Hashable, Any]) -- dict returned by the optimize method

  • deployment_points (Mapping[Hashable, torch.Tensor], optional) -- output of deploy method

Raises

NotImplementedError -- abstract class to be implemented by child

send_to_client(client_id: int) Mapping[Hashable, Any][source]#

To return the context to send to the client corresponding to client_id.

Warning

Do not send shared objects (such as the server model) without deepcopying them first.

Parameters
  • server_storage (Storage) -- server storage object.

  • client_id (int) -- id of the receiving client

Raises

NotImplementedError -- abstract class to be implemented by child

Returns

Mapping[Hashable, Any] -- the context to be sent, in the form of a Mapping
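The deepcopy warning above can be illustrated with a short sketch (the message keys here are hypothetical, not fedsim's wire format):

```python
from copy import deepcopy

def send_to_client_sketch(server_model_state, client_id):
    # a shared mutable object sent as-is would let one client's local
    # updates leak into every other client's starting point; deepcopy
    # gives each client an isolated snapshot
    return {"model_state": deepcopy(server_model_state), "client": client_id}
```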

send_to_server(rounds: int, storage: Dict[Hashable, Any], datasets: Dict[str, Iterable], train_split_name: str, scores: Dict[str, Dict[str, Any]], epochs: int, criterion: torch.nn.modules.module.Module, train_batch_size: int, inference_batch_size: int, optimizer_def: Callable, lr_scheduler_def: Optional[Callable] = None, device: Union[int, str] = 'cuda', ctx: Optional[Dict[Hashable, Any]] = None, *args, **kwargs) Mapping[str, Any][source]#

Client operation on the received information.

Parameters
  • id (int) -- id of the client

  • rounds (int) -- global round number

  • storage (Storage) -- storage object of the client

  • datasets (Dict[str, Iterable]) -- this comes from Data Manager

  • train_split_name (str) -- string containing name of the training split

  • scores (Dict[str, Dict[str, Score]]) -- dictionary of form {'split_name': {'score_name': Score}} for global scores to evaluate at the current round.

  • epochs (int) -- number of epochs to train

  • criterion (Score) -- criterion; should be a differentiable fedsim.scores.Score

  • train_batch_size (int) -- training batch_size

  • inference_batch_size (int) -- inference batch_size

  • optimizer_def (Callable) -- class for constructing the local optimizer

  • lr_scheduler_def (Callable) -- class for constructing the local lr scheduler

  • device (Union[int, str], optional) -- Defaults to 'cuda'.

  • ctx (Optional[Dict[Hashable, Any]], optional) -- context received from the server.

Returns

Mapping[str, Any] -- client context to be sent to the server

train(rounds: int, num_score_report_point: Optional[int] = None, train_split_name='train') Optional[Dict[str, Optional[float]]][source]#

To loop over the learning pipeline of the distributed algorithm for the given number of rounds.

Note

  • The client metrics are reported in the form of clients.{metric_name}.

  • The server metrics (score results) are reported in the form of server.{deployment_point}.{metric_name}.

Parameters
  • rounds (int) -- number of rounds to train.

  • num_score_report_point (int) -- limits the number of points to return in reports.

  • train_split_name (str) -- local split name to perform training on. Defaults to 'train'.

Returns

Optional[Dict[str, Optional[float]]] -- collected score metrics.
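The metric-naming convention from the Note above can be sketched as a key-flattening step (illustrative only; the actual report assembly is internal to fedsim):

```python
def flatten_report(client_scores, server_scores):
    """Flatten round metrics into the documented naming scheme:
    clients.{metric_name} and server.{deployment_point}.{metric_name}."""
    out = {f"clients.{name}": v for name, v in client_scores.items()}
    for point, scores in server_scores.items():
        for name, v in scores.items():
            out[f"server.{point}.{name}"] = v
    return out
```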