
[Distribution] Support DualPipeV #71427

Open · wants to merge 3 commits into base: develop

Conversation

@zhangyuqin1998 (Contributor) commented on Mar 5, 2025

PR Category

Distributed Strategy

PR Types

New features

Description

An implementation of DeepSeek-V3's DualPipeV pipeline schedule, based on https://github.com/deepseek-ai/DualPipe/blob/main/dualpipe/dualpipev.py

[Figure: DualPipeV pipeline schedule]
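For intuition, here is a small, purely illustrative sketch (not an API added by this PR) of the chunk placement the V-shaped schedule implies, following the referenced dualpipev.py:

```python
# Illustrative only: with pp_degree ranks, the model is split into
# 2 * pp_degree virtual chunks; rank i runs chunk i on the way "down"
# the pipeline and chunk 2*pp_degree-1-i on the way back "up", so the
# first rank also holds the final chunk.
def dualpipev_chunks(rank, pp_degree):
    return (rank, 2 * pp_degree - 1 - rank)

for r in range(4):
    print("rank", r, "-> chunks", dualpipev_chunks(r, 4))
# rank 0 -> chunks (0, 7)
# rank 1 -> chunks (1, 6)
# rank 2 -> chunks (2, 5)
# rank 3 -> chunks (3, 4)
```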

For the pipeline schedule

Usage: set `use_dualpipev=True` for both your `PipelineLayer` and the `strategy.hybrid_configs`.

The following code can be run with `python -m paddle.distributed.launch --gpus="0,1,2,3" demo.py`:

```python
import random
import numpy as np
import paddle
import paddle.nn as nn
from paddle.distributed import fleet
from paddle.distributed.fleet.meta_parallel import LayerDesc, PipelineLayer
from paddle.io import Dataset, DataLoader

# Constants
BATCH_NUM = 20
BATCH_SIZE = 80
MICRO_BATCH_SIZE = 2
SEQ_LEN = 1024
HIDDEN_SIZE = 2048

# Dataset class
class RandomDataset(Dataset):
    def __init__(self, num_samples):
        self.num_samples = num_samples

    def __getitem__(self, idx):
        # Random hidden states and token labels stand in for real data.
        data = np.random.random([SEQ_LEN, HIDDEN_SIZE]).astype('float32')
        label = np.random.randint(0, 10, [SEQ_LEN]).astype('int64')
        return data, label

    def __len__(self):
        return self.num_samples

class LinearPipe(nn.Linear):
    def forward(self, input):
        # The pipeline runtime may hand the input over as a list of tensors.
        if isinstance(input, list):
            input = input[0]
        return paddle.matmul(input, self.weight)

class CrossEntropyLossPipe(nn.CrossEntropyLoss):
    def forward(self, logits, label):
        # Same unwrapping for the logits coming out of the last stage.
        if isinstance(logits, list):
            logits = logits[0]
        return super().forward(logits, label)

# Pipeline description class
class SimplePipeDesc(PipelineLayer):
    def __init__(self, **kwargs):
        descs = [LayerDesc(LinearPipe, HIDDEN_SIZE, HIDDEN_SIZE) for _ in range(8)]
        super(SimplePipeDesc, self).__init__(
            layers=descs, loss_fn=CrossEntropyLossPipe(), **kwargs
        )

# Main function
if __name__ == "__main__":
    # Distributed strategy configuration
    strategy = fleet.DistributedStrategy()
    pipeline_parallel_size = 4
    strategy.hybrid_configs = {
        "pp_degree": pipeline_parallel_size
    }
    strategy.pipeline_configs = {
        "accumulate_steps": BATCH_SIZE // MICRO_BATCH_SIZE,
        "micro_batch_size": MICRO_BATCH_SIZE
    }
    # Enable the DualPipeV schedule (the PipelineLayer below must match).
    strategy.hybrid_configs["pp_configs"].use_dualpipev = True

    # Initialize fleet
    fleet.init(is_collective=True, strategy=strategy)

    # Model and optimizer setup
    model = SimplePipeDesc(
        num_stages=pipeline_parallel_size,
        topology=fleet.get_hybrid_communicate_group()._topo,
        use_dualpipev=True
    )
    model = fleet.distributed_model(model)

    scheduler = paddle.optimizer.lr.PiecewiseDecay(
        boundaries=[2], values=[0.001, 0.002], verbose=False
    )
    optimizer = paddle.optimizer.SGD(
        learning_rate=scheduler, parameters=model.parameters()
    )
    optimizer = fleet.distributed_optimizer(optimizer)

    # Data loader setup
    dataset = RandomDataset(BATCH_NUM * BATCH_SIZE)
    train_reader = DataLoader(
        dataset,
        batch_size=BATCH_SIZE,
        shuffle=True,
        drop_last=True,
        num_workers=2
    )

    # Training loop
    for i, (data, label) in enumerate(train_reader):  # a DataLoader is iterated directly, not called
        if i >= 5:
            break
        loss = model.train_batch([data, label], optimizer, scheduler)
        print("pp_loss:", loss.numpy())
```

For the SplitBW Linear

SplitBW Linear supports the zero-bubble pipeline proposed in https://arxiv.org/abs/2401.10241: it splits the backward pass into an input-gradient part and a weight-gradient part, so the weight-gradient computation can be deferred to fill pipeline bubbles.

Use `paddle.distributed.fleet.meta_parallel.zero_bubble_utils.SplitBWLinear` to replace the standard `nn.Linear`. Note that `SplitBWLinear` can only be used with DualPipeV; otherwise, users need to manage the `WeightGradStore` themselves to ensure that all deferred weight gradients are actually computed. A drop-in usage sketch follows the figure below.
[Figure: SplitBW Linear illustration]
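A minimal usage sketch, reusing the demo above (it assumes, as this PR states, that `SplitBWLinear` is a drop-in, constructor-compatible replacement for `nn.Linear`; the `SplitBWLinearPipe` and `SplitBWPipeDesc` names are illustrative):

```python
from paddle.distributed.fleet.meta_parallel.zero_bubble_utils import SplitBWLinear

# Same as LinearPipe from the demo above, but built on SplitBWLinear so the
# weight-gradient half of the backward pass can be deferred by DualPipeV.
class SplitBWLinearPipe(SplitBWLinear):
    def forward(self, input):
        if isinstance(input, list):
            input = input[0]
        return paddle.matmul(input, self.weight)

class SplitBWPipeDesc(PipelineLayer):
    def __init__(self, **kwargs):
        descs = [LayerDesc(SplitBWLinearPipe, HIDDEN_SIZE, HIDDEN_SIZE) for _ in range(8)]
        super().__init__(layers=descs, loss_fn=CrossEntropyLossPipe(), **kwargs)
```

Under DualPipeV the schedule itself decides when the deferred weight gradients run, which is why the PR notes that `SplitBWLinear` should not be used with other schedules.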

Pcard-76459

@paddle-bot (bot) commented on Mar 5, 2025

Your PR has been submitted. Thanks for your contribution!
Please wait for the CI results first. See the Paddle CI Manual for details.
