SWPipeline — Software Pipeline Framework

设计目的 · 核心机制 · 关键特点Design Goals · Core Mechanisms · Key Features

examples/commons/pipeline/sw_pipeline.py

设计目的 — 为什么需要 SWPipeline?Design Goals — Why SWPipeline?

大规模推荐模型训练中的两个核心瓶颈Two core bottlenecks in large-scale recommendation model training

推荐系统训练的每一次迭代包含十余个步骤——H2D 传输、embedding input dist、embedding prefetch、embedding forward、dense forward、loss、backward、gradient allreduce、optimizer step 等。现有 pipeline(如 TorchRec 的 PrefetchTrainPipelineSparseDist)将这些步骤硬编码为固定阶段。SWPipeline 要解决的是两个递进问题:

Each training iteration of a recommendation model contains a dozen steps — H2D transfer, embedding input dist, embedding prefetch, embedding forward, dense forward, loss, backward, gradient allreduce, optimizer step, etc. Existing pipelines (like TorchRec's PrefetchTrainPipelineSparseDist) hard-code these steps into fixed stages. SWPipeline addresses two progressive problems:

1 测量 Exposed TimeMeasure Exposed Time

优化流水线的第一步是知道每个 Task 的 exposed time——即该 Task 无法被其他 Task 重叠(overlap)掉的时间。只有 exposed 部分才是真正的瓶颈。

  • 例:EmbeddingForward 耗时 5ms,但其中 3ms 能与上一迭代的 H2D 重叠 → exposed = 2ms
  • 传统方法(nsys timeline)需要人工分析,且只能测量已有调度方案的结果
  • SWPipeline 通过内置 TaskProfiler + Shortcut 机制,自动化测量每个 Task 的 exposed time

核心公式:

exposed(T) = baseline_serial − serial_with_T_shortcutted

将 Task T 替换为"瞬间完成的缓存回放",测量总迭代时间的减少量——这就是 T 的 exposed time。

The first step in optimizing a pipeline is knowing each Task's exposed time — the portion of its wall-clock time that cannot be overlapped by other Tasks. Only the exposed part is the true bottleneck.

  • E.g.: EmbeddingForward takes 5ms, but 3ms can overlap with previous iter's H2D → exposed = 2ms
  • Traditional profiling (nsys timeline) requires manual analysis and only measures the current schedule
  • SWPipeline automates this via built-in TaskProfiler + Shortcut

Core formula:

exposed(T) = baseline_serial − serial_with_T_shortcutted

Replace Task T with "instant cached replay", measure the reduction in total iteration time — that's T's exposed time.

2 最大化 GPU 利用率Maximize GPU Utilization

有了 exposed time 数据后,下一步就是消灭它——通过 multi-thread、multi-stream 的方式尽可能重叠(overlap)各个 Task,让 GPU 始终有活干。

  • 将迭代分解为 声明式 Task DAG,每个 Task 可配置独立的 CUDA stream 和线程组
  • 跨迭代流水线化:stage 0 执行 iter i 的 compute,stage 1 同时准备 iter i+1 的数据
  • 同一组 Task 定义,只需更换 PipelinePlan,即可从 serial baseline 切换到 pipelined 模式
  • 目标:让每个 Task 的 exposed time → 0

Once we have exposed time data, the next step is to eliminate it — overlap Tasks using multi-thread, multi-stream execution to keep the GPU busy at all times.

  • Decompose iteration into declarative Task DAG, each Task configurable with its own CUDA stream and thread group
  • Cross-iteration pipelining: stage 0 runs iter i's compute, stage 1 prepares iter i+1's data simultaneously
  • Same Task definitions, different PipelinePlan → switch from serial baseline to pipelined mode
  • Goal: drive every Task's exposed time → 0
两个目标的递进关系Progressive Relationship of Two Goals
Step 1 串行 baselineSerial baseline + TaskProfiler → 获得 exposed time→ get exposed time Step 2 基于 exposed time 设计 PipelinePlanDesign PipelinePlan from exposed time 分配 stage / stream / thread_groupAssign stage / stream / thread_group → 重叠 exposed 部分→ overlap exposed parts Step 3 再次 profile 验证Profile again to verify 所有 exposed time ≈ 0?All exposed time ≈ 0? → GPU 利用率最大化→ GPU utilization maximized

核心机制 — SWPipeline 怎么做?Core Mechanisms — How Does SWPipeline Work?

三大支柱:Shortcut 缓存、两阶段同步、NCCL 提交排序Three pillars: Shortcut Caching, Two-Phase Sync, NCCL Submission Ordering

SWPipeline 将一次训练迭代分解为若干 PipelineTask,通过 PipelinePlan 描述它们的调度属性(stage、stream、thread_group)和依赖关系。在此基础上,三个关键机制使得 exposed time 测量multi-thread/multi-stream 重叠 成为可能:

SWPipeline decomposes a training iteration into PipelineTasks, described by a PipelinePlan with scheduling attributes (stage, stream, thread_group) and dependencies. Three key mechanisms enable exposed time measurement and multi-thread/multi-stream overlap:

1 Shortcut 缓存 —— Exposed Time 测量的基础Shortcut Caching —— Foundation of Exposed Time Measurement

Shortcut 的核心思想:将一个 Task "替换"为瞬间完成的缓存回放,从而量化该 Task 对整体耗时的贡献(即 exposed time)。

实现分为两个层次:

  • IterContext diff(自动)—— 框架对比 Task 执行前后的 ctx 属性变化,自动缓存新增/修改的属性值(递归 detach().clone() 所有 Tensor)。回放时直接恢复这些属性。
  • DeclaredIO(声明式)—— 许多 Task 除了写 ctx,还有"看不见的"外部副作用(如写入 pipeline_ctx、更新 pipelined module 内部状态)。DeclaredIO 让 Task 声明这些副作用的 capture / restore 回调,框架自动管理。

Shortcut's core idea: replace a Task with instant cached replay, thereby quantifying that Task's contribution to total iteration time (exposed time).

Implementation has two layers:

  • IterContext diff (automatic) — framework compares ctx attributes before/after Task execution, caches added/modified values (recursive detach().clone() for all Tensors). Replays by restoring these attributes.
  • DeclaredIO (declarative) — many Tasks have "invisible" side effects beyond ctx (writing pipeline_ctx, updating pipelined module state). DeclaredIO lets Tasks declare capture/restore callbacks for the framework to manage automatically.

EXAMPLE Shortcut 如何替换 TaskHow Shortcut Replaces a Task

假设有三个 Task 组成一次迭代:

Suppose three Tasks compose one iteration:

# 一次迭代 = 3 个 Task 串行执行
task_a(ctx)   # 数据预处理     — 2ms
task_b(ctx)   # embedding 计算  — 8ms  ← 想知道它的 exposed time
task_c(ctx)   # dense 计算      — 5ms
# 总耗时 baseline = 15ms

# TaskProfiler 对 task_b 启用 shortcut:
task_a(ctx)              # 正常执行 — 2ms
task_b(ctx) → CACHED    # 跳过!从缓存恢复 ctx — ≈0ms
task_c(ctx)              # 正常执行 — 5ms
# shortcutted 耗时 = 7ms
# exposed(task_b) = 15ms − 7ms = 8ms (serial 中全部暴露)
关键特性:Shortcut 不是简单地"跳过"——它精确还原 Task 对 ctx 的全部副作用,并通过 _GraftGrad 维持 autograd 图的连通性,使下游的 backward 能正常传播梯度。DeclaredIO 则确保 Task 对 ctx 之外共享状态的修改也被恢复。 Key property: Shortcut doesn't simply "skip" — it faithfully replays all of a Task's side effects on ctx, maintains autograd graph connectivity via _GraftGrad for proper backward propagation, and DeclaredIO ensures modifications to shared state outside ctx are also restored.

DeclaredIO 管理外部副作用Managing External Side Effects

IterContext diff 能自动处理 Task 写入 ctx 的数据。但 Task 还可能修改 ctx 之外的共享状态——这些修改框架看不见。DeclaredIO 就是为了解决这个问题:

IterContext diff automatically handles data a Task writes to ctx. But Tasks may also modify shared state outside ctx — changes invisible to the framework. DeclaredIO solves this:

没有 DeclaredIO: task_b 的副作用丢失Without DeclaredIO: side effects lost

def task_b(ctx):
    ctx.embedding = compute(ctx.input)
    shared_state.buffer = ctx.embedding  # ← 外部副作用!

def task_c(ctx):
    x = shared_state.buffer  # ← shortcut task_b 后: 过期数据!
    ctx.output = dense(x)

有 DeclaredIO: 框架自动恢复With DeclaredIO: framework auto-restores

PipelineTask("task_b", task_b, io=[
    DeclaredIO(
        capture=lambda: shared_state.buffer,
        restore=lambda v: setattr(shared_state, 'buffer', v),
    ),
])
# shortcut 时框架自动调用 restore(cached_buffer)
# task_c 读到正确数据
设计哲学:Task function 只写业务逻辑,副作用的生命周期完全由框架通过 DeclaredIO 驱动。新增 Task 只需声明 io=[],无需关心 shortcut 内部实现。详细 API 和完整独立示例见 API Reference Design philosophy: Task functions handle only business logic; side effect lifecycle is fully managed by the framework via DeclaredIO. New Tasks just declare io=[] — no need to know shortcut internals. See API Reference for detailed API and standalone example.
2 两阶段同步 —— Multi-Thread 重叠的关键Two-Phase Sync —— Key to Multi-Thread Overlap

为了实现 multi-thread / multi-stream 的 Task 重叠,SWPipeline 使用两阶段同步而非传统的 stream.wait_stream()。传统方式会阻塞 CPU 线程或者造成过度同步。SWPipeline 的做法是:

  1. CPU 信号同步threading.Event)—— 生产者 Task 完成后 set(),消费者 Task 在 CPU 端 wait() 直到信号到达。这保证消费者不会在生产者提交 kernel 之前就开始提交自己的 kernel。
  2. CUDA Event 同步stream.wait_event(event))—— 生产者记录一个 CUDA event 到自己的 stream,消费者的 stream 等待该 event。这保证 GPU 端的执行顺序正确——消费者的 kernel 在 GPU 上必须等生产者的 kernel 完成后才开始执行。

