Event Bus & Hooks

Overview

Instead of hardcoding a fixed set of lifecycle methods (like on_step_start or on_post_optimizer), d9d uses a typed Event Bus for extending both the training and inference loops.

This publish-subscribe mechanism allows any user component (TrainTask, ModelProvider, etc.) to hook into specific execution points natively. You subscribe only to the events you care about, keeping your code clean and decoupled from the framework's internal execution order.

How it Works

The system revolves around three core concepts:

  1. Event[TContext]: A lightweight, typed descriptor representing a specific moment in the lifecycle.
  2. Contexts: Dataclasses (e.g., EventStepContext) holding the state relevant to the event.
  3. EventBus: The central dispatcher that routes contexts to subscribed handlers.

Note: The Event Bus is fail-fast. If a handler raises an exception, the loop terminates immediately.

Registering Handlers

BaseTask (and, by extension, TrainTask and InferenceTask) and ModelProvider each expose a register_events hook. The framework calls this hook during configuration, passing a context object that contains the EventBus.

To provide a clean developer experience, d9d offers a @subscribe decorator. Instead of manually binding each method to the event bus, you can tag your methods and use the subscribe_annotated helper to register them all at once.

from d9d.loop.control import TrainTask, RegisterTaskEventsContext
from d9d.loop.event import (
    subscribe,
    subscribe_annotated
)
from d9d.loop.event.catalogue.train import (
    EVENT_TRAIN_MODEL_STAGES_READY,
    EVENT_TRAIN_STEP_POST,
    EventModelStagesReadyContext,
    EventStepContext,
)

class CustomTrainTask(TrainTask):
    def __init__(self):
        self._modules = []

    def register_events(self, ctx: RegisterTaskEventsContext) -> None:
        # Automatically scans this instance for @subscribe decorators
        subscribe_annotated(ctx.event_bus, self)

    @subscribe(EVENT_TRAIN_MODEL_STAGES_READY)
    def _on_model_ready(self, ctx: EventModelStagesReadyContext) -> None:
        self._modules = ctx.modules

    @subscribe(EVENT_TRAIN_STEP_POST)
    def _on_step_post(self, ctx: EventStepContext) -> None:
        for module in self._modules:
            print(f"Step {ctx.stepper.current_step} completed; MoE routing stats: {module.moe_stats}.")

    def compute_loss(self, ctx):
        ... # Task math overrides

Manual Registration

You can also interact with the EventBus directly, which is useful when registering simple lambda callbacks or dynamically creating handlers.

from d9d.loop.control import TrainTask, RegisterTaskEventsContext
from d9d.loop.event.catalogue.train import EVENT_TRAIN_OPTIMIZER_READY

class CustomTrainTask(TrainTask):
    def register_events(self, ctx: RegisterTaskEventsContext) -> None:
        ctx.event_bus.subscribe(
            EVENT_TRAIN_OPTIMIZER_READY, 
            lambda event_ctx: print(f"Optimizer loaded: {event_ctx.optimizer}")
        )

Custom Events

You can easily define and trigger your own events inside custom logic.

import dataclasses
from pathlib import Path

from d9d.loop.event import Event, EventBus

@dataclasses.dataclass(kw_only=True)
class CheckpointContext:
    step: int
    path: Path

# Define a new event
EVENT_CHECKPOINT_SAVED = Event[CheckpointContext](id="user.checkpoint_saved")

# Trigger it from somewhere in your task
def process_something(bus: EventBus):
    bus.trigger(
        EVENT_CHECKPOINT_SAVED, 
        CheckpointContext(step=1000, path=Path("/checkpoints/step_1000"))
    )

API Reference

Core Components

d9d.loop.event

Event dataclass

Bases: Generic[TContext]

Typed event descriptor.

Each event has a unique string identifier and an associated context type. Events are defined as module-level constants and used as keys for subscription and triggering.

Attributes:

Name  Type  Description
id    str   Unique identifier for the event.

EventBus

A centralized event bus for subscribing to and triggering typed events.

This class maintains a registry of event handlers and dispatches the appropriate context to all registered callbacks when an event is triggered.

__init__()

Constructs an EventBus object.

bounded(event_pre, event_post, context)

Context manager that triggers a pre-event on entry and a post-event on exit.

Parameters:

Name        Type             Description                                                               Default
event_pre   Event[TContext]  The event to trigger immediately before yielding.                         required
event_post  Event[TContext]  The event to trigger immediately after the block completes successfully.  required
context     TContext         The context object passed to both events.                                 required

Yields:

Type  Description
None  None
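The pre/post pairing described above can be sketched with contextlib.contextmanager. This is a stand-in, not d9d's implementation: the trigger function below only records calls, string IDs replace Event objects, and "completes successfully" is read as meaning the post-event is skipped when the block raises.

```python
from contextlib import contextmanager
from typing import Any, Iterator

log: list[str] = []


def trigger(event_id: str, context: Any) -> None:
    """Stand-in for EventBus.trigger: records the call instead of dispatching."""
    log.append(f"{event_id}@{context}")


@contextmanager
def bounded(event_pre: str, event_post: str, context: Any) -> Iterator[None]:
    trigger(event_pre, context)
    yield
    # Reached only if the with-block did not raise.
    trigger(event_post, context)


# Successful block: pre, body, post.
with bounded("demo.step.pre", "demo.step.post", 1):
    log.append("body")

# Failing block: pre fires, post does not.
try:
    with bounded("demo.step.pre", "demo.step.post", 2):
        raise ValueError("step failed")
except ValueError:
    log.append("error")
```

Because there is no try/finally around the yield, an exception raised inside the block propagates through the generator and the post-event line is never reached.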

subscribe(event, handler)

Registers a handler function to be executed when a specific event occurs.

Parameters:

Name     Type                        Description                                                    Default
event    Event[TContext]             The event descriptor to subscribe to.                          required
handler  Callable[[TContext], None]  The callback function to execute when the event is triggered.  required

trigger(event, context)

Dispatches an event to all its registered handlers with the given context.

Parameters:

Name     Type             Description                                                  Default
event    Event[TContext]  The event descriptor to trigger.                             required
context  TContext         The data associated with the event to pass to the handlers.  required

subscribe(event)

Decorator that tags a method to be subscribed to a specific event.

This decorator does not register the method immediately. Instead, it attaches metadata to the function. To finalize registration, use subscribe_annotated().

Parameters:

Name   Type             Description                               Default
event  Event[TContext]  Event descriptor to bind this method to.  required

Returns:

Type                                                                Description
Callable[[Callable[[TContext], None]], Callable[[TContext], None]]  The decorated function.

subscribe_annotated(bus, target)

Automatically subscribes all methods on the target object decorated with @subscribe.

This method uses introspection to find tagged methods and binds them to the provided event bus.

Parameters:

Name    Type      Description                                                       Default
bus     EventBus  The EventBus instance to register the handlers to.                required
target  object    The initialized class instance containing the decorated methods.  required
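The tag-then-scan pattern behind @subscribe and subscribe_annotated can be sketched as follows. Everything here is an illustrative assumption: the metadata attribute name, the use of plain string IDs instead of Event objects, and the dictionary standing in for the EventBus are not taken from d9d's source.

```python
import inspect
from typing import Any, Callable

_TAG = "_demo_subscribed_event_ids"  # hypothetical metadata attribute name


def subscribe(event_id: str) -> Callable[[Callable], Callable]:
    """Stand-in decorator: tags the function and registers nothing."""
    def decorator(fn: Callable) -> Callable:
        tags = getattr(fn, _TAG, ())
        setattr(fn, _TAG, (*tags, event_id))
        return fn
    return decorator


def subscribe_annotated(registry: dict[str, list[Callable]], target: Any) -> None:
    """Stand-in scanner: binds every tagged method of `target` to the registry."""
    for _, method in inspect.getmembers(target, predicate=inspect.ismethod):
        # Bound methods expose attributes set on the underlying function.
        for event_id in getattr(method, _TAG, ()):
            registry.setdefault(event_id, []).append(method)


class Task:
    def __init__(self) -> None:
        self.calls: list[str] = []

    @subscribe("demo.step.post")
    def _on_step_post(self, ctx: Any) -> None:
        self.calls.append(f"step_post:{ctx}")

    def untagged(self, ctx: Any) -> None:
        self.calls.append("never registered")


registry: dict[str, list[Callable]] = {}  # stand-in for an EventBus
task = Task()
subscribe_annotated(registry, task)

for handler in registry["demo.step.post"]:
    handler(3)
```

Deferring registration to the scan step means the decorator needs no access to a bus at class-definition time, and each instance binds its own methods when register_events runs.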

Common Events

d9d.loop.event.catalogue.common

EventConfigurationStartedContext dataclass

Context provided when the loop configuration process begins.

Attributes:

Name          Type                Description
dist_context  DistributedContext  The initialized distributed execution context.

EventDataLoaderReadyContext dataclass

Context provided when the data loader has been fully initialized.

Attributes:

Name         Type        Description
data_loader  DataLoader  The data loader instance.

EventModelStagesReadyContext dataclass

Context provided when the model stages are initialized and parallelized.

Attributes:

Name     Type          Description
modules  list[Module]  The references to the model stages.

EventStepContext dataclass

Context providing step information during iterative execution.

Attributes:

Name     Type     Description
stepper  Stepper  Object responsible for tracking current step and total steps.

Training Events

d9d.loop.event.catalogue.train

EVENT_TRAIN_CONFIG_STARTED = Event[EventConfigurationStartedContext](id='train.configuration.start') module-attribute

Triggered when the training configuration process begins. Provides access to the distributed context.

EVENT_TRAIN_DATA_LOADER_READY = Event[EventDataLoaderReadyContext](id='train.configuration.data_loader') module-attribute

Triggered when the training data loader has been fully initialized.

EVENT_TRAIN_FINISHED = Event[EventTrainFinishedContext](id='train.finished') module-attribute

Triggered when the entire training execution loop finishes successfully.

EVENT_TRAIN_FORWARD_BACKWARD_POST = Event[EventStepContext](id='train.forward_backward.post') module-attribute

Triggered immediately after all forward and backward passes for the current step have finished.

EVENT_TRAIN_FORWARD_BACKWARD_PRE = Event[EventStepContext](id='train.forward_backward.pre') module-attribute

Triggered immediately before the sequence of forward and backward passes begins.

EVENT_TRAIN_LR_SCHEDULER_READY = Event[EventLRSchedulerReadyContext](id='train.configuration.lr_scheduler') module-attribute

Triggered when the learning rate scheduler has been configured.

EVENT_TRAIN_MODEL_STAGES_READY = Event[EventModelStagesReadyContext](id='train.configuration.model_stages') module-attribute

Triggered when the model stages are initialized and parallelized.

EVENT_TRAIN_OPTIMIZER_READY = Event[EventOptimizerReadyContext](id='train.configuration.optimizer') module-attribute

Triggered when the optimizer has been built and is ready for use.

EVENT_TRAIN_OPTIMIZER_STEP_POST = Event[EventStepContext](id='train.optimizer_step.post') module-attribute

Triggered immediately after the optimizer has updated the model parameters but before gradients are zeroed.

EVENT_TRAIN_OPTIMIZER_STEP_PRE = Event[EventStepContext](id='train.optimizer_step.pre') module-attribute

Triggered immediately before the optimizer updates the model parameters (but after gradients are scaled/clipped).

EVENT_TRAIN_READY = Event[EventTrainReadyContext](id='train.ready') module-attribute

Triggered right before the main loop starts, after configuration is complete and checkpoints are loaded.

EVENT_TRAIN_STEP_POST = Event[EventStepContext](id='train.step.post') module-attribute

Triggered at the very end of a training step iteration, after all operations (excluding checkpointing).

EVENT_TRAIN_STEP_PRE = Event[EventStepContext](id='train.step.pre') module-attribute

Triggered at the absolute beginning of a training step iteration.
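Paired pre/post events lend themselves to timing a phase of the step. The sketch below is a hedged illustration: ForwardBackwardTimer is a hypothetical helper, and the loop at the bottom calls the handlers directly in place of the bus. In a real task the two methods would presumably be tagged with @subscribe for the forward/backward pre and post events.

```python
import time
from typing import Any, Optional


class ForwardBackwardTimer:
    """Measures wall-clock time between a pre-event and its post-event."""

    def __init__(self) -> None:
        self._started_at: Optional[float] = None
        self.durations: list[float] = []

    # Would carry @subscribe(EVENT_TRAIN_FORWARD_BACKWARD_PRE) in a d9d task.
    def on_pre(self, ctx: Any) -> None:
        self._started_at = time.perf_counter()

    # Would carry @subscribe(EVENT_TRAIN_FORWARD_BACKWARD_POST) in a d9d task.
    def on_post(self, ctx: Any) -> None:
        if self._started_at is None:
            return  # post without a matching pre; nothing to measure
        self.durations.append(time.perf_counter() - self._started_at)
        self._started_at = None


timer = ForwardBackwardTimer()
for _ in range(3):
    timer.on_pre(None)    # stand-in for the bus dispatching the pre-event
    sum(range(10_000))    # stand-in for the forward and backward passes
    timer.on_post(None)   # stand-in for the bus dispatching the post-event
```

Keeping the start time on the instance avoids any need to pass state through the context object between the two events.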

EventLRSchedulerReadyContext dataclass

Context provided when the learning rate scheduler has been instantiated.

Attributes:

Name          Type                 Description
lr_scheduler  LRSchedulerProtocol  The learning rate scheduler instance.

EventOptimizerReadyContext dataclass

Context provided when the optimizer has been instantiated.

Attributes:

Name       Type               Description
optimizer  OptimizerProtocol  The optimizer instance wrapping the model parameters.

EventTrainFinishedContext dataclass

Context provided when the entire training loop has completed successfully.

EventTrainReadyContext dataclass

Context provided when training is fully ready to begin and the checkpoint is loaded.

Inference Events

d9d.loop.event.catalogue.inference

EVENT_INFERENCE_CONFIG_STARTED = Event[EventConfigurationStartedContext](id='inference.configuration.start') module-attribute

Triggered when the inference configuration process begins. Provides access to the distributed context.

EVENT_INFERENCE_DATA_LOADER_READY = Event[EventDataLoaderReadyContext](id='inference.configuration.data_loader') module-attribute

Triggered when the inference data loader has been fully initialized.

EVENT_INFERENCE_FINISHED = Event[EventInferenceFinishedContext](id='inference.finished') module-attribute

Triggered when the entire inference execution loop finishes successfully.

EVENT_INFERENCE_FORWARD_POST = Event[EventStepContext](id='inference.forward.post') module-attribute

Triggered immediately after the forward pass sequence for the current step has finished.

EVENT_INFERENCE_FORWARD_PRE = Event[EventStepContext](id='inference.forward.pre') module-attribute

Triggered immediately before the forward pass sequence begins.

EVENT_INFERENCE_MODEL_STAGES_READY = Event[EventModelStagesReadyContext](id='inference.configuration.model_stages') module-attribute

Triggered when the model stages are initialized for inference.

EVENT_INFERENCE_READY = Event[EventInferenceReadyContext](id='inference.ready') module-attribute

Triggered right before the main inference loop starts, after configuration is complete and checkpoints are loaded.

EVENT_INFERENCE_STEP_POST = Event[EventStepContext](id='inference.step.post') module-attribute

Triggered at the very end of an inference step iteration (excluding checkpointing).

EVENT_INFERENCE_STEP_PRE = Event[EventStepContext](id='inference.step.pre') module-attribute

Triggered at the absolute beginning of an inference step iteration.

EventInferenceFinishedContext dataclass

Context provided when the entire inference loop has completed successfully.

EventInferenceReadyContext dataclass

Context provided when inference is fully ready to begin and the checkpoint is loaded.