DeepSpeed 学习 [2]: 从 0 开始 DeepSpeed 实战

发布时间 2024-01-01 22:20:06作者: Last_Whisper

从 0 开始 DeepSpeed 实战,Get Start

书接上文对 ZeRO 进行了详细的分析,但是 talk is cheap,今天开始我会陆续更新一些 DeepSpeed 框架的 Code-Level 的学习博客,后续有机会可能会涉及到 Megatron。

其实有很多 DeepSpeed 的介绍博客,但是我浏览过后发现大部分是在简单的阐述 DeepSpeed 官方的 Quick Start。我尽量从初学者的视角并且关于一些容易 confused 的地方进行一定程度的 follow up。

我发现很多 DeepSpeed 的介绍博客缺乏对 DDP 的前置介绍,这会导致后续的内容引出的比较突兀,我在此先进行一些 DDP 的铺垫。

DDP 初探

这里使用 Pytorch 中的 tutorials 来简单的介绍下 Distributed Data Parallel。我发现 Pytorch 中有多个不同的 DDP 教程,我均进行简单的介绍,具体可参考节末的 references。

Minimum DDP Example

先引用这个 minimum example,它使用 Linear 作为模型并且其中仅包含一次 forward 和 backward。这个例子中我只会重点介绍:(1)mp.spawn 和 (2)dist.init_process_group

我们从 main 函数开始,mp.spawn 是 PyTorch 的多进程包 torch.multiprocessing 中的一个函数,它用于启动多个进程,在某些情况下我们会看到使用 torch.distributed.launch 等工具来启动 DDP,而这里 mp.spawn 提供了一个更简洁的实现方式。其中 mp.spawn 的参数分别是:

  • fn: example,也就是每个进程中需要运行的函数。函数的调用方式为 fn(i,*args),其中 i 是进程索引,args 是传递的参数元组。
  • args:如上所述,是我们传递给 fn 的参数,在代码中,我们传递的是 world_size 也就是参与计算的进程总数。
  • nprocs: world_size 要启动的进程数。
  • join: True 表示主程序将等待所有进程完成。

对于 dist.init_process_group("gloo", rank=rank, world_size=world_size) 用于初始化每个进程的通信组。初始化进程组后,每个进程都会知道其他所有进程,并可以与它们进行通信。这对于分布式训练至关重要,因为它允许进程之间同步和交换数据,如模型参数、梯度等。

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP


def example(rank, world_size):
    # create default process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)
    # create local model
    model = nn.Linear(10, 10).to(rank)
    # construct DDP model
    ddp_model = DDP(model, device_ids=[rank])
    # define loss function and optimizer
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    # forward pass
    outputs = ddp_model(torch.randn(20, 10).to(rank))
    labels = torch.randn(20, 10).to(rank)
    # backward pass
    loss_fn(outputs, labels).backward()
    # update parameters
    optimizer.step()
    print(f"Rank {rank}: Successfully completed training")

def main():
    world_size = 2
    mp.spawn(example,
        args=(world_size,),
        nprocs=world_size,
        join=True)
    print("Finished")

if __name__=="__main__":
    # Environment variables which need to be
    # set when using c10d's default "env"
    # initialization mode.
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29500"
    main()

MULTI GPU TRAINING WITH DDP (Single to Multi)

默认大家至少是熟悉 Single GPU 下 Pytorch 的训练流程。下面我们将主要介绍从单卡到多卡 DDP 需要进行的代码层面上的改动。这个 tutorial 非常有意思,可以让我非常直观的理解到需要做哪些变化。

第一部分,需要添加的 Import

+ import torch.multiprocessing as mp
+ from torch.utils.data.distributed import DistributedSampler
+ from torch.nn.parallel import DistributedDataParallel as DDP
+ from torch.distributed import init_process_group, destroy_process_group
+ import os