To achieve multi-thread / multi-stream Task overlap, SWPipeline uses two-phase synchronization instead of traditional stream.wait_stream(). The traditional approach blocks CPU threads or causes excessive synchronization. SWPipeline's approach:

  1. CPU signal sync (threading.Event) — Producer Task calls set() when done; consumer Task calls wait() on the CPU side. Ensures consumer doesn't submit kernels before the producer has submitted its own.
  2. CUDA Event sync (stream.wait_event(event)) — Producer records a CUDA event on its stream; consumer's stream waits on that event. Ensures GPU-side ordering — consumer's kernels on GPU only start after producer's kernels complete.

EXAMPLE task_a → task_b 的两阶段同步Two-Phase Sync: task_a → task_b

两阶段同步时序图Two-Phase Sync Timeline
task_a (thread_1, stream_A) → task_b (thread_2, stream_B)task_a (thread_1, stream_A) → task_b (thread_2, stream_B) Thread 1 (CPU) Stream A task_a: submit kernels record event_a signal.set() GPU Stream A: task_a kernels (GPU execution) Thread 2 (CPU) Stream B signal.wait() ← CPU 等待, 不提交任何 kernelsignal.wait() ← CPU blocks, no kernel submission Phase 1: CPU 信号Phase 1: CPU signal B.wait_event(a) task_b: submit kernels GPU Stream B: wait task_b kernels (GPU execution) Phase 2: CUDA eventPhase 2: CUDA event

