
Pipelining Internals

Internal API Warning

If you are using the standard d9d training infrastructure, you do not need to call these functions manually. The framework automatically handles pipelining based on configuration.

About

This section details the internals of the d9d.pipelining module. It is intended for those who wish to implement new layouts, schedules, or modify the execution engine.

Architecture

The Idea

d9d decouples the Schedule Structure from the Runtime Execution.

  1. You write a builder (e.g., 1F1B, DualPipe) that generates a linear list of logical Actions (e.g., Forward(Stage=0, MB=1), Backward(Stage=0, MB=0)). If you wish, d9d can automatically inject Send/Recv actions into your compute-only schedule based on data dependencies, preventing deadlocks.
  2. You run a dumb virtual machine that simply iterates the action list and executes each action in order.

This makes implementing complex research schedules (like Zero Bubble or DualPipeV) significantly easier than managing state machines or recursive calls.

Core Components

PipelineStage (infra/stage/stage.py)

Encapsulates a user nn.Module. It is not responsible for deciding when to run. Instead, it provides atomic pipeline stage capabilities (such as forward and backward passes) to the actions and the executor.

Consists of:

  • Computation Handlers:
    • ForwardComputeHandler: Performs forward pass, caches inputs/outputs for backward passes.
    • BackwardComputeHandler: Performs backward pass, capable of splitting backward passes into backward_input (dI) and backward_weight (dW) for advanced schedules.
  • Communication Handlers: Contain and manage the P2P buffers for both forward and backward passes.

Actions (infra/schedule/component/runtime/action.py)

The atomic instructions for the pipeline virtual machine.

  • ForwardComputeAction: Runs the forward pass for a specific microbatch.
  • BackwardFullInputComputeAction: Runs the backward pass; can be configured to compute gradients for inputs only or for both inputs and weights.
  • BackwardWeightComputeAction: Computes gradients for weights (used in Zero Bubble schedules).
  • ForwardSendAction / ForwardReceiveAction / BackwardSendAction / BackwardReceiveAction: Network I/O.
  • ComposeAction: Composes multiple actions into a single one. Used for Forward/Backward overlap in schedules such as DualPipeV.

Actions are designed to be declarative and immutable.

Programs

A Program is simply dict[int, list[ActionBase]] — a mapping of Rank ID to a sequential list of Actions.
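
To illustrate, a compute-only program for two ranks and two microbatches could look like the sketch below. It is hand-written rather than the output of any shipped builder, and it assumes the action classes are all exported from the runtime package and constructed via their documented dataclass fields:

from d9d.pipelining.infra.schedule.component.runtime import (
    ActionBase,
    BackwardFullInputComputeAction,
    ForwardComputeAction
)

# Compute-only GPipe-style plan: all forwards, then all backwards in
# reverse microbatch order. Send/Recv actions are deliberately absent;
# add_communication_ops can inject them (see "Building Custom Schedules").
program: dict[int, list[ActionBase]] = {
    0: [
        ForwardComputeAction(stage_idx=0, microbatch_idx=0),
        ForwardComputeAction(stage_idx=0, microbatch_idx=1),
        BackwardFullInputComputeAction(stage_idx=0, microbatch_idx=1, full_backward=True),
        BackwardFullInputComputeAction(stage_idx=0, microbatch_idx=0, full_backward=True),
    ],
    1: [
        ForwardComputeAction(stage_idx=1, microbatch_idx=0),
        ForwardComputeAction(stage_idx=1, microbatch_idx=1),
        BackwardFullInputComputeAction(stage_idx=1, microbatch_idx=1, full_backward=True),
        BackwardFullInputComputeAction(stage_idx=1, microbatch_idx=0, full_backward=True),
    ],
}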

Executor (infra/schedule/component/runtime/executor.py)

The PipelineScheduleExecutor is the runtime engine.

It:

  1. Shards global inputs into microbatches.
  2. Iterates through the Program action list.
  3. Dispatches each Action, which performs its computation or communication work.
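
Conceptually, the dispatch core reduces to a few lines; this is a simplified sketch rather than the actual executor source:

# Simplified sketch of the dispatch loop. Each action applies itself
# against an ActionContext carrying stages, data, and comm handlers.
for action in program[rank]:
    action.apply(ctx)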

Comparison with PyTorch

The d9d pipelining implementation is heavily inspired by and borrows concepts from the torch.distributed.pipelining API (e.g., ZeroBubble implementation), but refactors the codebase significantly to improve clarity, type safety, and modularity.

The main architectural differences lie in the strict separation of concerns and composition over inheritance:

  1. Decomposed Stage Logic:

    • PyTorch: Uses a monolithic _PipelineStageBase class that simultaneously manages P2P buffer allocation, gradient accumulation state, and forward/backward execution logic.
    • d9d: Adopts a compositional approach. The PipelineStage class is a thin orchestrator that delegates responsibilities to dedicated handlers.
  2. Polymorphic Actions vs Enumeration:

    • PyTorch: Represents schedule instructions using a single generic _Action NamedTuple combined with an Enum (_ComputationType.FORWARD, _ComputationType.SEND_F, etc.).
    • d9d: Uses a class hierarchy for actions (ForwardComputeAction, ForwardSendAction, ComposeAction). This allows the runtime executor to use structural pattern matching (match/case) rather than large if/elif blocks checking enums, allows different actions to carry different metadata (e.g., the full_backward flag), and improves static type checking (see the sketch after this list).
  3. Builder Pattern vs Schedule Classes:

    • PyTorch: Often couples the schedule definition with the runtime object (e.g., Schedule1F1B class contains both the logic to generate the ordering and the logic to execute it).
    • d9d: Strictly separates the Program Builder (which generates the list of actions) from the Executor (which runs the actions). This makes it easier to inspect a schedule plan before execution or swap scheduling algorithms without changing the runtime driver.
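
To make the second point concrete, here is an illustrative dispatcher; the class and field names are the documented ones, while the function itself and its branch bodies are invented for this example:

from d9d.pipelining.infra.schedule.component.runtime import (
    ActionBase,
    ComposeAction,
    ForwardComputeAction
)


def dispatch(action: ActionBase, ctx) -> None:
    """Illustrative dispatch; the real executor's branches differ."""
    match action:
        case ForwardComputeAction(stage_idx=stage, microbatch_idx=mb):
            ...  # run forward for (stage, mb) using handlers in ctx
        case ComposeAction(actions=sub_actions):
            for sub in sub_actions:  # run composed sub-actions sequentially
                sub.apply(ctx)
        case _:
            action.apply(ctx)  # every action can execute itself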

Building Custom Schedules

To build a new schedule, you create a PipelineProgramBuilder.

Implement the Builder

Subclass PipelineProgramBuilder and implement its abstract properties and the compose method:

from collections import defaultdict

from d9d.pipelining.infra.schedule.component.program import (
    PipelineProgramBuilder, 
    build_stage_to_host_rank_topology,
    ScheduleStyle, 
    add_communication_ops
)
from d9d.pipelining.infra.schedule.component.runtime import (
    ActionBase, 
    ForwardComputeAction
)


class MyFancyScheduleBuilder(PipelineProgramBuilder):
    def __init__(self, stages_per_rank: int):
        self._stages_per_rank = stages_per_rank

    @property
    def num_stages_per_rank(self) -> int:
        return self._stages_per_rank

    @property
    def topology_style(self) -> ScheduleStyle:
        return ScheduleStyle.loop

    def compose(
            self, 
            num_microbatches: int, 
            pp_size: int
    ) -> dict[int, list[ActionBase]]:
        # Map logical stages to ranks
        stage_to_rank = build_stage_to_host_rank_topology(
            num_stages=self._stages_per_rank * pp_size,
            style=ScheduleStyle.loop,
            pp_size=pp_size
        )

        actions = defaultdict(list)

        # 1. Generate Compute Schedule
        for rank in range(pp_size):
            # ... custom logic to decide order of Fwd/Bwd ...
            actions[rank].append(
                ForwardComputeAction(stage_idx=..., microbatch_idx=...)
            )

        # 2. Inject Communications (Magic Pass)
        # This analyzes data dependencies between stages and inserts Send/Recvs
        return add_communication_ops(
            actions, 
            stage_to_rank, 
            num_stages=self._stages_per_rank * pp_size
        )

Registering

Add your configuration to factory/config.py and register the builder in factory/factory.py.

d9d.pipelining.infra.stage

PipelineStage

Represents a single structural stage in a Pipelined Model.

This class acts as an orchestrator that combines StageCommunicationHandler (for I/O) and Forward/BackwardComputeHandler (for execution). It abstracts away the complexity of buffer management, distributed communication, and gradient calculation from the scheduler.

__init__(info, module, group, stage_to_host_topology)

Constructs a PipelineStage object.

Parameters:

  • info (PipelineStageInfo): Metadata about the stage (index, total stages). Required.
  • module (Module): The PyTorch module executed by this stage. Required.
  • group (ProcessGroup): The distributed process group for pipeline communications. Required.
  • stage_to_host_topology (dict[int, int]): Dict mapping stage ID to the PP rank hosting it. Required.

backward_one_chunk(microbatch_index, loss=None, full_backward=True)

Executes a backward pass for a single microbatch chunk.

Can perform either a full backward or just the input gradients (if full_backward=False). It fetches the required data from the forward cache and communication buffers.

Parameters:

  • microbatch_index (int): The microbatch index. Required.
  • loss (Tensor | None): The loss tensor (only used if this is the last stage). Default: None.
  • full_backward (bool): If True, computes gradients for inputs and weights. If False, only for inputs. Default: True.

backward_weight_one_chunk(microbatch_index)

Executes the weight gradient accumulation part of the backward pass.

This assumes backward_one_chunk(..., full_backward=False) was already called for this microbatch.

Parameters:

  • microbatch_index (int): The microbatch index. Required.

configure_buffers(num_microbatches, has_backward, pipeline_inputs)

Initializes the communication handlers and buffers for the stage.

This must be called before execution to establish P2P buffer sizes and directions.

Parameters:

  • num_microbatches (int): Total number of microbatches to process. Required.
  • has_backward (bool): Whether this stage should retain information for a backward pass. Required.
  • pipeline_inputs (dict[str, Tensor]): Pipeline input data. Required.

forward_one_chunk(microbatch_index, pipeline_inputs, pipeline_kwargs=None)

Executes a forward pass for a single microbatch chunk.

Fetches inputs from the communication buffer (or pipeline_inputs if first stage), runs the computation, and caches the result.

Parameters:

  • microbatch_index (int): The microbatch index. Required.
  • pipeline_inputs (dict[str, Tensor]): Inputs provided locally (only used if this is the first stage). Required.
  • pipeline_kwargs (dict[str, Any] | None): Additional arguments for the module. Default: None.

Returns:

The output tensors of the forward pass.

get_bwd_recv_ops(microbatch_index)

Returns P2P ops to receive backward gradients for the given microbatch.

get_bwd_send_ops(microbatch_index)

Returns P2P ops to send backward gradients for the given microbatch.

get_fwd_recv_ops(microbatch_index)

Returns P2P ops to receive forward inputs for the given microbatch.

get_fwd_send_ops(microbatch_index)

Returns P2P ops to send forward outputs for the given microbatch.

pop_local_bwd_output(microbatch_index)

Retrieves local backward outputs (gradients).

reset()

Resets the internal state of communication handlers, clearing gradients on buffers.

set_local_bwd_input(inputs, microbatch_index)

Sets local backward inputs (output gradients) manually.

set_local_fwd_input(inputs, microbatch_index)

Sets local forward inputs manually.

Used by V-shape schedules.
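
Taken together, these methods allow a stage to be driven by hand. The sketch below assumes a single-stage pipeline (no P2P peers), locally available inputs, and a hypothetical loss_fn; the executor normally performs these calls for you:

# Manual single-microbatch round trip through one stage (sketch).
stage.configure_buffers(
    num_microbatches=1,
    has_backward=True,
    pipeline_inputs=inputs,  # dict[str, Tensor] of local inputs
)
outputs = stage.forward_one_chunk(0, inputs)  # microbatch index 0
loss = loss_fn(outputs)                       # hypothetical loss function
stage.backward_one_chunk(0, loss=loss, full_backward=True)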

d9d.pipelining.infra.schedule.component.runtime

Pipelining Runtime Package.

ActionBase

Bases: ABC

Abstract base class for all pipeline schedule actions.

An action represents an atomic unit of work in a pipeline schedule, such as computing a microbatch or sending/receiving a tensor.

has_backward_work abstractmethod property

Returns True if this action involves backward pass computations.

work_type abstractmethod property

Returns the classification of work this action performs.

__str__() abstractmethod

Returns a short string representation of the action for logging/visualization.

apply(ctx) abstractmethod

Executes the action logic using the provided context.

Parameters:

  • ctx (ActionContext): The runtime context containing stages, data, and communication handlers. Required.

BackwardFullInputComputeAction dataclass

Bases: ActionBase

Action to perform backward computation with respect to inputs.

Attributes:

  • stage_idx (int): The integer index of the pipeline stage.
  • microbatch_idx (int): The integer index of the microbatch to compute.
  • full_backward (bool): If True, performs a full backward pass including inputs and weights. If False, may only compute gradients w.r.t. inputs (depending on schedule implementation).

BackwardReceiveAction dataclass

Bases: ActionBase

Action to schedule a backward pass gradient receive operation.

Attributes:

  • stage_idx (int): The integer index of the pipeline stage expecting the receive operation.
  • microbatch_idx (int): The integer index of the microbatch being received.

BackwardSendAction dataclass

Bases: ActionBase

Action to schedule a backward pass gradient send operation.

Attributes:

  • stage_idx (int): The integer index of the pipeline stage initiating the send operation.
  • microbatch_idx (int): The integer index of the microbatch being sent.

BackwardWeightComputeAction dataclass

Bases: ActionBase

Action to perform gradient accumulation on weights.

Attributes:

  • stage_idx (int): The integer index of the pipeline stage.
  • microbatch_idx (int): The integer index of the microbatch to compute.

ComposeAction dataclass

Bases: ActionBase

Composite action scheduling multiple sub-actions sequentially.

Used for forward/backward overlapping.

Attributes:

  • actions (tuple[ActionBase, ...]): A tuple of sub-actions to be executed sequentially.

ForwardComputeAction dataclass

Bases: ActionBase

Action to perform forward computation for a specific microbatch.

Attributes:

  • stage_idx (int): The integer index of the pipeline stage.
  • microbatch_idx (int): The integer index of the microbatch to compute.

ForwardReceiveAction dataclass

Bases: ActionBase

Action to schedule a forward pass tensor receive operation.

Attributes:

  • stage_idx (int): The integer index of the pipeline stage expecting the receive operation.
  • microbatch_idx (int): The integer index of the microbatch being received.

ForwardSendAction dataclass

Bases: ActionBase

Action to schedule a forward pass tensor send operation.

Attributes:

  • stage_idx (int): The integer index of the pipeline stage initiating the send operation.
  • microbatch_idx (int): The integer index of the microbatch being sent.

OfflinePipelineExecutor

Bases: PipelineSchedule

Executes the model immediately without pipeline parallelism.

This schedule treats the execution as a single stage with a single microbatch, running the forward and optionally backward pass directly. This is primarily used for single-device execution within the pipeline abstraction.

__init__(model, callback, do_backward)

Constructs the offline pipeline executor.

Parameters:

  • model (Module): The PyTorch module to execute. Required.
  • callback (PipelineLossFn | PipelineResultFn): Function to compute loss or process pipeline results. Required.
  • do_backward (bool): Whether to execute the backward pass. Required.
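
A minimal construction sketch, assuming model and loss_fn already exist:

# Single-device execution through the pipeline abstraction (sketch).
executor = OfflinePipelineExecutor(
    model=model,       # assumed preexisting nn.Module
    callback=loss_fn,  # PipelineLossFn computing the loss
    do_backward=True,  # also run the backward pass
)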

PipelineScheduleExecutor

Bases: PipelineSchedule

Executes a defined pipeline schedule by interpreting a sequence of actions.

__init__(dist_context, stages, num_microbatches, callback, program)

Constructs the schedule executor.

Parameters:

  • dist_context (DistributedContext): The distributed context. Required.
  • stages (list[PipelineStage]): List of stages managed by this executor. Required.
  • num_microbatches (int): Number of microbatches the global batch is split into. Required.
  • callback (PipelineLossFn | PipelineResultFn): Function to compute loss or process pipeline results. Required.
  • program (dict[int, list[ActionBase]]): The execution plan mapping rank ID to a list of actions. Required.
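
For example, wiring a builder's program into the executor could look like the following sketch; dist_context, stages, and loss_fn are assumed to be provided by the surrounding infrastructure:

# Compile a schedule, then hand it to the runtime engine (sketch).
builder = Interleaved1F1BPipelineProgramBuilder(num_stages_per_rank=2)
program = builder.compose(num_microbatches=8, pp_size=4)

executor = PipelineScheduleExecutor(
    dist_context=dist_context,  # assumed DistributedContext
    stages=stages,              # assumed list[PipelineStage] on this rank
    num_microbatches=8,
    callback=loss_fn,           # PipelineLossFn
    program=program,
)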

d9d.pipelining.infra.schedule.component.program

Pipeline Schedule Building Components.

This package provides the core building blocks and compiler passes used to generate execution schedules for distributed pipelines.

PipelineProgramBuilder

Bases: ABC

Abstract interface for building pipeline execution schedules.

num_stages_per_rank abstractmethod property

Returns the number of model stages designated for each rank.

topology_style abstractmethod property

Returns the topology style strategy used to assign stages to ranks.

compose(num_microbatches, pp_size) abstractmethod

Generates the execution program for all ranks in the pipeline.

Parameters:

  • num_microbatches (int): Number of microbatches per step. Required.
  • pp_size (int): Number of pipeline parallel ranks. Required.

Returns:

dict[int, list[ActionBase]]: A dictionary mapping rank indices to their list of sequential actions.

ScheduleStyle

Bases: StrEnum

Defines the strategy for mapping logical stages to physical ranks.

Attributes:

Name Type Description
loop

Assigns stages in a round-robin circular fashion (mod pp_size).

v

Assigns stages in a zig-zag V-shape pattern. Useful for interleaved 1F1B schedules.

add_communication_ops(compute_actions, stage_to_rank, num_stages)

Injects communication actions into a computation-only schedule.

This function iterates through the provided compute schedule and simulates execution. When a compute action produces a result needed by a different rank, it injects Send/Receive pairs. It also reorders actions to ensure that Receive operations occur before the Computes that depend on them, preventing deadlocks.

Parameters:

  • compute_actions (dict[int, list[ActionBase]]): Initial schedule containing only compute operations. Required.
  • stage_to_rank (dict[int, int]): Mapping from stage index to rank index. Required.
  • num_stages (int): Total number of pipeline stages. Required.

Returns:

dict[int, list[ActionBase]]: A new schedule dictionary including both compute and communication actions.

Raises:

RuntimeError: If the schedule simulation enters a deadlock state.
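
As a rough illustration (the exact placement of injected operations is decided by the dependency simulation, so the expectations in the comments are approximate):

# Two stages, one microbatch: the pass pairs up the cross-rank handoff.
compute_only = {
    0: [ForwardComputeAction(stage_idx=0, microbatch_idx=0)],
    1: [ForwardComputeAction(stage_idx=1, microbatch_idx=0)],
}
full = add_communication_ops(compute_only, {0: 0, 1: 1}, num_stages=2)
# Expected shape: rank 0 gains a ForwardSendAction after its compute,
# and rank 1 gains a ForwardReceiveAction before its compute.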

build_stage_to_host_rank_topology(pp_size, num_stages, style)

Constructs the mapping from stage index to rank index.

Parameters:

  • pp_size (int): Number of pipeline parallel ranks. Required.
  • num_stages (int): Total number of model stages. Required.
  • style (ScheduleStyle): The topology style to use for assignment. Required.

Returns:

dict[int, int]: A dictionary mapping stage IDs to Rank IDs.

Raises:

ValueError: If the style is unknown or if V-style parameters are invalid (num_stages must be divisible by pp_size).
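
For intuition, with pp_size=2 and num_stages=4 the two styles should produce mappings along these lines; the loop result follows directly from the mod rule, while the v result is an assumption based on the zig-zag description above:

loop_map = build_stage_to_host_rank_topology(
    pp_size=2, num_stages=4, style=ScheduleStyle.loop
)
# loop: stage i -> rank i % pp_size, i.e. {0: 0, 1: 1, 2: 0, 3: 1}

v_map = build_stage_to_host_rank_topology(
    pp_size=2, num_stages=4, style=ScheduleStyle.v
)
# v (assumed): down then back up, i.e. {0: 0, 1: 1, 2: 1, 3: 0}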

invert_stage_to_host_rank_topology(stage_to_host)

Inverts the topology mapping to list execution stages per rank.

Parameters:

  • stage_to_host (dict[int, int]): Mapping from stage index to rank index. Required.

Returns:

dict[int, list[int]]: A dictionary where keys are Rank IDs and values are lists of Stage IDs managed by that rank.
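
For example, inverting the loop mapping shown earlier:

per_rank = invert_stage_to_host_rank_topology({0: 0, 1: 1, 2: 0, 3: 1})
# -> {0: [0, 2], 1: [1, 3]}: rank 0 hosts stages 0 and 2, rank 1 hosts 1 and 3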

d9d.pipelining.infra.schedule.program

Pipeline Schedule Implementations

DualPipeVPipelineProgramBuilder

Bases: PipelineProgramBuilder

Builder for the DualPipeV Pipeline Parallelism schedule.

DualPipeV is a specialized bi-directional pipeline schedule designed for high throughput training. It requires exactly 2 stages per pipeline rank (V-shape) and utilizes split backward passes (Input gradients vs Weight gradients) to fill pipeline bubbles.

References

  • https://github.com/deepseek-ai/DualPipe
  • https://hackmd.io/@ufotalent/r1lVXsa9Jg

__init__()

Constructs the DualPipeV builder.

Interleaved1F1BPipelineProgramBuilder

Bases: PipelineProgramBuilder

Builder for Interleaved Pipeline Parallelism schedules.

This builder supports:

  1. Standard Interleaved 1F1B: Assigns multiple stages per rank and prioritizes depth-first execution. (See https://arxiv.org/pdf/2104.04473)
  2. Interleaved Zero Bubble (ZB1P): Extends 1F1B by splitting backward passes into Input Gradients and Weight Gradients. Weight gradients are delayed to fill pipeline bubbles. (See https://arxiv.org/pdf/2401.10241)

__init__(num_stages_per_rank, enable_zero_bubble=False)

Constructs the Interleaved 1F1B builder.

Parameters:

  • num_stages_per_rank (int): Number of stages per rank. Required.
  • enable_zero_bubble (bool): If True, uses the ZB1P schedule variant, which splits backward passes to reduce bubble size. Default: False.

compose(num_microbatches, pp_size)

Generates the execution program for all ranks.

Parameters:

  • num_microbatches (int): Total microbatches. Must be divisible by the derived number of rounds. Required.
  • pp_size (int): Number of pipeline ranks. Required.

Returns:

dict[int, list[ActionBase]]: A dictionary mapping rank indices to their list of sequential actions.

LoopedBFSPipelineProgramBuilder

Bases: PipelineProgramBuilder

Builder for the Breadth-First Pipeline Parallelism schedule.

This schedule runs all available forward microbatches for local stages first. If configured for training, it then runs the backward passes in reverse topological order.

References

https://arxiv.org/pdf/2211.05953

__init__(num_stages_per_rank, inference_mode=False)

Constructs the LoopedBFS builder.

Parameters:

  • num_stages_per_rank (int): Number of stages per rank. Required.
  • inference_mode (bool): If True, only forward passes are scheduled. If False, both forward and backward passes are scheduled. Default: False.
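
A brief usage sketch for inference-only scheduling:

# Forward-only program: no backward actions are emitted (sketch).
builder = LoopedBFSPipelineProgramBuilder(
    num_stages_per_rank=2,
    inference_mode=True,
)
program = builder.compose(num_microbatches=4, pp_size=2)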

ZeroBubbleVPipelineProgramBuilder

Bases: PipelineProgramBuilder

Builder for the Zero Bubble V (ZBV) Pipeline Schedule.

This schedule targets V-shape topologies: it requires exactly two stages per rank and applies the Zero Bubble optimization, splitting backward passes into input and weight gradients to fill pipeline bubbles and improve throughput.

References

https://arxiv.org/pdf/2401.10241, Section 6

__init__()

Constructs the ZBV builder.

d9d.pipelining.training

PipelinedLRScheduler

Bases: LRSchedulerProtocol

Wrapper that manages multiple LR schedulers for a pipeline parallel rank.

Similar to PipelinedOptimizer, this aggregates schedulers corresponding to multiple model stages hosted on the current rank.

PipelinedOptimizer

Bases: OptimizerProtocol

Wrapper that manages multiple optimizers for a pipeline parallel rank.

In a pipeline parallel setup, a single rank might host multiple stages, each having its own parameters and optimizer. This class aggregates them into a single interface.