Torch Module (API Reference)

bluefog.torch.broadcast_parameters(params, root_rank)

Broadcasts the parameters from root rank to all other processes. Typical usage is to broadcast the model.state_dict(), model.named_parameters(), or model.parameters().

Parameters:
  • params – One of the following: a list of parameters to broadcast, or a dict of parameters to broadcast.
  • root_rank – The rank of the process from which parameters will be broadcast to all other processes.
bluefog.torch.broadcast_optimizer_state(optimizer, root_rank)

Broadcasts an optimizer state from root rank to all other processes.

Parameters:
  • optimizer – An optimizer.
  • root_rank – The rank of the process from which the optimizer will be broadcast to all other processes.
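
A typical usage is to synchronize both the model and the optimizer state from rank 0 right after construction (a minimal sketch; model and optimizer are assumed to be already defined):

Example

>>> import bluefog.torch as bf
>>> bf.init()
>>> bf.broadcast_parameters(model.state_dict(), root_rank=0)
>>> bf.broadcast_optimizer_state(optimizer, root_rank=0)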
bluefog.torch.DistributedAllreduceOptimizer(optimizer, named_parameters=None)

A distributed optimizer that wraps another torch.optim.Optimizer through allreduce ops.

Parameters:
  • optimizer – Optimizer to use for computing gradients and applying updates.
  • named_parameters – A mapping between parameter names and values. Used for naming of allreduce operations. Typically just model.named_parameters()
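
The wrapper is applied in the same way as the other distributed optimizers (a sketch; model, optim, and lr are assumed to be defined):

Example

>>> import bluefog.torch as bf
>>> bf.init()
>>> optimizer = optim.SGD(model.parameters(), lr=lr * bf.size())
>>> optimizer = bf.DistributedAllreduceOptimizer(
...    optimizer, named_parameters=model.named_parameters()
... )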
bluefog.torch.DistributedBluefogOptimizer(optimizer, named_parameters=None)

A distributed optimizer that wraps another torch.optim.Optimizer through mpi_win_put ops.

Parameters:
  • optimizer – Optimizer to use for computing gradients and applying updates.
  • named_parameters – A mapping between parameter names and values. Used for naming of window operations. Typically just model.named_parameters()

Example

>>> import bluefog.torch as bf
>>> ...
>>> bf.init()
>>> optimizer = optim.SGD(model.parameters(), lr=lr * bf.size())
>>> optimizer = bf.DistributedBluefogOptimizer(
...    optimizer, named_parameters=model.named_parameters()
... )
bluefog.torch.DistributedNeighborAllreduceOptimizer(optimizer, named_parameters=None)

A distributed optimizer that wraps another torch.optim.Optimizer through neighbor_allreduce ops.

Parameters:
  • optimizer – Optimizer to use for computing gradients and applying updates.
  • named_parameters – A mapping between parameter names and values. Used for naming of allreduce operations. Typically just model.named_parameters()
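
Usage mirrors the other optimizer wrappers above (a sketch; model, optim, and lr are assumed to be defined):

Example

>>> import bluefog.torch as bf
>>> bf.init()
>>> optimizer = optim.SGD(model.parameters(), lr=lr * bf.size())
>>> optimizer = bf.DistributedNeighborAllreduceOptimizer(
...    optimizer, named_parameters=model.named_parameters()
... )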
bluefog.torch.allgather(tensor: torch.Tensor, name: str = None) → torch.Tensor

A function that concatenates the input tensor with the same input tensor on all other Bluefog processes. The input tensor is not modified.

The concatenation is done on the first dimension, so the input tensors on the different processes must have the same rank and shape, except for the first dimension, which is allowed to differ.

Parameters:
  • tensor – A tensor to allgather.
  • name – A name of the allgather operation.
Returns:

A tensor of the same type as tensor, concatenated on dimension zero across all processes. The shape is identical to the input shape, except for the first dimension, which may be greater and is the sum of all first dimensions of the tensors in different Bluefog processes.
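
For instance, if every process contributes a tensor of shape [2, 3], the gathered result on each process has shape [2 * bf.size(), 3] (a sketch; requires a running Bluefog job):

Example

>>> import torch
>>> import bluefog.torch as bf
>>> bf.init()
>>> x = torch.ones(2, 3) * bf.rank()
>>> y = bf.allgather(x)  # shape [2 * bf.size(), 3]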

bluefog.torch.allgather_async(tensor: torch.Tensor, name: str = None) → int

A function that asynchronously concatenates the input tensor with the same input tensor on all other Bluefog processes. The input tensor is not modified.

The concatenation is done on the first dimension, so the input tensors on the different processes must have the same rank and shape, except for the first dimension, which is allowed to differ.

Parameters:
  • tensor – A tensor to allgather.
  • name – A name of the allgather operation.
Returns:

A handle to the allgather operation that can be used with poll() or synchronize().

bluefog.torch.allreduce(tensor: torch.Tensor, average: bool = True, name: str = None) → torch.Tensor

A function that performs averaging or summation of the input tensor over all the Bluefog processes. The input tensor is not modified.

The reduction operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Bluefog processes for a given name. The reduction will not start until all processes are ready to send and receive the tensor.

Parameters:
  • tensor – A tensor to average or sum.
  • average – A flag indicating whether to compute average or summation, defaults to average.
  • name – A name of the reduction operation.
Returns:

A tensor of the same shape and type as tensor, averaged or summed across all processes.

bluefog.torch.allreduce_async(tensor: torch.Tensor, average: bool = True, name: str = None) → int

A function that performs asynchronous averaging or summation of the input tensor over all the Bluefog processes. The input tensor is not modified.

The reduction operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Bluefog processes for a given name. The reduction will not start until all processes are ready to send and receive the tensor.

Parameters:
  • tensor – A tensor to average or sum.
  • average – A flag indicating whether to compute average or summation, defaults to average.
  • name – A name of the reduction operation.
Returns:

A handle to the allreduce operation that can be used with poll() or synchronize().
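
The handle allows communication to overlap with computation (a sketch; requires a running Bluefog job):

Example

>>> import torch
>>> import bluefog.torch as bf
>>> bf.init()
>>> handle = bf.allreduce_async(torch.ones(3), average=True)
>>> # ... do other work here ...
>>> while not bf.poll(handle):
...     pass  # still in flight; could do more work instead
>>> avg = bf.synchronize(handle)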

bluefog.torch.barrier()

Barrier function to synchronize all MPI processes.

After this function returns, it is guaranteed that all asynchronous operations issued before it have finished.

bluefog.torch.broadcast(tensor: torch.Tensor, root_rank: int, name: str = None) → torch.Tensor

A function that broadcasts the input tensor on root rank to the same input tensor on all other Bluefog processes. The input tensor is not modified.

The broadcast operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Bluefog processes for a given name. The broadcast will not start until all processes are ready to send and receive the tensor.

This acts as a thin wrapper around an autograd function. If your input tensor requires gradients, then calling this function allows gradients to be computed and backpropagated.

Parameters:
  • tensor – A tensor to broadcast.
  • root_rank – The rank to broadcast the value from.
  • name – A name of the broadcast operation.
Returns:

A tensor of the same shape and type as tensor, with the value broadcasted from root rank.
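
For instance, to make every process agree on rank 0's value (a sketch; requires a running Bluefog job):

Example

>>> import torch
>>> import bluefog.torch as bf
>>> bf.init()
>>> x = torch.randn(4)
>>> x = bf.broadcast(x, root_rank=0)  # every rank now holds rank 0's x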

bluefog.torch.broadcast_(tensor, root_rank, name=None) → torch.Tensor

A function that broadcasts the input tensor on root rank to the same input tensor on all other Bluefog processes. The operation is performed in-place.

The broadcast operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Bluefog processes for a given name. The broadcast will not start until all processes are ready to send and receive the tensor.

Parameters:
  • tensor – A tensor to broadcast.
  • root_rank – The rank to broadcast the value from.
  • name – A name of the broadcast operation.
Returns:

A tensor of the same shape and type as tensor, with the value broadcasted from root rank.

bluefog.torch.broadcast_async(tensor: torch.Tensor, root_rank: int, name: str = None) → int

A function that asynchronously broadcasts the input tensor on root rank to the same input tensor on all other Bluefog processes. The input tensor is not modified.

The broadcast operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Bluefog processes for a given name. The broadcast will not start until all processes are ready to send and receive the tensor.

Parameters:
  • tensor – A tensor to broadcast.
  • root_rank – The rank to broadcast the value from.
  • name – A name of the broadcast operation.
Returns:

A handle to the broadcast operation that can be used with poll() or synchronize().

bluefog.torch.broadcast_async_(tensor, root_rank, name=None) → int

A function that asynchronously broadcasts the input tensor on root rank to the same input tensor on all other Bluefog processes. The operation is performed in-place.

The broadcast operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Bluefog processes for a given name. The broadcast will not start until all processes are ready to send and receive the tensor.

Parameters:
  • tensor – A tensor to broadcast.
  • root_rank – The rank to broadcast the value from.
  • name – A name of the broadcast operation.
Returns:

A handle to the broadcast operation that can be used with poll() or synchronize().

bluefog.torch.neighbor_allgather(tensor: torch.Tensor, name: str = None) → torch.Tensor

A function that concatenates the input tensor with the same input tensor on all neighbor Bluefog processes (not including itself). The input tensor is not modified.

The concatenation is done on the first dimension, so the input tensors on the different processes must have the same rank and shape, except for the first dimension, which is allowed to differ.

Parameters:
  • tensor – A tensor to allgather.
  • name – A name of the allgather operation.
Returns:

A tensor of the same type as tensor, concatenated on dimension zero across all processes. The shape is identical to the input shape, except for the first dimension, which may be greater and is the sum of all first dimensions of the tensors in neighbor Bluefog processes.

bluefog.torch.neighbor_allgather_async(tensor: torch.Tensor, name: str = None) → int

A function that asynchronously concatenates the input tensor with the same input tensor on all neighbor Bluefog processes (not including itself). The input tensor is not modified.

The concatenation is done on the first dimension, so the input tensors on the different processes must have the same rank and shape, except for the first dimension, which is allowed to differ.

Parameters:
  • tensor – A tensor to allgather.
  • name – A name of the allgather operation.
Returns:

A handle to the allgather operation that can be used with poll() or synchronize().

bluefog.torch.neighbor_allreduce(tensor: torch.Tensor, self_weight: float = None, neighbor_weights: Dict[int, float] = None, name: str = None) → torch.Tensor

A function that performs weighted averaging of the input tensor over the neighbors and itself in the Bluefog processes. The default behavior is a uniform average.

The input tensor is not modified.

The reduction operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Bluefog processes for a given name. The reduction will not start until all processes are ready to send and receive the tensor.

Parameters:
  • tensor – A tensor to weighted-average.
  • self_weight – The weight for the self node, used together with neighbor_weights.
  • neighbor_weights – The weights for the neighbor nodes, used together with self_weight. If neighbor_weights is provided, the returned tensor is the weighted average defined by these weights and self_weight. If not, the returned tensor is the weighted average defined by the topology weights, if provided, or the uniform average. The weights should be a dictionary of the form {rank: weight}, and each rank has to belong to the (in-)neighbors.
  • name – A name of the reduction operation.
Returns:

A tensor of the same shape and type as tensor, weighted-averaged over the neighbors and itself.

Note: self_weight and neighbor_weights must be provided together.
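
For example, both the default uniform average and an explicitly weighted average can be requested (a sketch; bf.in_neighbor_ranks() is assumed to list the in-neighbors, and the weights should sum to one for an average):

Example

>>> import torch
>>> import bluefog.torch as bf
>>> bf.init()
>>> x = torch.ones(3) * bf.rank()
>>> y = bf.neighbor_allreduce(x)  # uniform average over self and neighbors
>>> ranks = bf.in_neighbor_ranks()
>>> w = {r: 0.5 / len(ranks) for r in ranks}
>>> z = bf.neighbor_allreduce(x, self_weight=0.5, neighbor_weights=w)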

bluefog.torch.neighbor_allreduce_async(tensor: torch.Tensor, self_weight: float = None, neighbor_weights: Dict[int, float] = None, name: str = None) → int

A function that asynchronously performs weighted averaging of the input tensor over the neighbors and itself in the Bluefog processes. The default behavior is a uniform average.

The input tensor is not modified.

The reduction operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Bluefog processes for a given name. The reduction will not start until all processes are ready to send and receive the tensor.

Parameters:
  • tensor – A tensor to neighbor_allreduce.
  • self_weight – The weight for the self node, used together with neighbor_weights.
  • neighbor_weights – The weights for the neighbor nodes, used together with self_weight. If neighbor_weights is provided, the returned tensor is the weighted average defined by these weights and self_weight. If not, the returned tensor is the weighted average defined by the topology weights, if provided, or the uniform average. The weights should be a dictionary of the form {rank: weight}, and each rank has to belong to the (in-)neighbors.
  • name – A name of the neighbor_allreduce operation.
Returns:

A handle to the neighbor_allreduce operation that can be used with poll() or synchronize().

Note: self_weight and neighbor_weights must be provided together.

bluefog.torch.poll(handle: int) → bool

Polls an allreduce, allgather or broadcast handle to determine whether underlying asynchronous operation has completed. After poll() returns True, synchronize() will return without blocking.

Parameters:handle – A handle returned by an allreduce, allgather or broadcast asynchronous operation.
Returns:A flag indicating whether the operation has completed.
bluefog.torch.synchronize(handle: int) → torch.Tensor

Synchronizes an asynchronous allreduce, allgather or broadcast operation until it’s completed. Returns the result of the operation.

Parameters:handle – A handle returned by an allreduce, allgather or broadcast asynchronous operation.
Returns:An output tensor of the operation.
Return type:torch.Tensor
bluefog.torch.win_accumulate(tensor: torch.Tensor, name: str, dst_weights: Dict[int, float] = None, require_mutex: bool = False) → bool

Passively accumulate the tensor into the neighbors' shared window memory. Only the SUM op is supported for now. This is a blocking function, which will not return until the win_accumulate operation is finished.

Parameters:
  • tensor – The tensor to share with the neighbors.
  • name – The unique name to associate the window object.
  • dst_weights – A dictionary that maps the destination ranks to the weight. Namely, {rank: weight} means accumulate tensor * weight to the rank neighbor. If not provided, dst_weights will be set as all neighbor ranks defined by virtual topology with weight 1. Note dst_weights should only contain the ranks that belong to out-neighbors.
  • require_mutex – If set to be true, out-neighbor process’s window mutex will be acquired.
Returns:

A bool value indicating whether the win_accumulate operation succeeded.

bluefog.torch.win_accumulate_async(tensor: torch.Tensor, name: str, dst_weights: Dict[int, float] = None, require_mutex: bool = False) → int

Passively accumulate the tensor into the neighbors' shared window memory. Only the SUM op is supported for now. This is a non-blocking function, which returns without waiting for the win_accumulate operation to finish.

Parameters:
  • tensor – The tensor to share with the neighbors.
  • name – The unique name to associate the window object.
  • dst_weights – A dictionary that maps the destination ranks to the weight. Namely, {rank: weight} means accumulate tensor * weight to the rank neighbor. If not provided, dst_weights will be set as all neighbor ranks defined by virtual topology with weight 1. Note dst_weights should only contain the ranks that belong to out-neighbors.
  • require_mutex – If set to be true, out-neighbor process’s window mutex will be acquired.
Returns:

A handle to the win_accumulate operation that can be used with win_poll() or win_wait().

bluefog.torch.win_create(tensor: torch.Tensor, name: str, zero_init: bool = False) → bool

Create an MPI window for remote memory access.

The window is dedicated to the provided tensor only, which is identified by its unique name. This is a blocking operation that requires all Bluefog processes to participate. The initial values of the MPI windows for the neighbors are the same as the input tensor, unless zero_init is set to true.

Parameters:
  • tensor (torch.Tensor) – Provide the size, data type, and/or memory for window.
  • name (str) – The unique name to associate the window object.
  • zero_init (bool) – If set true, the buffer is initialized to zero instead of the value of tensor.
Returns:

Indicates whether the creation succeeded.

Return type:

bool

Note: Windows with the same name across different Bluefog processes must associate tensors with the same shape. Otherwise, subsequent win ops such as win_sync and win_put will encounter an unrecoverable memory segmentation fault.
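
A minimal one-sided workflow built from the window ops documented here (a sketch; requires a running Bluefog job):

Example

>>> import torch
>>> import bluefog.torch as bf
>>> bf.init()
>>> x = torch.zeros(3)
>>> bf.win_create(x, name="x")
>>> bf.win_put(x, name="x")    # push x into the out-neighbors' windows
>>> x = bf.win_sync(name="x")  # reduce the received neighbor buffers locally
>>> bf.win_free(name="x")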

bluefog.torch.win_free(name: str = None) → bool

Free the MPI windows associated with name.

Parameters:name (str) – The unique name to associate the window object. If name is None, free all the window objects.
Returns:Indicates whether the free succeeded.
Return type:bool
bluefog.torch.win_get(name: str, src_weights: Dict[int, float] = None, require_mutex: bool = False) → bool

Passively get the tensor(s) from the neighbors' shared window memory into local shared memory, which cannot be accessed in Python directly. The win_sync function is responsible for fetching that memory. This is a blocking function, which will not return until the win_get operation is finished.

Parameters:
  • name – The unique name to associate the window object.
  • src_weights – A dictionary that maps the source ranks to the weights. Namely, {rank: weight} means fetch tensor * weight from the neighbor with that rank. If not provided, src_weights will be set as all neighbor ranks defined by the virtual topology with weight 1.0 / (neighbor_size + 1). Note src_weights should only contain ranks that either belong to the in-neighbors or self.
  • require_mutex – If set to be true, out-neighbor process’s window mutex will be acquired.
Returns:

A bool value indicating whether the win_get operation succeeded.

bluefog.torch.win_get_async(name: str, src_weights: Dict[int, float] = None, require_mutex: bool = False) → int

Passively get the tensor(s) from the neighbors' shared window memory into local shared memory, which cannot be accessed in Python directly. The win_sync function is responsible for fetching that memory. This is a non-blocking function, which returns without waiting for the win_get operation to finish.

Parameters:
  • name – The unique name to associate the window object.
  • src_weights – A dictionary that maps the source ranks to the weights. Namely, {rank: weight} means fetch the tensor from the neighbor with that rank, multiplied by the weight. If not provided, src_weights will be set as all neighbor ranks defined by the virtual topology with weight 1.0. Note src_weights should only contain ranks that are in-neighbors.
  • require_mutex – If set to be true, out-neighbor process’s window mutex will be acquired.
Returns:

A handle to the win_get operation that can be used with win_poll() or win_wait().

bluefog.torch.win_lock(name: str)

The win_lock context manager. Within the context, an RMA access epoch for its neighbors is created. Note: the ops win_get, win_accumulate, and win_put do not need the win_lock context.

Parameters:name – The name of an existing MPI_win object. If not found, a ValueError will be raised.
bluefog.torch.win_mutex(ranks: List[int] = None)

A mutex context manager implemented over window objects. Note, there are N distributed mutexes over the N corresponding processes.

Parameters:ranks – The mutexes associated with these ranks will be acquired. If not provided, the out-neighbor ranks will be used.

Example

>>> bf.win_create(tensor, name)
>>> with bf.win_mutex():
...     tensor = bf.win_sync_then_collect(name)
>>> bf.win_put(tensor, name)
bluefog.torch.win_poll(handle: int) → bool

Return whether the window operation identified by the handle has finished.

bluefog.torch.win_put(tensor: torch.Tensor, name: str, dst_weights: Dict[int, float] = None, require_mutex: bool = False) → bool

Passively put the tensor into the neighbors' shared window memory. This is a blocking function, which will not return until the win_put operation is finished.

Parameters:
  • tensor – The tensor to share with the neighbors.
  • name – The unique name to associate the window object.
  • dst_weights – A dictionary that maps the destination ranks to the weight. Namely, {rank: weight} means put tensor * weight to the rank neighbor. If not provided, dst_weights will be set as all neighbor ranks defined by virtual topology with weight 1. Note dst_weights should only contain the ranks that belong to out-neighbors.
  • require_mutex – If set to be true, out-neighbor process’s window mutex will be acquired.
Returns:

A bool value indicating whether the put succeeded.

bluefog.torch.win_put_async(tensor: torch.Tensor, name: str, dst_weights: Dict[int, float] = None, require_mutex: bool = False) → int

Passively put the tensor into the neighbors' shared window memory. This is a non-blocking function, which returns without waiting for the win_put operation to finish.

Parameters:
  • tensor – The tensor to share with the neighbors.
  • name – The unique name to associate the window object.
  • dst_weights – A dictionary that maps the destination ranks to the weight. Namely, {rank: weight} means put tensor * weight to the rank neighbor. If not provided, dst_weights will be set as all neighbor ranks defined by virtual topology with weight 1. Note dst_weights should only contain the ranks that belong to out-neighbors.
  • require_mutex – If set to be true, out-neighbor process’s window mutex will be acquired.
Returns:

A handle to the win_put operation that can be used with win_poll() or win_wait().

bluefog.torch.win_sync(name: str, self_weight: float = None, neighbor_weights: Dict[int, float] = None, reset: bool = False, clone: bool = False) → torch.Tensor

Locally synchronize the window objects and return the reduced neighbor tensor. Note the returned tensor is the same tensor used in win_create, and the modification happens in place.

Parameters:
  • name – The unique name to associate the window object.
  • self_weight – The weight for the self node, used together with neighbor_weights.
  • neighbor_weights – The weights for the neighbor nodes, used together with self_weight. If neighbor_weights is provided, the returned tensor is the weighted average defined by these weights and self_weight. If not, the returned tensor is the weighted average defined by the topology weights, if provided, or the mean value. The weights should be a dictionary of the form {rank: weight}, and each rank has to belong to the (in-)neighbors.
  • reset – If reset is True, the buffers used to store the neighbor tensors included in neighbor_weights will be reset to zero. The reset always happens after the weighted computation. If neighbor_weights is not provided and reset is True, all the neighbor buffers will be reset.
  • clone – If set to true, win_sync returns a new tensor instead of modifying in place.
Returns:

The average tensor of all neighbors' corresponding tensors.

Return type:

torch.Tensor

Note: The weights here are useful if you need a dynamic weighted average, i.e., the weights change across iterations. If static weights suffice, setting the weights through bf.set_topology(..., is_weighted=True) is a better choice.

Note2: If reset is True, the mutex for self is acquired.

Note3: self_weight and neighbor_weights must be provided together.
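
For a dynamic weighted average, the weights can be changed on every call (a sketch; bf.in_neighbor_ranks() is assumed to list the in-neighbors of an existing window named "x"):

Example

>>> ranks = bf.in_neighbor_ranks()
>>> w = {r: 0.5 / len(ranks) for r in ranks}
>>> x = bf.win_sync(name="x", self_weight=0.5, neighbor_weights=w)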

bluefog.torch.win_sync_then_collect(name: str) → torch.Tensor

A utility function to sync the neighbor buffers, then accumulate all neighbor buffers' tensors into the self tensor and clear the buffers. It is equivalent to:

>>> win_sync(name, self_weight=1.0, neighbor_weights={neighbor: 1.0}, reset=True)

Parameters:name – The unique name to associate the window object.
Returns:The tensor collected from self and all neighbors' corresponding buffers.
Return type:torch.Tensor
bluefog.torch.win_wait(handle: int) → bool

Wait until the asynchronous window operation identified by the handle is done.