为什么需要两阶段?单独用 CUDA Event 不够——如果 Thread 2 在 Thread 1 提交 kernel 之前就调用了 stream_B.wait_event(event_a),这时 event_a 还未被 record,行为未定义。单独用 CPU 信号也不够——CPU 信号只保证"Thread 1 提交完了",不保证"GPU 上 task_a 的 kernel 执行完了"。两阶段结合,既保证 CPU 提交顺序,又保证 GPU 执行顺序。

Why two phases? CUDA Event alone isn't enough — if Thread 2 calls stream_B.wait_event(event_a) before Thread 1 has recorded event_a, behavior is undefined. CPU signal alone isn't enough — it only guarantees "Thread 1 finished submitting", not "GPU finished executing task_a's kernels". The two phases together guarantee both CPU submission ordering and GPU execution ordering.

3 NCCL 提交排序器 —— 多线程分布式的安全保障NCCL Submission Sequencer —— Safety for Multi-Threaded Distributed

NCCL 集合通信(AllReduce、AllGather 等)要求所有 rank 以相同顺序提交。在单线程 pipeline 中,提交顺序天然一致。但在 multi-thread pipeline 中,不同线程上的 Task 提交顺序可能因 CPU 调度抖动而不一致——这会导致 NCCL 死锁

_SubmissionSequencer 是 SWPipeline 的内置解决方案:

  • 含 NCCL 通信的 Task 在 TaskSchedule 中标记 globally_ordered=True
  • Sequencer 维护一个全局计数器,确保跨线程的 globally_ordered Task 按声明顺序依次获取提交锁
  • 即使 Task A 和 Task B 在不同线程并发执行,它们的 NCCL kernel 仍然按 A → B 的顺序提交

