8000 [DTensor] Scalar multiplication after reduction doesn't update result without calling .full_tensor() before · Issue #153603 · pytorch/pytorch · GitHub
[go: up one dir, main page]

Skip to content

[DTensor] Scalar multiplication after reduction doesn't update result without calling .full_tensor() before #153603

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
Frankstein73 opened this issue May 15, 2025 · 2 comments
Labels
oncall: distributed Add this issue/PR to distributed oncall triage queue

Comments

@Frankstein73
Copy link
Frankstein73 commented May 15, 2025

🐛 Describe the bug

When a DTensor undergoes a reduction (e.g., using .mean() or .sum()), and the resulting DTensor is then multiplied by a scalar multiplier, the calculation yields an incorrect result if .full_tensor() is not called on the reduced DTensor before the multiplication. It appears that the result of the first such multiplication is cached and incorrectly reused in subsequent multiplications with different scalars. This issue does not occur if the multiplier is a tensor instead of a scalar.

import torch
import torch.distributed as dist
from torch.distributed.tensor import (
    DTensor,
    Replicate,
    Shard,
    init_device_mesh,
)
import os

# When the multiplier is a scalar, the result is incorrect and cached if .full_tensor() is not called before the multiplication.
def test_dtensor_reduction_scalar_mul() -> None:
    device_mesh = init_device_mesh("cuda", [int(os.environ["WORLD_SIZE"])])
    for i in range(1, 4):
        x = torch.tensor([1.0], dtype=torch.float32, device="cuda")
        dx = DTensor.from_local(x, device_mesh, [Shard(0)])
        dy = dx.mean() # or sum()
        multiplier = i
        # The calculation result remains as the result of the first calculation(in this case, DTensor(local_tensor=1.0)) and won't be updated in the following calculations.
        # It seems that the result of the first calculation is cached.
        print(f"rank: {dist.get_rank()}, before full_tensor: {dy * multiplier}") # The result is [DTensor(local_tensor=1.0), DTensor(local_tensor=1.0), DTensor(local_tensor=1.0)] for all calculations.
        dy = dy.full_tensor()
        # If call .full_tensor() before the multiplication, the calculation result is correct and will be updated in the following calculations.
        print(f"rank: {dist.get_rank()}, after full_tensor: {dy * multiplier}") # The result is [Tensor([1.0]), Tensor([2.0]), Tensor([3.0])].


# When the multiplier is a tensor, the result is correct whether call .full_tensor() or not.
def test_dtensor_reduction_tensor_mul() -> None:
    device_mesh = init_device_mesh("cuda", [int(os.environ["WORLD_SIZE"])])
    for i in range(1, 4):
        x = torch.tensor([1.0], dtype=torch.float32, device="cuda")
        dx = DTensor.from_local(x, device_mesh, [Shard(0)])
        dy = dx.mean() # or sum()
        multiplier = torch.tensor(i, device="cuda")
        print(f"rank: {dist.get_rank()}, before full_tensor: {dy * multiplier}") # The result is [DTensor(local_tensor=1.0), DTensor(local_tensor=2.0), DTensor(local_tensor=3.0)].
        dy = dy.full_tensor()
        print(f"rank: {dist.get_rank()}, after full_tensor: {dy * multiplier}") # The result is [Tensor([1.0]), Tensor([2.0]), Tensor([3.0])].

if __name__ == "__main__":
    test_dtensor_reduction_scalar_mul()
    test_dtensor_reduction_tensor_mul()

Versions

PyTorch version: 2.6.0+cu124
Is debug build: False
CUDA used to build PyTorch: 12.4
ROCM used to build PyTorch: N/A

OS: Ubuntu 22.04.4 LTS (x86_64)
GCC version: (Ubuntu 11.4.0-1ubuntu1~22.04) 11.4.0
Clang version: Could not collect
CMake version: version 3.22.1
Libc version: glibc-2.35

Python version: 3.11.11 (main, Dec  6 2024, 20:02:44) [Clang 18.1.8 ] (64-bit runtime)
Python platform: Linux-5.4.0-144-generic-x86_64-with-glibc2.35
Is CUDA available: False
CUDA runtime version: Could not collect
CUDA_MODULE_LOADING set to: N/A
GPU models and configuration: 
Nvidia driver version: Could not collect
cuDNN version: Could not collect
HIP runtime version: N/A
MIOpen runtime version: N/A
Is XNNPACK available: True

