Pipelining Internals
Internal API Warning
If you are using the standard d9d training infrastructure, you do not need to call these functions manually. The framework handles pipelining automatically based on configuration.
About
This section details the internals of the d9d.pipelining module. It is intended for those who wish to implement new layouts, schedules, or modify the execution engine.
Architecture
The Idea
d9d decouples the Schedule Structure from the Runtime Execution.
- You write a builder (e.g., `1F1B`, `DualPipe`) that generates a linear list of logical `Action`s (e.g., `Forward(Stage=0, MB=1)`, `Backward(Stage=0, MB=0)`). If you want, d9d can automatically inject `Send`/`Recv` actions into your compute-only schedule based on data dependencies, preventing deadlocks.
- You run a dumb virtual machine that simply iterates the action list and executes each action in order.
This makes implementing complex research schedules (like Zero Bubble or DualPipeV) significantly easier than managing state machines or recursive calls.
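The decoupling can be sketched in a few lines. This is a toy illustration only: the `Forward`/`Backward` dataclasses and the GPipe-style ordering are hypothetical stand-ins, not d9d's real API.

```python
from dataclasses import dataclass

# Hypothetical stand-ins for d9d's real Action classes.
@dataclass(frozen=True)
class Forward:
    stage: int
    microbatch: int

@dataclass(frozen=True)
class Backward:
    stage: int
    microbatch: int

def build_gpipe_like(num_microbatches: int, stage: int) -> list:
    """Builder: emits a flat action list (all forwards, then all backwards)."""
    fwd = [Forward(stage, mb) for mb in range(num_microbatches)]
    bwd = [Backward(stage, mb) for mb in reversed(range(num_microbatches))]
    return fwd + bwd

def run(program: list, handlers: dict) -> None:
    """The 'dumb VM': iterate and dispatch, no state machine, no recursion."""
    for action in program:
        handlers[type(action)](action)

log = []
run(build_gpipe_like(2, stage=0),
    {Forward: lambda a: log.append(("F", a.microbatch)),
     Backward: lambda a: log.append(("B", a.microbatch))})
# log == [("F", 0), ("F", 1), ("B", 1), ("B", 0)]
```

The builder is pure (it only produces data), so a schedule can be inspected, logged, or visualized without ever executing it.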
Core Components
PipelineStage (infra/stage/stage.py)
Encapsulates a user nn.Module. It is not responsible for deciding when to run. Instead, it provides atomic pipeline stage capabilities (such as forward and backward passes) to the actions and the executor.
Consists of:
- Computation Handlers:
    - `ForwardComputeHandler`: Performs the forward pass and caches inputs/outputs for backward passes.
    - `BackwardComputeHandler`: Performs the backward pass; capable of splitting it into `backward_input` (dI) and `backward_weight` (dW) for advanced schedules.
- Communication Handlers: Contain and manage the P2P buffers for both forward and backward passes.
Actions (infra/schedule/component/runtime/action.py)
The atomic instructions for the pipeline virtual machine.
- `ForwardComputeAction`: Runs forward on a specific microbatch.
- `BackwardFullInputComputeAction`: Runs backward. Can be configured to compute gradients for inputs only or for inputs and weights.
- `BackwardWeightComputeAction`: Computes gradients for weights (used in Zero Bubble schedules).
- `ForwardSendAction` / `ForwardReceiveAction` / `BackwardSendAction` / `BackwardReceiveAction`: Network I/O.
- `ComposeAction`: Composes multiple actions into a single one. Used for forward/backward overlap in schedules such as DualPipeV.
Actions are designed to be declarative and immutable.
Programs
A Program is simply dict[int, list[ActionBase]] — a mapping of Rank ID to a sequential list of Actions.
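For illustration, a hypothetical two-rank, two-microbatch program might look like this (tuples of `(kind, stage_idx, microbatch_idx)` stand in for the real `ActionBase` subclasses):

```python
# Tuples stand in for real Action objects; this is not d9d's actual data.
program = {
    0: [("F", 0, 0), ("SendF", 0, 0), ("F", 0, 1), ("SendF", 0, 1)],
    1: [("RecvF", 1, 0), ("F", 1, 0), ("RecvF", 1, 1), ("F", 1, 1)],
}

# Each rank simply iterates its own list in order.
for rank, actions in program.items():
    print(rank, [a[0] for a in actions])
```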
Executor (infra/schedule/component/runtime/executor.py)
The PipelineScheduleExecutor is the runtime engine.
It:
- Shards global inputs into microbatches.
- Iterates through the `Program` action list.
- Dispatches calls to `Action`s that perform computation or communication work.
Comparison with PyTorch
The d9d pipelining implementation is heavily inspired by and borrows concepts from the torch.distributed.pipelining API (e.g., ZeroBubble implementation), but refactors the codebase significantly to improve clarity, type safety, and modularity.
The main architectural differences lie in the strict separation of concerns and composition over inheritance:
- Decomposed Stage Logic:
  - PyTorch: Uses a monolithic `_PipelineStageBase` class that simultaneously manages P2P buffer allocation, gradient accumulation state, and forward/backward execution logic.
  - d9d: Adopts a compositional approach. The `PipelineStage` class is a thin orchestrator that delegates responsibilities to dedicated handlers.
- Polymorphic Actions vs Enumeration:
  - PyTorch: Represents schedule instructions using a single generic `_Action` NamedTuple combined with an Enum (`_ComputationType.FORWARD`, `_ComputationType.SEND_F`, etc.).
  - d9d: Uses a class hierarchy for actions (`ForwardComputeAction`, `ForwardSendAction`, `ComposeAction`). This allows the runtime executor to use structural pattern matching (`match`/`case`) rather than large `if`/`elif` blocks checking enums, lets different actions carry different metadata (e.g. the `full_backward` flag), and improves static type checking.
- Builder Pattern vs Schedule Classes:
  - PyTorch: Often couples the schedule definition with the runtime object (e.g., the `Schedule1F1B` class contains both the logic to generate the ordering and the logic to execute it).
  - d9d: Strictly separates the Program Builder (which generates the list of actions) from the Executor (which runs the actions). This makes it easier to inspect a schedule plan before execution or swap scheduling algorithms without changing the runtime driver.
Building Custom Schedules
To build a new schedule, you create a PipelineProgramBuilder.
Implement the Builder
Subclass `PipelineProgramBuilder` and implement its abstract interface: the `num_stages_per_rank` and `topology_style` properties and the `compose(num_microbatches, pp_size)` method (see the API reference below).
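As an illustration of the shape such a builder takes, here is a hypothetical GPipe-style `compose`. Tuples stand in for the real Action classes, and the real interface also requires the `topology_style` property:

```python
class GPipeLikeBuilder:
    """Hypothetical builder sketch: all forwards first, then all backwards."""

    @property
    def num_stages_per_rank(self) -> int:
        return 1  # one model stage hosted on each pipeline rank

    def compose(self, num_microbatches: int, pp_size: int) -> dict[int, list]:
        program: dict[int, list] = {}
        for rank in range(pp_size):
            stage = rank  # with one stage per rank, stage index == rank index
            fwd = [("F", stage, mb) for mb in range(num_microbatches)]
            bwd = [("B", stage, mb) for mb in reversed(range(num_microbatches))]
            program[rank] = fwd + bwd
        return program

program = GPipeLikeBuilder().compose(num_microbatches=2, pp_size=2)
# program[1] == [("F", 1, 0), ("F", 1, 1), ("B", 1, 1), ("B", 1, 0)]
```

A builder like this emits compute actions only; the communication injection pass described below can then add the `Send`/`Recv` pairs.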
Registering
Add your configuration to factory/config.py and register the builder in factory/factory.py.
d9d.pipelining.infra.stage
PipelineStage
Represents a single structural stage in a Pipelined Model.
This class acts as an orchestrator that combines StageCommunicationHandler (for I/O)
and Forward/BackwardComputeHandler (for execution). It abstracts away the complexity
of buffer management, distributed communication, and gradient calculation from the scheduler.
__init__(info, module, group, stage_to_host_topology)
Constructs a PipelineStage object.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `info` | `PipelineStageInfo` | Metadata about the stage (index, total stages). | required |
| `module` | `Module` | The PyTorch module executed by this stage. | required |
| `group` | `ProcessGroup` | The distributed process group for pipeline communications. | required |
| `stage_to_host_topology` | `dict[int, int]` | Dict mapping stage ID to the PP rank hosting it. | required |
backward_one_chunk(microbatch_index, loss=None, full_backward=True)
Executes a backward pass for a single microbatch chunk.
Can perform either a full backward or just the input gradients (if full_backward=False).
It fetches required data from forward cache and communication buffers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `microbatch_index` | `int` | The microbatch index. | required |
| `loss` | `Tensor \| None` | The loss tensor (only used if this is the last stage). | `None` |
| `full_backward` | `bool` | If True, computes grads for inputs and weights. If False, only for inputs. | `True` |
backward_weight_one_chunk(microbatch_index)
Executes the weight gradient accumulation part of the backward pass.
This assumes backward_one_chunk(..., full_backward=False) was already called
for this microbatch.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `microbatch_index` | `int` | The microbatch index. | required |
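The intended call order for split backwards can be sketched with a recording stub (`FakeStage` is a hypothetical stand-in, not the real `PipelineStage`):

```python
# Sketch of the zero-bubble call order the docs describe:
# backward_one_chunk(full_backward=False) computes input grads (dI) now,
# while backward_weight_one_chunk computes weight grads (dW) later,
# filling a pipeline bubble.
class FakeStage:
    def __init__(self):
        self.calls = []

    def backward_one_chunk(self, mb, full_backward=True):
        self.calls.append(("dI+dW" if full_backward else "dI", mb))

    def backward_weight_one_chunk(self, mb):
        self.calls.append(("dW", mb))

stage = FakeStage()
stage.backward_one_chunk(0, full_backward=False)  # dI: unblocks the previous stage
# ... other useful work can run here instead of idling ...
stage.backward_weight_one_chunk(0)                # dW: deferred weight gradients
# stage.calls == [("dI", 0), ("dW", 0)]
```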
configure_buffers(num_microbatches, has_backward, pipeline_inputs)
Initializes the communication handlers and buffers for the stage.
This must be called before execution to establish P2P buffer sizes and directions.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `num_microbatches` | `int` | Total number of microbatches to process. | required |
| `has_backward` | `bool` | Whether this pipeline stage should store info for a backward pass. | required |
| `pipeline_inputs` | `dict[str, Tensor]` | Pipeline input data. | required |
forward_one_chunk(microbatch_index, pipeline_inputs, pipeline_kwargs=None)
Executes a forward pass for a single microbatch chunk.
Fetches inputs from the communication buffer (or pipeline_inputs if first stage),
runs the computation, and caches the result.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `microbatch_index` | `int` | The microbatch index. | required |
| `pipeline_inputs` | `dict[str, Tensor]` | Inputs provided locally (only used if this is the first stage). | required |
| `pipeline_kwargs` | `dict[str, Any] \| None` | Additional arguments for the module. | `None` |
Returns:
| Type | Description |
|---|---|
| | The output tensors of the forward pass. |
get_bwd_recv_ops(microbatch_index)
Returns P2P ops to receive backward gradients for the given microbatch.
get_bwd_send_ops(microbatch_index)
Returns P2P ops to send backward gradients for the given microbatch.
get_fwd_recv_ops(microbatch_index)
Returns P2P ops to receive forward inputs for the given microbatch.
get_fwd_send_ops(microbatch_index)
Returns P2P ops to send forward outputs for the given microbatch.
pop_local_bwd_output(microbatch_index)
Retrieves local backward outputs (gradients).
reset()
Resets the internal state of communication handlers, clearing gradients on buffers.
set_local_bwd_input(inputs, microbatch_index)
Sets local backward inputs (output gradients) manually.
set_local_fwd_input(inputs, microbatch_index)
Sets local forward inputs manually.
Used for the V-shape schedulers.
d9d.pipelining.infra.schedule.component.runtime
Pipelining Runtime Package.
ActionBase
Bases: ABC
Abstract base class for all pipeline schedule actions.
An action represents an atomic unit of work in a pipeline schedule, such as computing a microbatch or sending/receiving a tensor.
has_backward_work
abstractmethod
property
Returns True if this action involves backward pass computations.
work_type
abstractmethod
property
Returns the classification of work this action performs.
__str__()
abstractmethod
Returns a short string representation of the action for logging/visualization.
apply(ctx)
abstractmethod
Executes the action logic using the provided context.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `ctx` | `ActionContext` | The runtime context containing stages, data, and communication handlers. | required |
BackwardFullInputComputeAction
dataclass
Bases: ActionBase
Action to perform backward computation with respect to inputs.
Attributes:
| Name | Type | Description |
|---|---|---|
| `stage_idx` | `int` | The integer index of the pipeline stage. |
| `microbatch_idx` | `int` | The integer index of the microbatch to compute. |
| `full_backward` | `bool` | If True, performs a full backward pass including inputs and weights. If False, may only compute gradients w.r.t. inputs (depending on schedule implementation). |
BackwardReceiveAction
dataclass
Bases: ActionBase
Action to schedule a backward pass gradient receive operation.
Attributes:
| Name | Type | Description |
|---|---|---|
| `stage_idx` | `int` | The integer index of the pipeline stage expecting the receive operation. |
| `microbatch_idx` | `int` | The integer index of the microbatch being received. |
BackwardSendAction
dataclass
Bases: ActionBase
Action to schedule a backward pass gradient send operation.
Attributes:
| Name | Type | Description |
|---|---|---|
| `stage_idx` | `int` | The integer index of the pipeline stage initiating the send operation. |
| `microbatch_idx` | `int` | The integer index of the microbatch being sent. |
BackwardWeightComputeAction
dataclass
Bases: ActionBase
Action to perform gradient accumulation on weights.
Attributes:
| Name | Type | Description |
|---|---|---|
| `stage_idx` | `int` | The integer index of the pipeline stage. |
| `microbatch_idx` | `int` | The integer index of the microbatch to compute. |
ComposeAction
dataclass
Bases: ActionBase
Composite action scheduling multiple sub-actions sequentially.
Used for forward/backward overlapping.
Attributes:
| Name | Type | Description |
|---|---|---|
| `actions` | `tuple[ActionBase, ...]` | A tuple of sub-actions to be executed sequentially. |
ForwardComputeAction
dataclass
Bases: ActionBase
Action to perform forward computation for a specific microbatch.
Attributes:
| Name | Type | Description |
|---|---|---|
| `stage_idx` | `int` | The integer index of the pipeline stage. |
| `microbatch_idx` | `int` | The integer index of the microbatch to compute. |
ForwardReceiveAction
dataclass
Bases: ActionBase
Action to schedule a forward pass tensor receive operation.
Attributes:
| Name | Type | Description |
|---|---|---|
| `stage_idx` | `int` | The integer index of the pipeline stage expecting the receive operation. |
| `microbatch_idx` | `int` | The integer index of the microbatch being received. |
ForwardSendAction
dataclass
Bases: ActionBase
Action to schedule a forward pass tensor send operation.
Attributes:
| Name | Type | Description |
|---|---|---|
| `stage_idx` | `int` | The integer index of the pipeline stage initiating the send operation. |
| `microbatch_idx` | `int` | The integer index of the microbatch being sent. |
OfflinePipelineExecutor
Bases: PipelineSchedule
Executes the model immediately without pipeline parallelism.
This schedule treats the execution as a single stage with a single microbatch, running the forward and optionally backward pass directly. This is primarily used for single-device execution within the pipeline abstraction.
__init__(model, callback, do_backward)
Constructs the offline pipeline executor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `model` | `Module` | The PyTorch module to execute. | required |
| `callback` | `PipelineLossFn \| PipelineResultFn` | Function to compute loss or process pipeline results. | required |
| `do_backward` | `bool` | Whether to execute the backward pass. | required |
PipelineScheduleExecutor
Bases: PipelineSchedule
Executes a defined pipeline schedule by interpreting a sequence of actions.
__init__(dist_context, stages, num_microbatches, callback, program)
Constructs the schedule executor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `dist_context` | `DistributedContext` | The distributed context. | required |
| `stages` | `list[PipelineStage]` | List of stages managed by this executor. | required |
| `num_microbatches` | `int` | Number of microbatches the global batch is split into. | required |
| `callback` | `PipelineLossFn \| PipelineResultFn` | Function to compute loss or process pipeline results. | required |
| `program` | `dict[int, list[ActionBase]]` | The execution plan mapping rank ID to a list of actions. | required |
d9d.pipelining.infra.schedule.component.program
Pipeline Schedule Building Components.
This package provides the core building blocks and compiler passes used to generate execution schedules for distributed pipelines.
PipelineProgramBuilder
Bases: ABC
Abstract interface for building pipeline execution schedules.
num_stages_per_rank
abstractmethod
property
Returns the number of model stages designated for each rank.
topology_style
abstractmethod
property
Returns the topology style strategy used to assign stages to ranks.
compose(num_microbatches, pp_size)
abstractmethod
Generates the execution program for all ranks in the pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `num_microbatches` | `int` | Number of microbatches per step. | required |
| `pp_size` | `int` | Number of pipeline parallel ranks. | required |
Returns:
| Type | Description |
|---|---|
| `dict[int, list[ActionBase]]` | A dictionary mapping rank indices to their list of sequential actions. |
ScheduleStyle
Bases: StrEnum
Defines the strategy for mapping logical stages to physical ranks.
Attributes:
| Name | Type | Description |
|---|---|---|
| `loop` | | Assigns stages in a round-robin circular fashion (mod `pp_size`). |
| `v` | | Assigns stages in a zig-zag V-shape pattern. Useful for interleaved 1F1B schedules. |
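To make the two styles concrete, here is a hypothetical re-implementation of the mapping. The exact zig-zag for `"v"` (even rounds ascend, odd rounds descend) is an assumption for illustration; the authoritative version is `build_stage_to_host_rank_topology` below.

```python
def stage_to_rank(num_stages: int, pp_size: int, style: str) -> dict[int, int]:
    """Illustrative sketch only; not the real d9d implementation."""
    assert num_stages % pp_size == 0, "stages must divide evenly across ranks"
    mapping: dict[int, int] = {}
    for stage in range(num_stages):
        round_idx, pos = divmod(stage, pp_size)
        if style == "loop" or round_idx % 2 == 0:
            mapping[stage] = pos  # round-robin: stage mod pp_size
        else:
            mapping[stage] = pp_size - 1 - pos  # odd rounds reversed: the "V"
    return mapping

print(stage_to_rank(8, 4, "loop"))  # {0: 0, 1: 1, 2: 2, 3: 3, 4: 0, 5: 1, 6: 2, 7: 3}
print(stage_to_rank(8, 4, "v"))     # {0: 0, 1: 1, 2: 2, 3: 3, 4: 3, 5: 2, 6: 1, 7: 0}
```

With `"v"` and 2 stages per rank, the first and last stage land on rank 0, which is what V-shape schedules such as ZBV and DualPipeV require.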
add_communication_ops(compute_actions, stage_to_rank, num_stages)
Injects communication actions into a computation-only schedule.
This function iterates through the provided compute schedule and simulates execution. When a compute action produces a result needed by a different rank, it injects Send/Receive pairs. It also reorders actions to ensure that Receive operations occur before the Computes that depend on them, preventing deadlocks.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `compute_actions` | `dict[int, list[ActionBase]]` | Initial schedule containing only compute operations. | required |
| `stage_to_rank` | `dict[int, int]` | Mapping from stage index to rank index. | required |
| `num_stages` | `int` | Total number of pipeline stages. | required |
Returns:
| Type | Description |
|---|---|
| `dict[int, list[ActionBase]]` | A new schedule dictionary including both compute and communication actions. |
Raises:
| Type | Description |
|---|---|
| `RuntimeError` | If the schedule simulation enters a deadlock state. |
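A drastically simplified, forward-only analogue of this pass can illustrate the idea (the real implementation also handles backwards, reorders receives via simulation, and detects deadlocks; tuples stand in for Action objects):

```python
def add_forward_comms(compute: dict, stage_to_rank: dict, num_stages: int) -> dict:
    """Toy sketch: wrap each cross-rank Forward with Send/Recv actions."""
    out = {rank: [] for rank in compute}
    for rank, actions in compute.items():
        for kind, stage, mb in actions:
            # Receive activations if the previous stage lives on another rank.
            if kind == "F" and stage > 0 and stage_to_rank[stage - 1] != rank:
                out[rank].append(("RecvF", stage, mb))
            out[rank].append((kind, stage, mb))
            # Send activations if the next stage lives on another rank.
            if kind == "F" and stage + 1 < num_stages and stage_to_rank[stage + 1] != rank:
                out[rank].append(("SendF", stage, mb))
    return out

program = add_forward_comms({0: [("F", 0, 0)], 1: [("F", 1, 0)]}, {0: 0, 1: 1}, 2)
# program[0] == [("F", 0, 0), ("SendF", 0, 0)]
# program[1] == [("RecvF", 1, 0), ("F", 1, 0)]
```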
build_stage_to_host_rank_topology(pp_size, num_stages, style)
Constructs the mapping from stage index to rank index.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `pp_size` | `int` | Number of pipeline parallel ranks. | required |
| `num_stages` | `int` | Total number of model stages. | required |
| `style` | `ScheduleStyle` | The topology style to use for assignment. | required |
Returns:
| Type | Description |
|---|---|
| `dict[int, int]` | A dictionary mapping stage IDs to rank IDs. |
Raises:
| Type | Description |
|---|---|
| `ValueError` | If the style is unknown or if V-style parameters are invalid (`num_stages` must be divisible by `pp_size`). |
invert_stage_to_host_rank_topology(stage_to_host)
Inverts the topology mapping to list execution stages per rank.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `stage_to_host` | `dict[int, int]` | Mapping from stage index to rank index. | required |
Returns:
| Type | Description |
|---|---|
| `dict[int, list[int]]` | A dictionary where keys are rank IDs and values are lists of stage IDs managed by that rank. |
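A behavioral sketch of this helper, assuming stages are listed in ascending order per rank:

```python
from collections import defaultdict

def invert_stage_to_host(stage_to_host: dict[int, int]) -> dict[int, list[int]]:
    """Sketch of the inversion: rank ID -> ordered list of hosted stage IDs."""
    per_rank: dict[int, list[int]] = defaultdict(list)
    for stage in sorted(stage_to_host):  # iterate stages in ascending order
        per_rank[stage_to_host[stage]].append(stage)
    return dict(per_rank)

# A V-shape topology on 2 ranks: stages 0 and 3 on rank 0, stages 1 and 2 on rank 1.
print(invert_stage_to_host({0: 0, 1: 1, 2: 1, 3: 0}))  # {0: [0, 3], 1: [1, 2]}
```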
d9d.pipelining.infra.schedule.program
Pipeline Schedule Implementations
DualPipeVPipelineProgramBuilder
Bases: PipelineProgramBuilder
Builder for the DualPipeV Pipeline Parallelism schedule.
DualPipeV is a specialized bi-directional pipeline schedule designed for high throughput training. It requires exactly 2 stages per pipeline rank (V-shape) and utilizes split backward passes (Input gradients vs Weight gradients) to fill pipeline bubbles.
References
- https://github.com/deepseek-ai/DualPipe
- https://hackmd.io/@ufotalent/r1lVXsa9Jg
__init__()
Constructs the DualPipeV builder.
Interleaved1F1BPipelineProgramBuilder
Bases: PipelineProgramBuilder
Builder for Interleaved Pipeline Parallelism schedules.
This builder supports:
- Standard Interleaved 1F1B: Assigns multiple stages per rank and prioritizes depth-first execution. (See https://arxiv.org/pdf/2104.04473)
- Interleaved Zero Bubble (ZB1P): Extends 1F1B by splitting backward passes into Input Gradients and Weight Gradients. Weight gradients are delayed to fill pipeline bubbles. (See https://arxiv.org/pdf/2401.10241)
__init__(num_stages_per_rank, enable_zero_bubble=False)
compose(num_microbatches, pp_size)
Generates the execution program for all ranks.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `num_microbatches` | `int` | Total microbatches. Must be divisible by the derived number of rounds. | required |
| `pp_size` | `int` | Number of pipeline ranks. | required |
Returns:
| Type | Description |
|---|---|
| `dict[int, list[ActionBase]]` | A dictionary mapping rank indices to their list of sequential actions. |
LoopedBFSPipelineProgramBuilder
Bases: PipelineProgramBuilder
Builder for the Breadth-First Pipeline Parallelism schedule.
This schedule runs all available forward microbatches for local stages first. If configured for training, it then runs backwards in reverse topological order.
References
https://arxiv.org/pdf/2211.05953
__init__(num_stages_per_rank, inference_mode=False)
ZeroBubbleVPipelineProgramBuilder
Bases: PipelineProgramBuilder
Builder for the Zero Bubble V (ZBV) Pipeline Schedule.
This schedule is designed for V-shape topologies (2 stages per rank) and utilizes the Zero Bubble optimizations by splitting backward passes.
It requires exactly two stages per rank organized in a V-shape topology and splits backward passes into Input and Weight gradients to optimize pipeline throughput.
References
https://arxiv.org/pdf/2401.10241, Section 6
__init__()
Constructs the ZBV builder.
d9d.pipelining.training
PipelinedLRScheduler
Bases: LRSchedulerProtocol
Wrapper that manages multiple LR schedulers for a pipeline parallel rank.
Similar to PipelinedOptimizer, this aggregates schedulers corresponding to
multiple model stages hosted on the current rank.
PipelinedOptimizer
Bases: OptimizerProtocol
Wrapper that manages multiple optimizers for a pipeline parallel rank.
In a pipeline parallel setup, a single rank might host multiple stages, each having its own parameters and optimizer. This class aggregates them into a single interface.
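The aggregation idea can be sketched as follows (a hypothetical stand-in with recording fakes; the real class implements the full `OptimizerProtocol`):

```python
class PipelinedOptimizerSketch:
    """Hypothetical sketch: fan each call out to the per-stage optimizers."""

    def __init__(self, optimizers: list):
        self._optimizers = list(optimizers)

    def step(self) -> None:
        for opt in self._optimizers:
            opt.step()

    def zero_grad(self) -> None:
        for opt in self._optimizers:
            opt.zero_grad()

class _RecordingOpt:
    """Fake per-stage optimizer that records which methods were called."""
    def __init__(self):
        self.calls = []
    def step(self):
        self.calls.append("step")
    def zero_grad(self):
        self.calls.append("zero_grad")

stage_opts = [_RecordingOpt(), _RecordingOpt()]  # two stages on this rank
agg = PipelinedOptimizerSketch(stage_opts)
agg.step()
agg.zero_grad()
# every hosted stage's optimizer saw both calls, in order
```

The training loop then talks to a single optimizer object regardless of how many stages the schedule placed on this rank.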