Design Goals · Core Mechanisms · Key Features
`examples/commons/pipeline/sw_pipeline.py`

Each training iteration of a recommendation model contains a dozen steps — H2D transfer, embedding input dist, embedding prefetch, embedding forward, dense forward, loss, backward, gradient allreduce, optimizer step, etc. Existing pipelines (such as TorchRec's PrefetchTrainPipelineSparseDist) hard-code these steps into fixed stages. SWPipeline addresses two progressive problems:
The first step in optimizing a pipeline is knowing each Task's exposed time — the portion of its wall-clock time that cannot be overlapped by other Tasks. Only the exposed part is the true bottleneck.

Example: EmbeddingForward takes 5ms, but 3ms of it can overlap with the previous iteration's H2D → exposed = 2ms.

Core formula:

```
exposed(T) = baseline_serial − serial_with_T_shortcutted
```

Replace Task T with an "instant cached replay" and measure the reduction in total iteration time — that reduction is T's exposed time.
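The formula can be sketched end to end with toy timed tasks. This is a minimal illustration, not TaskProfiler's real API: `measure_exposed` and the millisecond-scale tasks below are hypothetical, and "shortcut" is modeled as simply skipping the Task (a real replay also restores `ctx`).

```python
import time

def measure_exposed(tasks, shortcut_name, iters=3):
    """Exposed time of one Task, averaged over a few iterations."""
    def run(skip=None):
        start = time.perf_counter()
        for _ in range(iters):
            for name, fn in tasks:
                if name != skip:          # a shortcutted Task replays from cache: ~0ms
                    fn()
        return (time.perf_counter() - start) / iters

    baseline = run()                       # baseline_serial
    shortcutted = run(skip=shortcut_name)  # serial_with_T_shortcutted
    return baseline - shortcutted          # exposed(T)

tasks = [
    ("task_a", lambda: time.sleep(0.005)),  # preprocessing
    ("task_b", lambda: time.sleep(0.020)),  # embedding compute
    ("task_c", lambda: time.sleep(0.005)),  # dense compute
]
exposed_b = measure_exposed(tasks, "task_b")  # ≈ 0.020s: fully exposed in serial mode
```

In a serial baseline the exposed time equals the Task's full duration; it shrinks once a PipelinePlan overlaps the Task with others.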
Once we have the exposed time data, the next step is to eliminate it — overlap the Tasks using multi-thread, multi-stream execution so the GPU always has work to do. With a PipelinePlan, you switch from the serial baseline to pipelined mode.

SWPipeline decomposes a training iteration into PipelineTasks and describes their scheduling attributes (stage, stream, thread_group) and dependencies with a PipelinePlan. On top of this, three key mechanisms make exposed time measurement and multi-thread/multi-stream overlap possible:
Shortcut's core idea: replace a Task with an instant cached replay, thereby quantifying that Task's contribution to total iteration time (its exposed time).

The implementation has two layers:

- **IterContext diff**: compares `ctx` attributes before and after Task execution and caches added or modified values (recursively applying `detach().clone()` to all Tensors). Replay restores these attributes directly.
- **DeclaredIO**: beyond `ctx`, a Task may have "invisible" external side effects (such as writing `pipeline_ctx` or updating pipelined module internal state). DeclaredIO lets the Task declare capture/restore callbacks for these side effects, which the framework manages automatically.

Suppose three Tasks compose one iteration:
```python
# One iteration = 3 Tasks executed serially
task_a(ctx)  # data preprocessing — 2ms
task_b(ctx)  # embedding compute  — 8ms  ← we want its exposed time
task_c(ctx)  # dense compute      — 5ms
# total: baseline = 15ms

# TaskProfiler enables shortcut for task_b:
task_a(ctx)           # runs normally — 2ms
task_b(ctx) → CACHED  # skipped! ctx restored from cache — ≈0ms
task_c(ctx)           # runs normally — 5ms
# shortcutted total = 7ms
# exposed(task_b) = 15ms − 7ms = 8ms (fully exposed in serial mode)
```
Key property: Shortcut doesn't simply "skip" a Task — it faithfully replays all of the Task's side effects on ctx and maintains autograd graph connectivity via _GraftGrad so downstream backward still propagates gradients; DeclaredIO additionally ensures that the Task's modifications to shared state outside ctx are restored.
IterContext diff automatically handles data a Task writes to ctx. But a Task may also modify shared state outside ctx — changes invisible to the framework. DeclaredIO solves exactly this:
```python
def task_b(ctx):
    ctx.embedding = compute(ctx.input)
    shared_state.buffer = ctx.embedding  # ← external side effect!

def task_c(ctx):
    x = shared_state.buffer  # ← stale data after shortcutting task_b!
    ctx.output = dense(x)
```
```python
PipelineTask("task_b", task_b, io=[
    DeclaredIO(
        capture=lambda: shared_state.buffer,
        restore=lambda v: setattr(shared_state, 'buffer', v),
    ),
])
# On shortcut, the framework automatically calls restore(cached_buffer),
# so task_c reads the correct data
```
Design philosophy: Task functions contain only business logic; the side-effect lifecycle is fully managed by the framework, driven by DeclaredIO. A new Task just declares io=[] and never needs to know shortcut internals. See the API Reference for the detailed API and a complete standalone example.
To achieve multi-thread / multi-stream Task overlap, SWPipeline uses two-phase synchronization instead of the traditional stream.wait_stream(), which either blocks CPU threads or over-synchronizes. SWPipeline's approach:

- **Phase 1, CPU signal** (`threading.Event`): the producer Task calls `set()` when done; the consumer Task calls `wait()` on the CPU side until the signal arrives. This ensures the consumer does not start submitting its kernels before the producer has submitted its own.
- **Phase 2, CUDA event** (`stream.wait_event(event)`): the producer records a CUDA event on its stream; the consumer's stream waits on that event. This ensures GPU-side ordering: the consumer's kernels start on the GPU only after the producer's kernels complete.

Why two phases? A CUDA event alone isn't enough — if Thread 2 calls stream_B.wait_event(event_a) before Thread 1 has recorded event_a, the behavior is undefined. A CPU signal alone isn't enough either — it only guarantees "Thread 1 finished submitting", not "the GPU finished executing task_a's kernels". Together, the two phases guarantee both CPU submission order and GPU execution order.
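The handshake can be sketched with plain threads. This is an illustrative sketch, not SWPipeline code: the CUDA calls are replaced by log entries so the ordering is observable without a GPU, and the names (`producer`, `event_a`, `stream_B`) are hypothetical.

```python
import threading

log = []
submitted = threading.Event()  # phase 1: CPU-side signal

def producer():
    log.append("producer: launch kernels")    # would run on stream_A
    log.append("producer: event_a.record()")  # CUDA event recorded on stream_A
    submitted.set()                           # phase 1: tell the CPU side we are done

def consumer():
    submitted.wait()  # phase 1: block until producer has submitted
    log.append("consumer: stream_B.wait_event(event_a)")  # phase 2: GPU-side ordering
    log.append("consumer: launch kernels")    # safe: event_a is already recorded

t_consumer = threading.Thread(target=consumer)
t_producer = threading.Thread(target=producer)
t_consumer.start()  # consumer starts first, but must wait
t_producer.start()
t_producer.join()
t_consumer.join()
print(log[0])  # → "producer: launch kernels"
```

Even though the consumer thread starts first, the CPU signal forces the producer's submissions into the log first; the `wait_event` entry shows where the GPU-side dependency would be installed.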
NCCL collectives (AllReduce, AllGather, etc.) require all ranks to submit them in the same order. In a single-threaded pipeline, submission order is naturally consistent. But in a multi-threaded pipeline, Tasks on different threads may submit in different orders due to CPU scheduling jitter — which can deadlock NCCL.

_SubmissionSequencer is SWPipeline's built-in solution:

- Mark the Task with `globally_ordered=True` in its `TaskSchedule`.
- `globally_ordered` Tasks acquire the submission lock in declaration order, across threads.

Core implementation: `execute_ordered`

```python
import threading
from typing import Callable, TypeVar

T = TypeVar("T")

# _SubmissionSequencer core mechanism:
# Condition + Lock + a _next_seq counter
class _SubmissionSequencer:
    def __init__(self):
        self._lock = threading.Lock()
        self._cond = threading.Condition(self._lock)
        self._next_seq = 0  # next sequence number allowed to submit

    def execute_ordered(self, seq: int, fn: Callable[[], T]) -> T:
        """Wait for our turn (seq == _next_seq), run fn, then notify the next waiter."""
        with self._cond:
            while self._next_seq != seq:
                self._cond.wait()    # not our turn yet; wait
            result = fn()            # our turn: launch the NCCL kernel
            self._next_seq += 1      # allow the next sequence number
            self._cond.notify_all()  # wake all waiters
            return result
```
How _submit_task uses it: each globally_ordered=True Task is assigned a monotonically increasing seq. Even when Tasks run on different threads, their NCCL kernels must be submitted in seq order.
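A minimal usage sketch of the sequencer shown above: two threads reach the submission point out of order, yet the work still executes in seq order. The driver (`worker`, the simulated jitter) is hypothetical; only the `_SubmissionSequencer` class mirrors the excerpt above.

```python
import threading
import time
from typing import Callable, TypeVar

T = TypeVar("T")

class _SubmissionSequencer:
    def __init__(self):
        self._lock = threading.Lock()
        self._cond = threading.Condition(self._lock)
        self._next_seq = 0

    def execute_ordered(self, seq: int, fn: Callable[[], T]) -> T:
        with self._cond:
            while self._next_seq != seq:
                self._cond.wait()
            result = fn()
            self._next_seq += 1
            self._cond.notify_all()
            return result

sequencer = _SubmissionSequencer()
order = []

def worker(my_seq, delay):
    time.sleep(delay)  # simulate CPU scheduling jitter
    # In SWPipeline, fn would launch the NCCL kernel; here it just records the seq.
    sequencer.execute_ordered(my_seq, lambda: order.append(my_seq))

# seq 0 is artificially delayed, yet still submits first
threads = [
    threading.Thread(target=worker, args=(0, 0.05)),
    threading.Thread(target=worker, args=(1, 0.0)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(order)  # → [0, 1]
```

The thread holding seq 1 arrives first but blocks in the `while` loop until seq 0 has run, which is exactly the cross-thread ordering guarantee NCCL needs.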
Mode 3 (normal execution, Task not in the shortcut set) has zero extra overhead — no ctx diff, no caching, no DeclaredIO calls. The Shortcut machinery activates only when needed (Mode 2 caching / Mode 1 replay), so it has no performance impact on production training.
"做什么"(PipelineTask)和 "怎么调度"(PipelinePlan)完全分离。同一组 Task 函数可以在 serial baseline(pipeline_depth=1)和 pipelined(pipeline_depth=2+)之间自由切换,无需修改任何业务代码。
"What" (PipelineTask) and "how" (PipelinePlan) are fully separated. Same Task functions can freely switch between serial baseline (pipeline_depth=1) and pipelined (pipeline_depth=2+) without modifying any business code.
Each thread_group maps to one worker thread. Tasks in the same group execute serially; Tasks in different groups can submit kernels in parallel. Combined with multi-stream execution, this overlaps both CPU submission and GPU execution.
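The thread_group model can be sketched as one queue plus one worker thread per group. `GroupWorkers` and every name below are hypothetical stand-ins, not SWPipeline internals; real workers would also pin CUDA streams, which is omitted here.

```python
import queue
import threading

class GroupWorkers:
    """One worker thread per thread_group; same-group tasks run serially in FIFO order."""
    def __init__(self, group_names):
        self.queues = {g: queue.Queue() for g in group_names}
        self.threads = [
            threading.Thread(target=self._loop, args=(q,)) for q in self.queues.values()
        ]
        for t in self.threads:
            t.start()

    def _loop(self, q):
        while True:
            fn = q.get()
            if fn is None:  # shutdown sentinel
                return
            fn()            # submit kernels for this task

    def submit(self, group, fn):
        self.queues[group].put(fn)

    def shutdown(self):
        for q in self.queues.values():
            q.put(None)
        for t in self.threads:
            t.join()

workers = GroupWorkers(["sparse", "dense"])
log = []
for i in range(3):
    workers.submit("sparse", lambda i=i: log.append(("sparse", i)))
    workers.submit("dense", lambda i=i: log.append(("dense", i)))
workers.shutdown()
print([n for g, n in log if g == "sparse"])  # → [0, 1, 2]
```

Within each group the submission order is preserved (the FIFO queue guarantees it), while the two groups interleave freely, which is the property the two-phase synchronization then builds on.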
Each Task's exposed time is measured automatically, with no external tools. The profiler uses the Shortcut mechanism to skip Tasks one by one and computes the exposed time against the baseline. The results directly drive PipelinePlan optimization.
Tensors replayed by Shortcut are detached leaves. _GraftGrad (a custom autograd.Function) acts as a "gradient bridge": its forward returns the identity, and its backward passes the gradient to the restored Tensor while sending zero gradients upstream so the upstream backward still fires.
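Assuming PyTorch, the bridge described above can be sketched as a custom autograd.Function. This is a hypothetical reconstruction of the mechanism, not the actual `_GraftGrad` source: forward hands downstream the replayed value, and backward routes the incoming gradient into the restored leaf while returning zeros for the upstream input.

```python
import torch

class GraftGrad(torch.autograd.Function):
    """Illustrative gradient bridge (assumed shape; not SWPipeline's _GraftGrad)."""

    @staticmethod
    def forward(ctx, upstream, restored):
        ctx.save_for_backward(upstream, restored)
        return restored.detach()  # downstream sees the replayed (cached) value

    @staticmethod
    def backward(ctx, grad_out):
        upstream, restored = ctx.saved_tensors
        if restored.requires_grad:
            restored.backward(grad_out)          # route gradient into the restored leaf
        return torch.zeros_like(upstream), None  # zeros upstream keep its backward firing

x = torch.ones(2, requires_grad=True)                  # stands in for the upstream graph
restored = torch.full((2,), 3.0, requires_grad=True)   # cached leaf from replay
y = GraftGrad.apply(x * 2, restored)
y.sum().backward()
# restored.grad is ones(2); x.grad is zeros(2), i.e. upstream backward ran with zero grad
```

The zero gradient is deliberate: it carries no signal, but it keeps the autograd graph connected so hooks and accumulation on the upstream side still trigger.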
_cache_val / _restore_val recursively traverse dicts, lists, tuples, and custom objects (such as JaggedTensor), applying detach().clone() to every torch.Tensor. This guarantees cached values are fully isolated from the original computation graph.
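The recursive traversal can be sketched framework-agnostically. To keep the sketch runnable without PyTorch, anything exposing `.detach()`/`.clone()` is treated as a Tensor; `FakeTensor` below is a hypothetical stand-in for `torch.Tensor`, and custom-object handling (e.g. JaggedTensor fields) is omitted.

```python
def _cache_val(v):
    """Recursively snapshot a value, isolating tensor-like leaves (sketch)."""
    if hasattr(v, "detach"):                  # isinstance(v, torch.Tensor) in real code
        return v.detach().clone()
    if isinstance(v, dict):
        return {k: _cache_val(x) for k, x in v.items()}
    if isinstance(v, (list, tuple)):
        return type(v)(_cache_val(x) for x in v)
    return v                                  # plain Python values cached as-is

class FakeTensor:
    """Minimal stand-in for torch.Tensor to demonstrate the traversal."""
    def __init__(self, data, attached=True):
        self.data, self.attached = data, attached
    def detach(self):
        return FakeTensor(self.data, attached=False)  # cut from the "graph"
    def clone(self):
        return FakeTensor(list(self.data), self.attached)  # independent storage

t = FakeTensor([1, 2])
cached = _cache_val({"emb": [t, "meta"], "step": 7})
print(cached["emb"][0].attached)  # → False: isolated from the original "graph"
```

Mutating the cached copy leaves the original untouched, which is exactly the isolation property the replay depends on.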
| Dimension | PrefetchTrainPipelineSparseDist | SWPipeline |
|---|---|---|
| Paradigm | Hard-coded 4 stages | Declarative Task DAG + PipelinePlan |
| Scheduling | Fixed stream assignment | Configurable stage/stream/thread_group |
| Profiling | None built-in | TaskProfiler — automatic exposed time measurement |
| Sync | wait_stream | Two-phase: CPU signal + CUDA event |
| Side Effects | N/A | DeclaredIO (declarative) |
| Threading | Single thread | One worker per thread_group |
| NCCL Safety | Single-thread: naturally safe | _SubmissionSequencer cross-thread ordering |
| Shortcut Cache | N/A | Task-level cache + autograd grafting |
```python
# Fixed 4-stage pipeline, hard-coded streams
def __init__(self, ...):
    self._memcpy_stream = cuda.Stream()
    self._data_dist_stream = cuda.Stream()
    self._prefetch_stream = cuda.Stream()
# Stages hard-coded in progress()
# No profiling, no shortcut, no DeclaredIO
```
```python
# Declarative Task DAG — what + how separated
tasks = [PipelineTask("H2D", fn), ...]
plan = PipelinePlan(
    schedule={t: TaskSchedule(stage=0, stream=s, ...)},
    deps=[(t_dist, t_h2d), ...],
    pipeline_depth=1,  # serial → change to 2 for pipelined
)
pipe = SWPipeline(plan)
# Same tasks, different plan → different schedule
# Built-in: TaskProfiler, Shortcut, DeclaredIO
```
| File | Key Contents |
|---|---|
| `sw_pipeline.py` | `SWPipeline`, `PipelineTask`, `PipelinePlan`, `TaskSchedule`, `IterContext`, `DeclaredIO`, `TaskProfiler`, `_SubmissionSequencer` |
| `sw_train_pipeline.py` | `SWSerialTrainPipeline` (11 tasks, serial baseline adapter) |
| `utils.py` | `PrefetchTrainPipelineContext`, `SplitPrefetchPipelinedForward`, `_rewrite_model` |
| `training.py` | `pipeline_breakdown()` — TaskProfiler integration entry point |