第二步是,创建进程组。这一部分的主要内容基本在 Minimum DDP Example 中介绍完毕。只需要注意的是 torch.cuda.set_device。该函数是用来设置每个进程的默认 GPU,可以有效避免在 GPU-0 上内存使用率过高。

+ def ddp_setup(rank: int, world_size: int):
+   """
+   Args:
+       rank: Unique identifier of each process
+      world_size: Total number of processes
+   """
+   os.environ["MASTER_ADDR"] = "localhost"
+   os.environ["MASTER_PORT"] = "12355"
+   init_process_group(backend="nccl", rank=rank, world_size=world_size)
+   torch.cuda.set_device(rank)

第三步,创建 DDP 模型,这里的 gpu_id 就是 rank

- self.model = model.to(gpu_id)
+ self.model = DDP(model, device_ids=[gpu_id])

第四步,分布式环境下的数据处理。在 DDP 中,如果我们不使用 DistributedSampler 会导致潜在的数据重复问题(每个进程都在训整个数据集,不同进程间数据重复),训练不均衡和随机性控制等问题。

需要注意的是:DistributedSampler 是与 shuffle = True 互斥的。因为这代表着我们使用 sampler 来控制 Dataloader 的输出。

前面我们还提到了 DistributedSampler 的随机性问题。它保证每个 epoch 数据都会重新洗牌,但每次洗牌在所有进程中都是一致的。

Q: 一个容易想到的问题,“我们为什么要保证每次洗牌在所有进程中都是一致的呢?如果不一致会发生什么样的事情呢?” 理解这个随机性的问题可以帮助你加深对于 DistributedSampler 的理解。

A: 我目前的理解是:保持每次洗牌在所有进程中一致是因为,当数据没有进行 shuffle 时,DistributedSampler 会把每个数据集中的特定一部分分配给某个进程。所有每个进程所遇到的数据都是整个数据集中特定的,且不重叠的部分。 但是,如果数据进行了 shuffle,且我们无法保证洗牌的结果一致,会导致某些数据被多个进程重复的学习影响最终的训练结果。因此,我们需要保证数据的 shuffle 在多个进程中一致。

train_data = torch.utils.data.DataLoader(
    dataset=train_dataset,
    batch_size=32,
-   shuffle=True,
+   shuffle=False,
+   sampler=DistributedSampler(train_dataset),
)

我们需要在每个 epoch 开始时使用 sampler.set_epoch() ,来保证在新的 epoch 中,每个 sampler 进行相同的洗牌。

第五步,保存模型的 checkpoints。这里需要注意的是,我们需要在 self.gpu_id == 0 (或者是 rank == 0 时)进行 checkpoints 的保存,否则我们可能会保存多份相同的 checkpoints。

- ckp = self.model.state_dict()
+ ckp = self.model.module.state_dict()
...
...
- if epoch % self.save_every == 0:
+ if self.gpu_id == 0 and epoch % self.save_every == 0:
  self._save_checkpoint(epoch)

最后一步,运行分布式训练脚本

对于 DDP 程序,我们需要随时维护 rank 这一概念。而且 rank 会在 mp.spawn 中被自动分配(见 Minimum DDP example)。具体的 rankworld_size 的含义可以同样可以参考 Minimum DDP example。

- def main(device, total_epochs, save_every):
+ def main(rank, world_size, total_epochs, save_every):
+  ddp_setup(rank, world_size)
   dataset, model, optimizer = load_train_objs()
   train_data = prepare_dataloader(dataset, batch_size=32)
-  trainer = Trainer(model, train_data, optimizer, device, save_every)
+  trainer = Trainer(model, train_data, optimizer, rank, save_every)
   trainer.train(total_epochs)
+  destroy_process_group()

if __name__ == "__main__":
   import sys
   total_epochs = int(sys.argv[1])
   save_every = int(sys.argv[2])
-  device = 0      # shorthand for cuda:0
-  main(device, total_epochs, save_every)
+  world_size = torch.cuda.device_count()
+  mp.spawn(main, args=(world_size, total_epochs, save_every,), nprocs=world_size)