CPU:
Architecture:                    x86_64
CPU op-mode(s):                  32-bit, 64-bit
Address sizes:                   46 bits physical, 57 bits virtual
Byte Order:                      Little Endian
CPU(s):                          112
On-line CPU(s) list:             0-111
Vendor ID:                       GenuineIntel
Model name:                      Intel(R) Xeon(R) Gold 6330 CPU @ 2.00GHz
CPU family:                      6
Model:                           106
Thread(s) per core:              2
Core(s) per socket:              28
Socket(s):                       2
Stepping:                        6
BogoMIPS:                        4000.00
Flags:                           fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc art arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc cpuid aperfmperf pni pclmulqdq dtes64 ds_cpl vmx smx est tm2 ssse3 sdbg fma cx16 xtpr pdcm pcid dca sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm 3dnowprefetch cpuid_fault epb cat_l3 invpcid_single ssbd mba ibrs ibpb stibp ibrs_enhanced tpr_shadow vnmi flexpriority ept vpid ept_ad fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid cqm rdt_a avx512f avx512dq rdseed adx smap avx512ifma clflushopt clwb intel_pt avx512cd sha_ni avx512bw avx512vl xsaveopt xsavec xgetbv1 xsaves cqm_llc cqm_occup_llc cqm_mbm_total cqm_mbm_local wbnoinvd dtherm ida arat pln pts avx512vbmi umip pku ospke avx512_vbmi2 gfni vaes vpclmulqdq avx512_vnni avx512_bitalg tme avx512_vpopcntdq rdpid md_clear pconfig flush_l1d arch_capabilities
Virtualization:                  VT-x
L1d cache:                       2.6 MiB (56 instances)
L1i cache:                       1.8 MiB (56 instances)
L2 cache:                        70 MiB (56 instances)
L3 cache:                        84 MiB (2 instances)
NUMA node(s):                    2
NUMA node0 CPU(s):               0-27,56-83
NUMA node1 CPU(s):               28-55,84-111
Vulnerability Itlb multihit:     Not affected
Vulnerability L1tf:              Not affected
Vulnerability Mds:               Not affected
Vulnerability Meltdown:          Not affected
Vulnerability Mmio stale data:   Mitigation; Clear CPU buffers; SMT vulnerable
Vulnerability Retbleed:          Not affected
Vulnerability Spec store bypass: Mitigation; Speculative Store Bypass disabled via prctl and seccomp
Vulnerability Spectre v1:        Mitigation; usercopy/swapgs barriers and __user pointer sanitization
Vulnerability Spectre v2:        Mitigation; Enhanced IBRS, IBPB conditional, RSB filling, PBRSB-eIBRS SW sequence
Vulnerability Srbds:             Not affected
Vulnerability Tsx async abort:   Not affected

Versions of relevant libraries:
[pip3] Could not collect
[conda] Could not collect

cc @H-Huang @awgu @wanchaol @fegin @fduwjj @wz337 @wconstab @d4l3k

@bigachin
Copy link

Can I take up this issue?

@arkadip-maitra
Copy link
arkadip-maitra commented May 15, 2025

This seems to stem from how DTensor handles lazy evaluation and possibly internal caching or optimization paths for scalar operations.

Python scalar multiplication uses the __rmul__ or __mul__ method on the DTensor object.

If the DTensor is in a reduced state and not not fully resolved to a real tensor via .full_tensor(), PyTorch may defer or incorrectly cache the operation assuming it’s stateless or idempotent.

In contrast, multiplying by a tensor invokes a different code path (likely more eager or bypassing the faulty caching), which avoids the issue.

Until it is fixed calling .full_tensor() on a reduced DTensor before applying scalar operations like * scalar.

Potential fix would be to change the dispatch function in tensor._dispatch.OpDispatcher class.

Adding a hook before local computation (right after op_info is created) to check for Partial placements and perform the necessary collective reduction.

✅ Insert after: op_info = self.unwrap_to_op_info(op_call, args, kwargs)

creating a new function like this:

def resolve_partial(dt):
    if isinstance(dt, dtensor.DTensor) and any(isinstance(p, Partial) for p in dt.placements):
        return dt.redistribute(placements=[Replicate()] * dt.device_mesh.ndim)
    return dt

args = pytree.tree_map(resolve_partial, args)
kwargs = pytree.tree_map(resolve_partial, kwargs)

ensures that:

  • All Partial DTensors are replaced with Replicate()-based DTensors
  • This triggers the required all_reduce and avoids stale/cached values
  • Scalar ops now act on fully resolved values

@jcaip jcaip added the oncall: distributed Add this issue/PR to distributed oncall triage queue label May 16, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
oncall: distributed Add this issue/PR to distributed oncall triage queue
Projects
None yet
Development

No branches or pull requests

4 participants
0