NCCL collectives (AllReduce, AllGather, etc.) require all ranks to submit in the same order. In a single-threaded pipeline, submission order is naturally consistent. But in multi-threaded pipelines, Tasks on different threads may submit in different orders due to CPU scheduling jitter — causing NCCL deadlocks.

_SubmissionSequencer is SWPipeline's built-in solution:

  • Tasks with NCCL communication mark globally_ordered=True in TaskSchedule
  • Sequencer maintains a global counter, ensuring globally_ordered Tasks acquire submission lock in declaration order across threads
  • Even if Task A and Task B execute concurrently on different threads, their NCCL kernels still submit in A → B order
无 Sequencer vs 有 SequencerWithout vs With Sequencer
无 Sequencer → 可能死锁Without Sequencer → Potential Deadlock Rank 0: Thread-1 submits AllGather_A Rank 0: Thread-2 submits AllReduce_B Rank 1: Thread-2 submits AllReduce_B (faster) Rank 1: Thread-1 submits AllGather_A → Rank 0 等 AllGather_A,Rank 1 等 AllReduce_B→ Rank 0 waits AllGather_A, Rank 1 waits AllReduce_B → 永久死锁!→ Permanent DEADLOCK! 有 Sequencer → 顺序保证With Sequencer → Order Guaranteed Rank 0: seq.acquire(0) → AllGather_A → release Rank 0: seq.acquire(1) → AllReduce_B → release Rank 1: seq.acquire(0) → AllGather_A → release Rank 1: seq.acquire(1) → AllReduce_B → release → 所有 rank 顺序一致: A → B→ All ranks consistent order: A → B → 安全!→ Safe!

核心实现:execute_orderedCore Implementation: execute_ordered

# _SubmissionSequencer 核心机制:
# Condition + Lock + _next_seq 计数器

class _SubmissionSequencer:
    def __init__(self):
        self._lock = threading.Lock()
        self._cond = threading.Condition(self._lock)
        self._next_seq = 0           # 下一个允许提交的序号

    def execute_ordered(self, seq: int, fn: Callable) -> T:
        """等待轮到自己(seq == _next_seq),执行 fn,然后通知下一个。"""
        with self._cond:
            while self._next_seq != seq:
                self._cond.wait()     # 还没轮到我,等待
            result = fn()             # 轮到了,执行 NCCL kernel launch
            self._next_seq += 1       # 允许下一个
            self._cond.notify_all()   # 唤醒所有等待者
        return result

_submit_task 中的调用方式:每个 globally_ordered=True 的 Task 会被分配一个递增的 seq,即使它们在不同线程上执行,也必须按 seq 顺序提交 NCCL kernel。 How _submit_task calls it: each globally_ordered=True task is assigned an incrementing seq. Even when running on different threads, NCCL kernels are submitted in seq order.

关键特点Key Features

SWPipeline 相比传统 pipeline 的核心优势Core advantages of SWPipeline over traditional pipelines

Zero-Overhead ShortcutZero-Overhead Shortcut

Mode 3(正常执行,Task 不在 shortcut 集合中)完全零额外开销——不做 ctx diff、不缓存、不调用 DeclaredIO。Shortcut 机制只在需要时才激活(Mode 2 缓存 / Mode 1 回放),对生产训练无性能影响。

Mode 3 (normal execution, Task not in shortcut set) has zero overhead — no ctx diff, no caching, no DeclaredIO calls. Shortcut only activates when needed (Mode 2 caching / Mode 1 replay), with no performance impact on production training.

声明式 Task DAGDeclarative Task DAG

"做什么"PipelineTask)和 "怎么调度"PipelinePlan)完全分离。同一组 Task 函数可以在 serial baseline(pipeline_depth=1)和 pipelined(pipeline_depth=2+)之间自由切换,无需修改任何业务代码。

"What" (PipelineTask) and "how" (PipelinePlan) are fully separated. Same Task functions can freely switch between serial baseline (pipeline_depth=1) and pipelined (pipeline_depth=2+) without modifying any business code.

Multi-Threaded 执行Multi-Threaded Execution

每个 thread_group 对应一个 worker 线程。同组 Task 串行执行,不同组 Task 可并行提交 kernel。配合 multi-stream 实现 CPU 提交和 GPU 执行的双重重叠。

Each thread_group maps to a worker thread. Tasks in the same group execute serially; different groups submit kernels in parallel. Combined with multi-stream, achieves dual overlap of CPU submission and GPU execution.

