[Distributed][CI] Rework continuous TestCase by kwen2501 · Pull Request #153653 · pytorch/pytorch · GitHub
[Distributed][CI] Rework continuous TestCase #153653

Closed · wants to merge 8 commits
16 changes: 16 additions & 0 deletions test/distributed/_test_template.py
@@ -0,0 +1,16 @@
# Owner(s): ["oncall: distributed"]

from torch.testing._internal.common_distributed import MultiProcContinousTest
from torch.testing._internal.common_utils import run_tests


class TestTemplate(MultiProcContinousTest):
def testABC(self):
print(f"rank {self.rank} of {self.world_size} testing ABC")

def testDEF(self):
print(f"rank {self.rank} of {self.world_size} testing DEF")


if __name__ == "__main__":
run_tests()
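
For context (not part of the diff): with the reworked base class, a derived test needs only the plain run_tests() entry point shown above, and per-rank state such as the device can live in a property, mirroring the ScheduleTest change later in this PR. A minimal sketch, assuming a CUDA backend with one GPU per rank; the class name and the all-reduce check are illustrative, not from this PR:

# Minimal sketch (assumption: CUDA backend, one GPU per rank).
import torch
import torch.distributed as dist

from torch.testing._internal.common_distributed import MultiProcContinousTest
from torch.testing._internal.common_utils import run_tests


class MyCommTest(MultiProcContinousTest):
    @property
    def device(self) -> torch.device:
        # One device per rank, the same pattern ScheduleTest adopts below.
        return torch.device("cuda", self.rank)

    def test_all_reduce(self):
        t = torch.ones(1, device=self.device) * self.rank
        dist.all_reduce(t)
        # Sum of ranks 0..world_size-1
        self.assertEqual(t.item(), self.world_size * (self.world_size - 1) / 2)


if __name__ == "__main__":
    run_tests()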
29 changes: 25 additions & 4 deletions test/distributed/pipelining/model_registry.py
@@ -7,13 +7,16 @@


class ExampleCode(torch.nn.Module):
def __init__(self, d_hid):
def __init__(self, d_hid, splits=2):
assert splits <= 4
super().__init__()
self.splits = splits
self.mm_param0 = torch.nn.Parameter(torch.randn(d_hid, d_hid))
self.mm_param1 = torch.nn.Parameter(torch.randn(d_hid, d_hid))
self.cval = torch.nn.Buffer(torch.randn((d_hid,), requires_grad=False))
self.lin0 = torch.nn.Linear(d_hid, d_hid)
self.lin1 = torch.nn.Linear(d_hid, d_hid)
self.lin2 = torch.nn.Linear(d_hid, d_hid)

def forward(self, x):
x = torch.mm(x, self.mm_param0)
@@ -24,21 +27,31 @@ def forward(self, x):
pipe_split()
x = torch.relu(x) + a_constant
x = torch.mm(x, self.mm_param1)
x = self.lin1(x)
x = torch.relu(x)
if self.splits > 2:
pipe_split()
x = self.lin1(x)
x = torch.relu(x)
if self.splits > 3:
pipe_split()
x = self.lin2(x)
x = torch.relu(x)
return x


class ModelWithKwargs(torch.nn.Module):
DEFAULT_DHID = 512
DEFAULT_BATCH_SIZE = 256

def __init__(self, d_hid: int = DEFAULT_DHID):
def __init__(self, d_hid: int = DEFAULT_DHID, splits=2):
assert splits <= 4
super().__init__()
self.splits = splits
self.mm_param0 = torch.nn.Parameter(torch.randn(d_hid, d_hid))
self.mm_param1 = torch.nn.Parameter(torch.randn(d_hid, d_hid))
self.lin0 = torch.nn.Linear(d_hid, d_hid)
self.lin1 = torch.nn.Linear(d_hid, d_hid)
self.lin2 = torch.nn.Linear(d_hid, d_hid)
self.lin3 = torch.nn.Linear(d_hid, d_hid)

def forward(self, x, y=torch.zeros(DEFAULT_BATCH_SIZE, DEFAULT_DHID)):
x = torch.mm(x, self.mm_param0)
@@ -49,6 +62,14 @@ def forward(self, x, y=torch.zeros(DEFAULT_BATCH_SIZE, DEFAULT_DHID)):
x = torch.mm(x, self.mm_param1)
x = self.lin1(x)
x = torch.relu(x)
if self.splits > 2:
pipe_split()
x = self.lin2(x)
x = torch.relu(x)
if self.splits > 3:
pipe_split()
x = self.lin3(x)
x = torch.relu(x)
return x


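For context on the new splits argument (not part of the diff): each increment past 2 enables one more pipe_split() marker in forward(), so tracing the module yields that many pipeline stages. A hedged sketch of the intended usage, mirroring the pipeline() calls in test_schedule_multiproc.py; the num_stages check assumes the default pipe_split-driven split policy, and the model_registry import assumes the snippet runs from test/distributed/pipelining:

# Sketch only: `splits` controls how many pipe_split()-delimited stages the
# traced pipeline has. Values and the assertion are illustrative.
import torch
from torch.distributed.pipelining import pipeline

from model_registry import ExampleCode

d_hid, batch_size, n_microbatches = 512, 256, 8
mod = ExampleCode(d_hid, splits=4)
x = torch.randn(batch_size, d_hid)

# Trace with a single microbatch, as the schedule tests in this PR do.
pipe = pipeline(mod, mb_args=(x.chunk(n_microbatches)[0],))
assert pipe.num_stages == 4  # one stage per pipe_split-delimited segment
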
89 changes: 36 additions & 53 deletions test/distributed/pipelining/test_schedule_multiproc.py
@@ -2,8 +2,6 @@
# Owner(s): ["oncall: distributed"]
import copy
import logging
import os
import sys
import tempfile

from model_registry import ModelWithKwargs, MultiMLP, MultiMLPWithDw
@@ -37,6 +35,7 @@
check_leaked_tensors,
instantiate_parametrized_tests,
parametrize,
run_tests,
skip_but_pass_in_sandcastle_if,
)

@@ -48,22 +47,18 @@

torch.manual_seed(0)

device_type = "cuda"


class ScheduleTest(MultiProcContinousTest):
@classmethod
def backend_str(cls) -> str:
# Testing with NCCL backend
return "nccl"

@classmethod
def setUpClass(cls):
"""
Class-scope test fixture. Run once for entire test class, before any test starts.
Set up the device.
"""
super().setUpClass()
dev_id = cls.rank % torch.cuda.device_count()
cls.device = torch.device(f"cuda:{dev_id}")
@property
def device(self) -> torch.device:
return torch.device(device_type, self.rank)

@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
@@ -77,7 +72,7 @@ def test_forward_only(self, ScheduleClass):
x = torch.randn(batch_size, d_hid, device=self.device)
x_clone = x.clone()

num_microbatches = 4
num_microbatches = 2 * self.world_size
x_mb = x.chunk(num_microbatches)[0]

# Create a pipeline
@@ -159,6 +154,12 @@ def test_multi_iter(self, ScheduleClass):
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
@parametrize("ScheduleClass", [ScheduleGPipe, Schedule1F1B])
def test_kwargs_with_tracer(self, ScheduleClass):
# Model has two stages only, thus limiting group size to 2
group_size = 2
group = dist.new_group(list(range(group_size)))
if self.rank >= group_size:
return

mod = ModelWithKwargs(d_hid)
mod.to(self.device)

@@ -180,6 +181,7 @@ def test_kwargs_with_tracer(self, ScheduleClass):
stage = pipe.build_stage(
self.rank,
self.device,
group=group,
)

# Attach to a schedule
@@ -188,16 +190,16 @@
# Run
if self.rank == 0:
schedule.step(x, y=y)
elif self.rank == self.world_size - 1:
elif self.rank == group_size - 1:
losses = []
out = schedule.step(target=target, losses=losses)
else:
schedule.step()

dist.barrier()
# dist.barrier()

# Last rank checks result
if self.rank == self.world_size - 1:
if self.rank == group_size - 1:
ref_out = mod(x, y=y)
ref_loss = loss_fn(ref_out, target)
pipe_loss = sum(losses)
@@ -207,9 +209,8 @@ def test_kwargs_with_tracer(self, ScheduleClass):
@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
@parametrize("ScheduleClass", [ScheduleGPipe, Schedule1F1B])
@parametrize("ModelClass", [MultiMLP])
def test_grad_with_tracer(self, ScheduleClass, ModelClass):
mod = ModelClass(d_hid)
def test_grad_with_tracer(self, ScheduleClass):
mod = MultiMLP(d_hid, n_layers=self.world_size)
mod.to(self.device)

ref_mod = copy.deepcopy(mod)
Expand All @@ -229,7 +230,7 @@ def test_grad_with_tracer(self, ScheduleClass, ModelClass):
ref_loss.backward()

# Create a pipeline
chunks = 4
chunks = 2 * self.world_size
x_mb = x.chunk(chunks)[0]
split_spec = mod.split_spec if hasattr(mod, "split_spec") else None
pipe = pipeline(
@@ -307,7 +308,7 @@ def test_grad_with_manual(self, ScheduleClass, shape_inference):
# Get a submodule, e.g. `layers.0` or `layers.1`
submod_name = f"layers.{self.rank}"
stage_module = full_mod.get_submodule(submod_name)
chunks = 4
chunks = 2 * self.world_size

if shape_inference:
input_args = None
@@ -410,7 +411,7 @@ def test_grad_with_manual_interleaved(self, ScheduleClass, use_new_runtime):
num_microbatches = (
ScheduleClass.num_microbatches
if hasattr(ScheduleClass, "num_microbatches")
else 8
else 2 * self.world_size
)
stages = [
PipelineStage(
@@ -518,13 +519,15 @@ def test_grad_with_manual_interleaved(self, ScheduleClass, use_new_runtime):
raise

@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
@skip_but_pass_in_sandcastle_if(
not torch.cuda.device_count() == 2, "This test requires exactly 2 GPUs"
)
@parametrize("ScheduleClass", [ScheduleWithW, ScheduleInterleavedZeroBubble])
def test_schedule_with_native_zero_bubble(self, ScheduleClass):
print(ScheduleClass)
if ScheduleClass is ScheduleInterleavedZeroBubble:
n_stages = 4
num_microbatches = 8
num_microbatches = 2 * n_stages
rank_stages = {
0: [0, 2],
1: [1, 3],
@@ -612,7 +615,9 @@ def test_schedule_with_native_zero_bubble(self, ScheduleClass):
raise

@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
@skip_but_pass_in_sandcastle_if(
not torch.cuda.device_count() == 2, "This test requires exactly 2 GPUs"
)
@parametrize(
"ScheduleClass",
[
@@ -717,7 +722,9 @@ def test_pipeline_schedule_runtime_custom_sched(self, ScheduleClass):
raise

@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
@skip_but_pass_in_sandcastle_if(
not torch.cuda.device_count() == 2, "This test requires exactly 2 GPUs"
)
@parametrize(
"schedule_class", [ScheduleVShaped, ScheduleUnbalanced, ScheduleZBVZeroBubble]
)
@@ -822,7 +829,9 @@ def test_non_symmetric_stage_ids(self, schedule_class, use_new_runtime):
raise

@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
@skip_but_pass_in_sandcastle_if(
not torch.cuda.device_count() == 2, "This test requires exactly 2 GPUs"
)
@parametrize("ScheduleClass", [ScheduleInterleavedZeroBubble])
def test_schedule_with_weight_update_mlp_e2e(self, ScheduleClass):
stages_per_rank = 2
@@ -942,30 +951,4 @@ def dw_runner():


if __name__ == "__main__":
# Check if GPU and NCCL are available
if not (
dist.is_available()
and dist.is_nccl_available()
and torch.cuda.device_count() > 1
):
print(
"c10d NCCL not available or not enough GPUs, skipping tests",
file=sys.stderr,
)
sys.exit(0)

rank = int(os.getenv("RANK", -1))
world_size = int(os.getenv("WORLD_SIZE", 2))

if rank != -1:
# Launched with torchrun or other multi-proc launchers. Directly run the test.
ScheduleTest.run_rank(rank, world_size)
else:
# Launched as a single process. Spawn subprocess to run the tests.
# Also need a rendezvous file for `init_process_group` purpose.
rdvz_file = tempfile.NamedTemporaryFile(delete=False).name
torch.multiprocessing.spawn(
ScheduleTest.run_rank,
nprocs=world_size,
args=(world_size, rdvz_file),
)
run_tests()
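
Two cross-cutting changes in this file are worth calling out (my reading, not stated in the diff). First, the hand-rolled __main__ launcher that checked NCCL availability, parsed RANK/WORLD_SIZE, and fell back to torch.multiprocessing.spawn is replaced by run_tests(), with the reworked MultiProcContinousTest presumably owning process management; whether a torchrun launch still works exactly as before is an extrapolation from the removed comments, not something this diff documents. Second, the fixed microbatch counts (4 and 8) become 2 * self.world_size or 2 * n_stages, so the tests no longer bake in a 2-GPU assumption. A small illustrative check of the invariant that scaling choice preserves (not from the PR):

def scales_with_world_size(world_size: int, stages_per_rank: int) -> bool:
    # Enough microbatches to cover every stage, and an exact multiple of the
    # rank count (which the interleaved schedules generally want).
    num_stages = world_size * stages_per_rank
    num_microbatches = 2 * world_size
    return num_microbatches >= num_stages and num_microbatches % world_size == 0


# Holds for the one- and two-stages-per-rank cases exercised in this file.
assert all(scales_with_world_size(ws, spr) for ws in (2, 4, 8) for spr in (1, 2))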