reference:

Install

在安装好 torch 后只需要使用如下命令就可以安装:

pip install deepspeed

这里可以使用 ds_report 打印出一个配置信息,可以在后续 debug 的时候使用,但是这里我们先省略。

初始化

DeepSpeed 模型训练是通过 DeepSpeed 引擎完成的。这个引擎可以包装任何类型为 torch.nn.module 的模型,并且有一套最基本的 API 用于训练和检查点保存模型。具体示例请参见相关教程。

model_engine, optimizer, _, _ = deepspeed.initialize(
	args=cmd_args,
    model=model,
    model_parameters=params,
)

DeepSpeed 的 initialize 函数确保了所有分布式数据并行或混合精度训练所需的必要设置都在幕后适当完成。除了封装模型之外,DeepSpeed 还可以根据传递给 deepspeed.initialize 的参数和 DeepSpeed 配置文件,构建并管理 training optimizer, data loader, and the learning rate scheduler请注意,DeepSpeed 会在每个训练 step 自动执行学习率调整。

注意,如果你是自定义 DataLoader 的话,你需要保证你设置的 batch_size 需要和 ds_config.json 中对齐。或者在 ds_config.json 中设置如下:


如果您已经设置了分布式环境,您将需要把以下代码:

torch.distributed.init_process_group(...)

替换成:

deepspeed.init_distributed()

DeepSpeed 默认使用 NCCL 后端,但您也可以选择覆盖这个默认设置,可以查阅文档

如果您在deepspeed.initialize()之后才需要设置分布式环境,就不必使用这个函数,因为DeepSpeed会在初始化时自动设置分布式环境。但不管怎样,如果已经设置了torch.distributed.init_process_group,就需要将其移除。

当我们说“已经设置了分布式环境”,这意味着在一个计算机集群或多台计算机之间,已经配置好了相互通信和数据共享的必要设置,从而使得这些计算机能够协同工作,共同执行分布式计算任务。在机器学习和深度学习领域,这通常涉及到在多个GPU或服务器上并行处理数据和模型。

关于“如果您在deepspeed.initialize()之后才需要设置分布式环境”的问题,这里有几个可能的场景:

  1. 灵活性和延迟初始化:在某些情况下,用户可能不确定是否需要分布式环境,或者可能希望根据运行时的条件来决定是否启用分布式环境。例如,他们可能有一个程序,既可以在单个机器上运行,也可以在分布式环境下运行。在这种情况下,延迟初始化(即在程序执行的后面阶段决定是否启用分布式环境)提供了更大的灵活性。

  2. 简化配置:对于一些使用者来说,配置和管理分布式环境可能比较复杂。如果DeepSpeed能够在初始化时自动处理分布式环境的设置,这可以简化用户的配置流程。用户不必在程序开始时立即决定或配置分布式环境,而是可以让DeepSpeed根据需要自动处理。

  3. 自动化环境检测和配置:DeepSpeed可能包含智能化的环境检测和自动配置功能。这意味着,当调用deepspeed.initialize()时,DeepSpeed能够根据当前的硬件和网络配置自动决定是否启用分布式计算,并相应地配置环境。

总之,这种灵活性允许开发者根据具体的应用场景和资源可用性来动态选择是否启用分布式环境,而不是在程序一开始就固定地做出决定。这在资源配置多变或者要适应不同的运行条件时非常有用。

Training

DeepSpeed 的 training 流畅和 Pytorch 其实差别不大,但实际上,DeepSpeed 可自动执行分布式数据并行训练(混合精度)所需的必要操作,并使用预定义的 learning rate scheduler

for step, batch in enumerate(data_loader):
    # forward() method
    loss = model_engine(batch)

    # runs backpropagation
    model_engine.backward(loss)

    # weight update
    model_engine.step()