内置 TaskProfilerBuilt-in TaskProfiler

无需外部工具即可自动测量每个 Task 的 exposed time。Profiler 利用 Shortcut 机制逐个跳过 Task,对比 baseline 计算暴露时间。结果直接驱动 PipelinePlan 优化。

Automatically measures each Task's exposed time without external tools. Profiler leverages Shortcut to skip Tasks one by one, computing exposed time vs baseline. Results directly drive PipelinePlan optimization.

Autograd 梯度嫁接Autograd Gradient Grafting

Shortcut 回放的 Tensor 是 detached 叶节点。_GraftGrad(自定义 autograd.Function)作为"梯度桥"——forward 返回 identity,backward 将梯度传给 restored Tensor,同时向上游传零梯度以触发上游 backward。

Shortcut-replayed Tensors are detached leaves. _GraftGrad (custom autograd.Function) acts as "gradient bridge" — forward returns identity, backward passes gradient to restored Tensor while sending zeros upstream to trigger their backward.

递归 Tensor 缓存Recursive Tensor Caching

_cache_val / _restore_val 递归遍历 dict、list、tuple 和自定义对象(如 JaggedTensor),对所有 torch.Tensor 执行 detach().clone()。确保缓存值与原始计算图完全隔离。

_cache_val / _restore_val recursively traverse dicts, lists, tuples, and custom objects (like JaggedTensor), performing detach().clone() on all torch.Tensors. Ensures cached values are fully isolated from the original computation graph.

完整 API 文档:所有数据结构定义、方法签名、参数说明、代码示例和内部机制详解均在独立的 API 参考页面。 Complete API docs: All data structure definitions, method signatures, parameter descriptions, code examples, and internal mechanism details are in the dedicated API reference page.

→ 查看 SWPipeline API Reference → View SWPipeline API Reference

对比 — 旧 Pipeline vs SWPipelineComparison — Old Pipeline vs SWPipeline

从硬编码到声明式From hard-coded to declarative

维度DimensionPrefetchTrainPipelineSparseDistSWPipeline
设计范式Paradigm硬编码 4 阶段Hard-coded 4 stages声明式 Task DAG + PipelinePlanDeclarative Task DAG + PipelinePlan
调度方式Scheduling固定 stream 分配Fixed stream assignment可配置 stage/stream/thread_groupConfigurable stage/stream/thread_group
Profiling无内置None built-inTaskProfiler — 自动 exposed time 测量TaskProfiler — automatic exposed time measurement
同步机制Syncwait_stream两阶段:CPU signal + CUDA eventTwo-phase: CPU signal + CUDA event
副作用管理Side EffectsN/ADeclaredIO 声明式管理declarative
多线程Threading单线程Single thread每 thread_group 一个 workerOne worker per thread_group
NCCL 安全NCCL Safety单线程天然安全Single-thread: naturally safe_SubmissionSequencer 跨线程保序cross-thread ordering
Shortcut 缓存Shortcut CacheN/Atask 级缓存 + autograd 嫁接Task-level cache + autograd grafting

Old: PrefetchTrainPipelineSparseDist

# Fixed 4-stage pipeline, hard-coded streams
def __init__(self, ...):
    self._memcpy_stream = cuda.Stream()
    self._data_dist_stream = cuda.Stream()
    self._prefetch_stream = cuda.Stream()
    # Stages hard-coded in progress()
    # No profiling, no shortcut, no DeclaredIO

New: SWPipeline

# Declarative Task DAG — what + how separated
tasks = [PipelineTask("H2D", fn), ...]
plan = PipelinePlan(
    schedule={t: TaskSchedule(stage=0, stream=s, ...)},
    deps=[(t_dist, t_h2d), ...],
    pipeline_depth=1,  # serial → change to 2 for pipelined
)
pipe = SWPipeline(plan)
# Same tasks, different plan → different schedule
# Built-in: TaskProfiler, Shortcut, DeclaredIO

源码文件索引Source File Reference

文件File关键内容Key Contents
sw_pipeline.pySWPipeline, PipelineTask, PipelinePlan, TaskSchedule, IterContext, DeclaredIO, TaskProfiler, _SubmissionSequencer
sw_train_pipeline.pySWSerialTrainPipeline (11 tasks, serial baseline adapter)
utils.pyPrefetchTrainPipelineContext, SplitPrefetchPipelinedForward, _rewrite_model
training.pypipeline_breakdown() — TaskProfiler integration entry point
目录导航Contents