This section details the internals of the d9d.pipelining module. It is intended for readers who want to implement new layouts or schedules, or to modify the execution engine.
Architecture
The Idea
d9d decouples the Schedule Structure from the Runtime Execution.
- You write a builder (e.g., `1F1B`, `DualPipe`) that generates a linear list of logical `Actions` (e.g., `Forward(Stage=0, MB=1)`, `Backward(Stage=0, MB=0)`). Optionally, d9d can automatically inject `Send`/`Recv` actions into your compute-only schedule based on data dependencies, preventing deadlocks.
- You run a dumb virtual machine that simply iterates the action list and executes each action.
This makes implementing complex research schedules (like Zero Bubble or DualPipeV) significantly easier than managing state machines or recursive calls.
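A minimal sketch of this split, using stand-in classes rather than the real d9d API: a toy builder emits a flat action list, and a trivial loop executes it.

```python
from dataclasses import dataclass

# Stand-in action types; the real ones live in
# d9d.pipelining.infra.schedule.component.runtime.
@dataclass(frozen=True)
class Forward:
    stage_idx: int
    microbatch_idx: int

@dataclass(frozen=True)
class Backward:
    stage_idx: int
    microbatch_idx: int

def build_gpipe_like(num_microbatches: int) -> list:
    """A toy builder: all forwards, then all backwards (GPipe-style)."""
    fwd = [Forward(0, mb) for mb in range(num_microbatches)]
    bwd = [Backward(0, mb) for mb in reversed(range(num_microbatches))]
    return fwd + bwd

def run(actions: list) -> list[str]:
    """The 'dumb virtual machine': iterate and execute, no hidden state."""
    log = []
    for action in actions:
        log.append(f"{type(action).__name__}(mb={action.microbatch_idx})")
    return log

print(run(build_gpipe_like(2)))
# ['Forward(mb=0)', 'Forward(mb=1)', 'Backward(mb=1)', 'Backward(mb=0)']
```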
Core Components
PipelineStage (infra/stage/stage.py)
Encapsulates a user nn.Module. It is not responsible for deciding when to run. Instead, it provides atomic pipeline stage capabilities (such as forward and backward passes) to the actions and the executor.
Consists of:
- Computation Handlers:
  - `ForwardComputeHandler`: performs the forward pass and caches inputs/outputs for the backward pass.
  - `BackwardComputeHandler`: performs the backward pass; can split it into `backward_input` (dI) and `backward_weight` (dW) for advanced schedules.
- Communication Handlers: Contain and manage the P2P buffers for both forward and backward passes.
Actions (infra/schedule/component/runtime/action.py)
The atomic instructions for the pipeline virtual machine.
- `ForwardComputeAction`: runs forward on a specific microbatch.
- `BackwardFullInputComputeAction`: runs backward. Can be configured to compute gradients for inputs only, or for inputs and weights.
- `BackwardWeightComputeAction`: computes gradients for weights (used in Zero Bubble schedules).
- `ForwardSendAction` / `ForwardReceiveAction` / `BackwardSendAction` / `BackwardReceiveAction`: network I/O.
- `ComposeAction`: composes multiple actions into a single one. Used for forward/backward overlap in schedules such as DualPipeV.
Actions are designed to be declarative and immutable.
Programs
A Program is simply dict[int, list[ActionBase]] — a mapping of Rank ID to a sequential list of Actions.
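For illustration, a two-rank Program for a single microbatch could look like this sketch, using stand-in frozen dataclasses in place of the real action classes (field names follow this document):

```python
from dataclasses import dataclass

@dataclass(frozen=True)  # actions are declarative and immutable
class ForwardCompute:
    stage_idx: int
    microbatch_idx: int

@dataclass(frozen=True)
class ForwardSend:
    stage_idx: int
    microbatch_idx: int

@dataclass(frozen=True)
class ForwardReceive:
    stage_idx: int
    microbatch_idx: int

# Program shape: dict[int, list[ActionBase]] -- rank ID -> sequential actions
program = {
    0: [ForwardCompute(stage_idx=0, microbatch_idx=0),
        ForwardSend(stage_idx=0, microbatch_idx=0)],
    1: [ForwardReceive(stage_idx=1, microbatch_idx=0),
        ForwardCompute(stage_idx=1, microbatch_idx=0)],
}
```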
Executor (infra/schedule/component/runtime/executor.py)
The PipelineScheduleExecutor is the runtime engine.
It:
- Shards global inputs into microbatches.
- Iterates through the `Program` action list.
- Dispatches each action, which performs its computation or communication workload.
Comparison with PyTorch
The d9d pipelining implementation is heavily inspired by and borrows concepts from the torch.distributed.pipelining API (e.g., ZeroBubble implementation), but refactors the codebase significantly to improve clarity, type safety, and modularity.
The main architectural differences lie in the strict separation of concerns and composition over inheritance:
- Decomposed Stage Logic:
  - PyTorch: uses a monolithic `_PipelineStageBase` class that simultaneously manages P2P buffer allocation, gradient accumulation state, and forward/backward execution logic.
  - d9d: adopts a compositional approach. The `PipelineStage` class is a thin orchestrator that delegates responsibilities to dedicated handlers.
- Polymorphic Actions vs Enumeration:
  - PyTorch: represents schedule instructions using a single generic `_Action` NamedTuple combined with an Enum (`_ComputationType.FORWARD`, `_ComputationType.SEND_F`, etc.).
  - d9d: uses a class hierarchy for actions (`ForwardComputeAction`, `ForwardSendAction`, `ComposeAction`). This lets the runtime executor use structural pattern matching (match/case) rather than large if/elif blocks checking enums, allows different actions to carry different metadata (e.g. a `full_backward` flag), and improves static type checking.
- Builder Pattern vs Schedule Classes:
  - PyTorch: often couples the schedule definition with the runtime object (e.g., the `Schedule1F1B` class contains both the logic to generate the ordering and the logic to execute it).
  - d9d: strictly separates the Program Builder (which generates the list of actions) from the Executor (which runs them). This makes it easier to inspect a schedule plan before execution, or to swap scheduling algorithms without changing the runtime driver.
Building Custom Schedules
To build a new schedule, you create a `PipelineProgramBuilder`.
Implement the Builder
Subclass `PipelineProgramBuilder` and implement its abstract members:
```python
from collections import defaultdict

from d9d.pipelining.infra.schedule.component.program import (
    PipelineProgramBuilder,
    ScheduleStyle,
    add_communication_ops,
    build_stage_to_host_rank_topology,
)
from d9d.pipelining.infra.schedule.component.runtime import ActionBase, ForwardComputeAction


class MyFancyScheduleBuilder(PipelineProgramBuilder):
    def __init__(self, stages_per_rank: int):
        self._stages_per_rank = stages_per_rank

    @property
    def num_stages_per_rank(self) -> int:
        return self._stages_per_rank

    @property
    def topology_style(self) -> ScheduleStyle:
        return ScheduleStyle.loop

    def compose(self, num_microbatches: int, pp_size: int) -> dict[int, list[ActionBase]]:
        # Map logical stages to ranks
        stage_to_rank = build_stage_to_host_rank_topology(
            num_stages=self._stages_per_rank * pp_size,
            style=ScheduleStyle.loop,
            pp_size=pp_size,
        )
        actions: dict[int, list[ActionBase]] = defaultdict(list)

        # 1. Generate the compute schedule
        for rank in range(pp_size):
            # ... custom logic to decide the order of Fwd/Bwd ...
            actions[rank].append(ForwardComputeAction(stage_idx=..., microbatch_idx=...))

        # 2. Inject communications ("magic" pass): analyzes data dependencies
        #    between stages and inserts Send/Recv pairs.
        return add_communication_ops(actions, stage_to_rank, num_stages=self._stages_per_rank * pp_size)
```
Registering
Add your configuration to factory/config.py and register the builder in factory/factory.py.
d9d.pipelining.infra.stage
PipelineStage
Represents a single structural stage in a Pipelined Model.
This class acts as an orchestrator that combines StageCommunicationHandler (for I/O)
and Forward/BackwardComputeHandler (for execution). It abstracts away the complexity
of buffer management, distributed communication, and gradient calculation from the scheduler.
Source code in d9d/pipelining/infra/stage/stage.py
__init__(info, module, group, stage_to_host_topology)
Constructs a PipelineStage object.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `info` | `PipelineStageInfo` | Metadata about the stage (index, total stages). | required |
| `module` | `Module` | The PyTorch module executed by this stage. | required |
| `group` | `ProcessGroup` | The distributed process group for pipeline communications. | required |
| `stage_to_host_topology` | `dict[int, int]` | Dict mapping stage ID to the PP rank hosting it. | required |
Source code in d9d/pipelining/infra/stage/stage.py
backward_one_chunk(microbatch_index, loss=None, full_backward=True)
Executes a backward pass for a single microbatch chunk.
Can perform either a full backward or just the input gradients (if full_backward=False).
It fetches required data from forward cache and communication buffers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `microbatch_index` | `int` | The microbatch index. | required |
| `loss` | `Tensor \| None` | The loss tensor (only used if this is the last stage). | `None` |
| `full_backward` | `bool` | If True, computes gradients for inputs and weights. If False, only for inputs. | `True` |
Source code in d9d/pipelining/infra/stage/stage.py
backward_weight_one_chunk(microbatch_index)
Executes the weight gradient accumulation part of the backward pass.
This assumes backward_one_chunk(..., full_backward=False) was already called
for this microbatch.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `microbatch_index` | `int` | The microbatch index. | required |
Source code in d9d/pipelining/infra/stage/stage.py
configure_buffers(num_microbatches, has_backward, pipeline_inputs)
Initializes the communication handlers and buffers for the stage.
This must be called before execution to establish P2P buffer sizes and directions.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `num_microbatches` | `int` | Total number of microbatches to process. | required |
| `has_backward` | `bool` | Whether this pipeline stage should store information for a backward pass. | required |
| `pipeline_inputs` | `dict[str, Tensor]` | Pipeline input data. | required |
Source code in d9d/pipelining/infra/stage/stage.py
forward_one_chunk(microbatch_index, pipeline_inputs, pipeline_kwargs=None)
Executes a forward pass for a single microbatch chunk.
Fetches inputs from the communication buffer (or pipeline_inputs if first stage),
runs the computation, and caches the result.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `microbatch_index` | `int` | The microbatch index. | required |
| `pipeline_inputs` | `dict[str, Tensor]` | Inputs provided locally (only used if this is the first stage). | required |
| `pipeline_kwargs` | `dict[str, Any] \| None` | Additional arguments for the module. | `None` |

Returns:
| Type | Description |
|---|---|
| | The output tensors of the forward pass. |
Source code in d9d/pipelining/infra/stage/stage.py
get_bwd_recv_ops(microbatch_index)
Returns P2P ops to receive backward gradients for the given microbatch.
Source code in d9d/pipelining/infra/stage/stage.py
get_bwd_send_ops(microbatch_index)
Returns P2P ops to send backward gradients for the given microbatch.
Source code in d9d/pipelining/infra/stage/stage.py
get_fwd_recv_ops(microbatch_index)
Returns P2P ops to receive forward inputs for the given microbatch.
Source code in d9d/pipelining/infra/stage/stage.py
get_fwd_send_ops(microbatch_index)
Returns P2P ops to send forward outputs for the given microbatch.
Source code in d9d/pipelining/infra/stage/stage.py
pop_local_bwd_output(microbatch_index)
Retrieves local backward outputs (gradients).
Source code in d9d/pipelining/infra/stage/stage.py
reset()
Resets the internal state of communication handlers, clearing gradients on buffers.
Source code in d9d/pipelining/infra/stage/stage.py
set_local_bwd_input(inputs, microbatch_index)
Sets local backward inputs (output gradients) manually.
Source code in d9d/pipelining/infra/stage/stage.py
set_local_fwd_input(inputs, microbatch_index)
Sets local forward inputs manually.
Used for the V-shape schedulers.
Source code in d9d/pipelining/infra/stage/stage.py
d9d.pipelining.infra.schedule.component.runtime
Pipelining Runtime Package.
ActionBase
Bases: ABC
Abstract base class for all pipeline schedule actions.
An action represents an atomic unit of work in a pipeline schedule, such as computing a microbatch or sending/receiving a tensor.
Source code in d9d/pipelining/infra/schedule/component/runtime/action.py
has_backward_work
abstractmethod
property
Returns True if this action involves backward pass computations.
work_type
abstractmethod
property
Returns the classification of work this action performs.
__str__()
abstractmethod
Returns a short string representation of the action for logging/visualization.
Source code in d9d/pipelining/infra/schedule/component/runtime/action.py
apply(ctx)
abstractmethod
Executes the action logic using the provided context.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `ctx` | `ActionContext` | The runtime context containing stages, data, and communication handlers. | required |
Source code in d9d/pipelining/infra/schedule/component/runtime/action.py
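For illustration, a custom action under this interface might look like the following sketch (stand-in `ActionBase` and a plain list as a stand-in context; the real class carries further members such as `work_type`):

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass

class ActionBase(ABC):  # stand-in mirroring the interface described above
    @property
    @abstractmethod
    def has_backward_work(self) -> bool: ...

    @abstractmethod
    def apply(self, ctx) -> None: ...

    @abstractmethod
    def __str__(self) -> str: ...

@dataclass(frozen=True)
class LogAction(ActionBase):
    """Hypothetical action that just records itself on the context."""
    stage_idx: int

    @property
    def has_backward_work(self) -> bool:
        return False  # purely a logging action, no backward computation

    def apply(self, ctx) -> None:
        ctx.append(str(self))  # ctx here is just a list, for illustration

    def __str__(self) -> str:
        return f"Log(stage={self.stage_idx})"
```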
BackwardFullInputComputeAction
dataclass
Bases: ActionBase
Action to perform backward computation with respect to inputs.
Attributes:
| Name | Type | Description |
|---|---|---|
| `stage_idx` | `int` | The integer index of the pipeline stage. |
| `microbatch_idx` | `int` | The integer index of the microbatch to compute. |
| `full_backward` | `bool` | If True, performs a full backward pass including inputs and weights. If False, may only compute gradients w.r.t. inputs (depending on the schedule implementation). |
Source code in d9d/pipelining/infra/schedule/component/runtime/action.py
BackwardReceiveAction
dataclass
Bases: ActionBase
Action to schedule a backward pass gradient receive operation.
Attributes:
| Name | Type | Description |
|---|---|---|
| `stage_idx` | `int` | The integer index of the pipeline stage expecting the receive operation. |
| `microbatch_idx` | `int` | The integer index of the microbatch being received. |
Source code in d9d/pipelining/infra/schedule/component/runtime/action.py
BackwardSendAction
dataclass
Bases: ActionBase
Action to schedule a backward pass gradient send operation.
Attributes:
| Name | Type | Description |
|---|---|---|
| `stage_idx` | `int` | The integer index of the pipeline stage initiating the send operation. |
| `microbatch_idx` | `int` | The integer index of the microbatch being sent. |
Source code in d9d/pipelining/infra/schedule/component/runtime/action.py
BackwardWeightComputeAction
dataclass
Bases: ActionBase
Action to perform gradient accumulation on weights.
Attributes:
| Name | Type | Description |
|---|---|---|
| `stage_idx` | `int` | The integer index of the pipeline stage. |
| `microbatch_idx` | `int` | The integer index of the microbatch to compute. |
Source code in d9d/pipelining/infra/schedule/component/runtime/action.py
ComposeAction
dataclass
Bases: ActionBase
Composite action scheduling multiple sub-actions sequentially.
Used for forward/backward overlapping.
Attributes:
| Name | Type | Description |
|---|---|---|
| `actions` | `tuple[ActionBase, ...]` | A tuple of sub-actions to be executed sequentially. |
Source code in d9d/pipelining/infra/schedule/component/runtime/action.py
ForwardComputeAction
dataclass
Bases: ActionBase
Action to perform forward computation for a specific microbatch.
Attributes:
| Name | Type | Description |
|---|---|---|
| `stage_idx` | `int` | The integer index of the pipeline stage. |
| `microbatch_idx` | `int` | The integer index of the microbatch to compute. |
Source code in d9d/pipelining/infra/schedule/component/runtime/action.py
ForwardReceiveAction
dataclass
Bases: ActionBase
Action to schedule a forward pass tensor receive operation.
Attributes:
| Name | Type | Description |
|---|---|---|
| `stage_idx` | `int` | The integer index of the pipeline stage expecting the receive operation. |
| `microbatch_idx` | `int` | The integer index of the microbatch being received. |
Source code in d9d/pipelining/infra/schedule/component/runtime/action.py
ForwardSendAction
dataclass
Bases: ActionBase
Action to schedule a forward pass tensor send operation.
Attributes:
| Name | Type | Description |
|---|---|---|
| `stage_idx` | `int` | The integer index of the pipeline stage initiating the send operation. |
| `microbatch_idx` | `int` | The integer index of the microbatch being sent. |
Source code in d9d/pipelining/infra/schedule/component/runtime/action.py
PipelineScheduleExecutor
Bases: PipelineSchedule
Executes a defined pipeline schedule by interpreting a sequence of actions.
Source code in d9d/pipelining/infra/schedule/component/runtime/executor.py
__init__(dist_context, stages, num_microbatches, loss_fn, program, sharding_spec)
Constructs the schedule executor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `dist_context` | `DistributedContext` | The distributed context. | required |
| `stages` | `list[PipelineStage]` | List of stages managed by this executor. | required |
| `num_microbatches` | `int` | Number of microbatches into which the global batch is split. | required |
| `loss_fn` | `LossFn \| None` | Function to compute the loss. | required |
| `program` | `dict[int, list[ActionBase]]` | The execution plan mapping rank ID to a list of actions. | required |
| `sharding_spec` | `PipelineShardingSpec` | Sharding specification for input and output tensors. | required |
Source code in d9d/pipelining/infra/schedule/component/runtime/executor.py
d9d.pipelining.infra.schedule.component.program
Pipeline Schedule Building Components.
This package provides the core building blocks and compiler passes used to generate execution schedules for distributed pipelines.
PipelineProgramBuilder
Bases: ABC
Abstract interface for building pipeline execution schedules.
Source code in d9d/pipelining/infra/schedule/component/program/base.py
num_stages_per_rank
abstractmethod
property
Returns the number of model stages designated for each rank.
topology_style
abstractmethod
property
Returns the topology style strategy used to assign stages to ranks.
compose(num_microbatches, pp_size)
abstractmethod
Generates the execution program for all ranks in the pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `num_microbatches` | `int` | Number of microbatches per step. | required |
| `pp_size` | `int` | Number of pipeline parallel ranks. | required |

Returns:
| Type | Description |
|---|---|
| `dict[int, list[ActionBase]]` | A dictionary mapping rank indices to their list of sequential actions. |
Source code in d9d/pipelining/infra/schedule/component/program/base.py
ScheduleStyle
Bases: StrEnum
Defines the strategy for mapping logical stages to physical ranks.
Attributes:
| Name | Type | Description |
|---|---|---|
| `loop` | | Assigns stages in a round-robin circular fashion (mod pp_size). |
| `v` | | Assigns stages in a zig-zag V-shape pattern. Useful for interleaved 1F1B schedules. |
Source code in d9d/pipelining/infra/schedule/component/program/topology.py
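As a worked example, the two styles can be reconstructed as follows (this is a sketch of the described strategies, not the library code; with pp_size=4 and 8 stages, `loop` assigns round-robin while `v` folds back so that rank 0 hosts the first and last stages):

```python
def loop_style(num_stages: int, pp_size: int) -> dict[int, int]:
    # Round-robin: stage s lives on rank s mod pp_size.
    return {s: s % pp_size for s in range(num_stages)}

def v_style(num_stages: int, pp_size: int) -> dict[int, int]:
    # Zig-zag: ranks 0..pp_size-1 on even rounds, reversed on odd rounds,
    # so each rank hosts a "V" pair of stages (e.g. rank 0 gets 0 and 7).
    mapping = {}
    for s in range(num_stages):
        rnd, pos = divmod(s, pp_size)
        mapping[s] = pos if rnd % 2 == 0 else pp_size - 1 - pos
    return mapping

print(loop_style(8, 4))  # stages 0..7 -> ranks 0,1,2,3,0,1,2,3
print(v_style(8, 4))     # stages 0..7 -> ranks 0,1,2,3,3,2,1,0
```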
add_communication_ops(compute_actions, stage_to_rank, num_stages)
Injects communication actions into a computation-only schedule.
This function iterates through the provided compute schedule and simulates execution. When a compute action produces a result needed by a different rank, it injects Send/Receive pairs. It also reorders actions to ensure that Receive operations occur before the Computes that depend on them, preventing deadlocks.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `compute_actions` | `dict[int, list[ActionBase]]` | Initial schedule containing only compute operations. | required |
| `stage_to_rank` | `dict[int, int]` | Mapping from stage index to rank index. | required |
| `num_stages` | `int` | Total number of pipeline stages. | required |

Returns:
| Type | Description |
|---|---|
| `dict[int, list[ActionBase]]` | A new schedule dictionary including both compute and communication actions. |

Raises:
| Type | Description |
|---|---|
| `RuntimeError` | If the schedule simulation enters a deadlock state. |
Source code in d9d/pipelining/infra/schedule/component/program/communications.py
build_stage_to_host_rank_topology(pp_size, num_stages, style)
Constructs the mapping from stage index to rank index.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `pp_size` | `int` | Number of pipeline parallel ranks. | required |
| `num_stages` | `int` | Total number of model stages. | required |
| `style` | `ScheduleStyle` | The topology style to use for assignment. | required |

Returns:
| Type | Description |
|---|---|
| `dict[int, int]` | A dictionary mapping stage IDs to rank IDs. |

Raises:
| Type | Description |
|---|---|
| `ValueError` | If the style is unknown or if V-style parameters are invalid (num_stages must be divisible by pp_size). |
Source code in d9d/pipelining/infra/schedule/component/program/topology.py
invert_stage_to_host_rank_topology(stage_to_host)
Inverts the topology mapping to list execution stages per rank.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `stage_to_host` | `dict[int, int]` | Mapping from stage index to rank index. | required |

Returns:
| Type | Description |
|---|---|
| `dict[int, list[int]]` | A dictionary where keys are rank IDs and values are lists of stage IDs managed by that rank. |
Source code in d9d/pipelining/infra/schedule/component/program/topology.py
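The inversion itself is mechanical; a minimal sketch consistent with the signature above:

```python
from collections import defaultdict

def invert_stage_to_host_rank_topology(stage_to_host: dict[int, int]) -> dict[int, list[int]]:
    # Group stage IDs by the rank that hosts them, keeping ascending stage order.
    rank_to_stages: dict[int, list[int]] = defaultdict(list)
    for stage, rank in sorted(stage_to_host.items()):
        rank_to_stages[rank].append(stage)
    return dict(rank_to_stages)
```

For a V-shape mapping `{0: 0, 1: 1, 2: 1, 3: 0}`, this yields `{0: [0, 3], 1: [1, 2]}`.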
d9d.pipelining.infra.schedule.program
Pipeline Schedule Implementations
DualPipeVPipelineProgramBuilder
Bases: PipelineProgramBuilder
Builder for the DualPipeV Pipeline Parallelism schedule.
DualPipeV is a specialized bi-directional pipeline schedule designed for high throughput training. It requires exactly 2 stages per pipeline rank (V-shape) and utilizes split backward passes (Input gradients vs Weight gradients) to fill pipeline bubbles.
References
- https://github.com/deepseek-ai/DualPipe
- https://hackmd.io/@ufotalent/r1lVXsa9Jg
Source code in d9d/pipelining/infra/schedule/program/dualpipev.py
__init__()
Constructs the DualPipeV builder.
Source code in d9d/pipelining/infra/schedule/program/dualpipev.py
Interleaved1F1BPipelineProgramBuilder
Bases: PipelineProgramBuilder
Builder for Interleaved Pipeline Parallelism schedules.
This builder supports:
- Standard Interleaved 1F1B: Assigns multiple stages per rank and prioritizes depth-first execution. (See https://arxiv.org/pdf/2104.04473)
- Interleaved Zero Bubble (ZB1P): Extends 1F1B by splitting backward passes into Input Gradients and Weight Gradients. Weight gradients are delayed to fill pipeline bubbles. (See https://arxiv.org/pdf/2401.10241)
Source code in d9d/pipelining/infra/schedule/program/interleaved.py
__init__(num_stages_per_rank, enable_zero_bubble=False)
Constructs the Interleaved 1F1B builder.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `num_stages_per_rank` | `int` | Number of stages per rank. | required |
| `enable_zero_bubble` | `bool` | If True, uses the ZB1P schedule variant, which splits backward passes to reduce bubble size. | `False` |
Source code in d9d/pipelining/infra/schedule/program/interleaved.py
compose(num_microbatches, pp_size)
Generates the execution program for all ranks.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `num_microbatches` | `int` | Total microbatches. Must be divisible by the derived number of rounds. | required |
| `pp_size` | `int` | Number of pipeline ranks. | required |

Returns:
| Type | Description |
|---|---|
| `dict[int, list[ActionBase]]` | A dictionary mapping rank indices to their list of sequential actions. |
Source code in d9d/pipelining/infra/schedule/program/interleaved.py
LoopedBFSPipelineProgramBuilder
Bases: PipelineProgramBuilder
Builder for the Breadth-First Pipeline Parallelism schedule.
This schedule runs all available forward microbatches for local stages first. If configured for training, it then runs backward passes in reverse topological order.
References
https://arxiv.org/pdf/2211.05953
Source code in d9d/pipelining/infra/schedule/program/bfs.py
__init__(num_stages_per_rank, inference_mode=False)
Constructs the LoopedBFS builder.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `num_stages_per_rank` | `int` | Number of stages per rank. | required |
| `inference_mode` | `bool` | If True, only forward passes are scheduled. If False, both forward and backward passes are scheduled. | `False` |
Source code in d9d/pipelining/infra/schedule/program/bfs.py
ZeroBubbleVPipelineProgramBuilder
Bases: PipelineProgramBuilder
Builder for the Zero Bubble V (ZBV) Pipeline Schedule.
This schedule targets V-shape topologies: it requires exactly two stages per rank, organized in a V-shape, and applies the Zero Bubble optimization by splitting backward passes into input and weight gradients to fill pipeline bubbles and optimize throughput.
References
https://arxiv.org/pdf/2401.10241, Section 6
Source code in d9d/pipelining/infra/schedule/program/zerobubblev.py
__init__()
Constructs the ZBV builder.
Source code in d9d/pipelining/infra/schedule/program/zerobubblev.py
d9d.pipelining.training
PipelinedLRScheduler
Bases: LRSchedulerProtocol
Wrapper that manages multiple LR schedulers for a pipeline parallel rank.
Similar to PipelinedOptimizer, this aggregates schedulers corresponding to
multiple model stages hosted on the current rank.
Source code in d9d/pipelining/training/scheduler.py
PipelinedOptimizer
Bases: OptimizerProtocol
Wrapper that manages multiple optimizers for a pipeline parallel rank.
In a pipeline parallel setup, a single rank might host multiple stages, each having its own parameters and optimizer. This class aggregates them into a single interface.
Source code in d9d/pipelining/training/optimizer.py
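The fan-out idea can be sketched with stand-in objects (not the real PipelinedOptimizer API): each call on the aggregate is forwarded to every per-stage optimizer hosted on the rank.

```python
class StubOptimizer:
    """Stand-in for a per-stage optimizer; only counts its step() calls."""
    def __init__(self):
        self.steps = 0

    def step(self):
        self.steps += 1

    def zero_grad(self):
        pass

class AggregatedOptimizer:
    """Sketch of the aggregation pattern described above."""
    def __init__(self, optimizers):
        self._optimizers = list(optimizers)

    def step(self):
        # Fan the step out to every per-stage optimizer.
        for opt in self._optimizers:
            opt.step()

    def zero_grad(self):
        for opt in self._optimizers:
            opt.zero_grad()

opts = [StubOptimizer(), StubOptimizer()]
agg = AggregatedOptimizer(opts)
agg.step()  # steps both per-stage optimizers
```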