Pipeline Documentation

NVIDIA recsys-examples 中 Pipeline 框架的完整技术文档。涵盖初始化、Forward 拆分、Context 生命周期和运行时 Timeline。

examples/commons/pipeline/

推荐阅读顺序

1Pipeline Init

pipeline_init.html

  • _rewrite_model 如何用 torch.fx 追踪并替换 ShardedModule
  • fill_pipeline 预填充逻辑(Base / Prefetch 对比)
  • PipelinedForward vs PrefetchPipelinedForward
  • Context 对象概览与 progress() 稳态循环
  • SWSerialTrainPipeline 11-Task DAG、_ensure_initialized

2Forward Hijack

forward_hijack.html

  • ShardedModule.forward() 如何被拆分为 input_dist + compute_and_output_dist
  • ArgInfo 参数追踪机制
  • BaseForwardPipelinedForwardPrefetchPipelinedForward 继承链
  • torch.fx.Tracer 图追踪原理
  • PipelinedPostproc 包装器

3Context Lifecycle

context_lifecycle.html

  • TrainPipelineContext 类层次图(类图 + TorchRec 类型)
  • AwaitableMultistreamableShardedModule 深度解析
  • AllToAll 两阶段模型(splits → tensors)
  • 字段生产者/消费者映射表
  • Base / Prefetch Pipeline 生命周期序列图

4Runtime Timeline

runtime_timeline.html

  • 3 次 progress() 迭代展开可视化
  • 4 条 CUDA Stream(memcpy / data_dist / prefetch / default)
  • D3.js 交互式 Timeline(缩放、平移、点击详情)
  • stream 间 wait_stream 同步箭头
  • batch 数据流追踪

5SWPipeline

sw_pipeline_overview.html

  • 声明式 Task DAG:PipelineTask + PipelinePlan
  • DeclaredIO 外部副作用声明与独立示例
  • Shortcut 缓存与 _GraftGrad 梯度嫁接
  • NCCL _SubmissionSequencer 多线程保序
  • TaskProfiler 暴露时间测量
  • 旧 Pipeline vs SWPipeline 对比

6SWPipeline API Reference

sw_pipeline_api.html

  • 完整数据类:IterContext, DeclaredIO, PipelineTask, TaskSchedule, PipelinePlan
  • intra_iter_deps / inter_iter_deps 同步机制与调度算法
  • period / iter / stage 关系公式
  • SWPipeline 所有公开方法参考
  • FusedSparseDist 端到端示例

7TorchRec Pipeline Plans

torchrec_schedules.html

  • 7 种 TorchRec Pipeline 的 PipelinePlan 定义
  • 每个 Pipeline 的 intra_iter_deps, stage, stream 分配
  • CPU 提交顺序与 stage-descending 入队
  • SparseDist, FusedSparseDist, SemiSync, Prefetch 等完整解析
  • SVG 流水线可视化

交叉引用导航

源码位置
examples/commons/pipeline/train_pipeline.py — TrainPipelineSparseDist, PrefetchTrainPipelineSparseDist
examples/commons/pipeline/utils.py — TrainPipelineContext, _rewrite_model, BaseForward, PipelinedForward
examples/commons/pipeline/sw_pipeline.py — SWPipeline, PipelineTask, PipelinePlan, DeclaredIO, TaskProfiler
examples/commons/pipeline/sw_train_pipeline.py — SWSerialTrainPipeline