具体操作如下:

  1. Gradient Averaging:在 DDP 中,backward 会确保在每个 training batch 结束后,梯度在 DP 处理中被平均。(这个很好理解,在前面的 ZeRO 博客中也有详细介绍)

  2. Loss Scaling:在 FP16/混合精度训练中,DeepSpeed 引擎会自动处理损失缩放,以避免在梯度中的精度损失。

    这里的 Loss Scaling 可能需要一定的介绍。Loss scaling 是一种在训练深度学习模型时使用较小数据类型(如FP16)时常用的技术,旨在防止在梯度计算中出现的数值下溢。由于使用了 FP16 等较小的数据类型,在返现传播中小梯度可能直接变为 0 了,从而无法有效的更新模型权重。Loss scaling 将先放大 Loss 进而放大梯度,再反向传播后再将梯度缩放回原来的大小,保持更多的精度信息。

    具体的实现可以参考 DeepSpeedDynamicLossScaler的实现。 实际上,DeepSpeed 中实现的核心思想为,保证数值稳定的前提下,尽可能使用最大的 loss scale。

  3. Learning Rate Scheduler:当使用 DeepSpeed 的学习率调度器(在 ds_config.json 文件中指定)时,DeepSpeed 会在每个训练步骤调用调度器的 step() 方法(当执行 model_engine.step() 时)。如果不使用 DeepSpeed 的学习率调度器的话,则主要分为以下两种情况:

    • 在每个 training step 中执行自定义的调度器,可以在 deepspeed.initialize 添加参数 lr_scheduler=,用于定制化用户需要的 scheduler
    • 或者我们需要按照某些特殊的间隔/周期进行学习率的调整,这时候我们就无须传给 DeepSpeed (他会在每个 training step 执行),而是自己手动的管理。

Model Checkpointing

如果我们需要保存或者加载 DeepSpeed 模型的 checkpointing,我们只需要使用两个函数:save_checkpointload_checkpoint 即可,他们用两个参数 ckpt_dirckpt_id 来唯一标识一个 checkpoint。

下面是 DeepSpeed 官方给出的 Example Code。

# load checkpoint
# client_sd : Dict
_, client_sd = model_engine.load_checkpoint(args.load_dir, args.ckpt_id)
step = client_sd['step']

# advance data loader to ckpt step
dataloader_to_step(data_loader, step + 1)

for step, batch in enumerate(data_loader):

    # forward() method
    loss = model_engine(batch)

    # runs backpropagation
    model_engine.backward(loss)

    # weight update
    model_engine.step()

    # save checkpoint
    if step % args.save_interval:
        client_sd['step'] = step
        ckpt_id = loss.item()
        model_engine.save_checkpoint(args.save_dir, ckpt_id, client_sd = client_sd)

这段代码 client_sd 的定义容易让人 confused。实际上,DeepSpeed 可以自动保存和恢复模型、优化器以及学习率调度器的状态,同时对用户隐藏这些细节。但是用户可能有一些其他的信息需要保存。为了支持这个功能,DeepSpeed 非常贴心的提供了对应的接口, save_checkpoint 接受一个状态字典 client_sd 用于保存,同时也能从 load_checkpoint 读取出来。

⚠️Important:所有进程都必须调用 load_checkpoint,而不仅仅是 rank 为 0 的进程(主进程)。 这是因为每个进程都需要保存其主权重,调度器和优化器状态。如果只有主进程调用该方法,那么在等待与其他进程同步时就会挂起。

DeepSpeed Configuration

DeepSpeed 的配置可以通过一个 JSON 文件实现,具体的文件名在程序中应该被标识为:args.deepspeed_config。下面将简单的介绍一个 Example,完整的特性可以参考 API doc

{
  "train_batch_size": 8,
  "gradient_accumulation_steps": 1,
  "optimizer": {
    "type": "Adam",
    "params": {
      "lr": 0.00015
    }
  },
  "fp16": {
    "enabled": true
  },
  "zero_optimization": true
}

单机多卡

最简单的 Example 实战

记得监测 memory usage