[Feature][c10d] Allow flexible cudaStreamWaitEvent in PGNCCL #146881


Closed

awgu opened this issue Feb 11, 2025 · 5 comments
Labels
oncall: distributed Add this issue/PR to distributed oncall triage queue

Comments

@awgu
Collaborator
awgu commented Feb 11, 2025

🚀 The feature, motivation and pitch

Today, PGNCCL collectives have the internal NCCL stream wait on the current stream before issuing the NCCL kernel in the NCCL stream. However, waiting on the current stream can over-synchronize, and waiting on an earlier CUDA event could suffice for correctness.

I have a specific use case where it would be useful to pass a user-specified CUDA event to a PGNCCL collective (in my case, P2P send and recv) and have the collective wait on that event, instead of recording a new event on the current stream and waiting on it at collective call time.

The current design mostly assumes that the collective will be called immediately after the kernels it depends on; however, with multidimensional parallelism this is not always the case anymore. Several collectives may be issued for different PGs, possibly interleaved with non-collective ops (e.g. device copies) that are relevant only to some of those collectives -- we do not want later collectives to wait on non-collective ops that have no relevance to them.

One possible API would be to add an `event: Optional[torch.Event]` argument to each c10d collective (though I am not sure how to rationalize this across non-NCCL backends), where, if the user passes an event, the PGNCCL implementation waits on that event instead of doing a stream sync with the current stream.
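To make this concrete, a hedged usage sketch follows; the `event=` keyword does not exist in c10d today, and `produce_send_buffer` / `unrelated_compute` are illustrative placeholders:

import torch
import torch.distributed as dist

# Hypothetical sketch of the proposed API; `event=` is NOT a real c10d argument today.
ready = torch.cuda.Event()
t = produce_send_buffer()   # placeholder: kernel(s) whose output the send depends on
ready.record()              # capture the dependency right after the producer

unrelated_compute()         # placeholder: work the send should NOT wait for

# Proposal: PGNCCL would have its internal NCCL stream wait on `ready`
# (cudaStreamWaitEvent) instead of on everything enqueued on the current stream.
dist.send(t, dst=1, event=ready)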

Alternatives

We can manually reorder the collectives to avoid the over-synchronization, but doing so may be intrusive to the different parallelism components.

Additional context

No response

cc @H-Huang @kwen2501 @wanchaol @fegin @fduwjj @wz337 @wconstab @d4l3k @c-p-i-o

awgu added the `oncall: distributed` label Feb 11, 2025
@wconstab
Contributor

I think this proposal looks feasible, but I'm not sure I follow the use case. It sounds like you have the following, and you want `collective` to start right after `some_op` because it has no data dependency on `other_ops`.

t = some_op()
other_ops()
collective(t)

What's less clear to me is why it is preferable to fix this by adding an event record after `some_op` and passing that event around to the code that launches `collective`, rather than just moving `collective` earlier, but perhaps it is more convenient that way?
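For comparison, the closest approximation available today (a sketch reusing the names from the snippet above): record an event right after `some_op` and issue the collective from a separate stream that waits only on that event, so PGNCCL's wait on the "current" stream does not pull in `other_ops`:

import torch
import torch.distributed as dist

comm_stream = torch.cuda.Stream()
evt = torch.cuda.Event()

t = some_op()     # producer the collective depends on
evt.record()      # capture the dependency immediately, on the current stream

other_ops()       # unrelated work the collective should not wait for

with torch.cuda.stream(comm_stream):
    comm_stream.wait_event(evt)                 # depend only on some_op's work
    work = dist.all_reduce(t, async_op=True)    # PGNCCL syncs with comm_stream here
# The caller must still order consumers of `t` after the collective (e.g. via
# work.wait() or another event) and keep `t` alive until the collective runs.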

@awgu
Collaborator Author
awgu commented Feb 11, 2025

I think that, at a high level, the challenge is that DP gradient reduction may be implemented using backward hooks, while PP send/recv may be implemented outside of autograd, making it difficult to reorder the PP send/recv into the backward pass.
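A rough structural sketch of that interleaving (all names below are illustrative placeholders, not the actual pipelining or DP APIs): the DP reduction fires from inside autograd via a hook, while the PP send is only issued after backward() returns, so the two cannot simply be reordered in source code.

import torch
import torch.distributed as dist

# Illustrative placeholders: dp_pg / pp_pg are process groups, prev_rank is the
# previous pipeline stage, stage_input is this stage's input tensor (requires_grad).

def dp_grad_hook(grad: torch.Tensor) -> torch.Tensor:
    # Registered on parameters; autograd fires it partway through backward().
    dist.all_reduce(grad, group=dp_pg, async_op=True)
    return grad

def backward_step(loss: torch.Tensor, stage_input: torch.Tensor):
    loss.backward()  # DP hooks fire somewhere inside this call
    # The PP send is issued only after backward() returns; today its NCCL stream
    # waits on the current stream at call time, which includes any non-collective
    # ops (e.g. gradient copies for DP) enqueued during backward that the send
    # does not actually depend on. With the proposed `event` argument, the send
    # could wait only on an event recorded when stage_input.grad became ready.
    dist.isend(stage_input.grad, dst=prev_rank, group=pp_pg)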

@wconstab
Contributor

So, for example's sake, would you be exposing a global 'event' object from the pipelining code and then accessing that event from inside the hook that issues DP comms, or something? Just trying to see the full picture and ask whether that is actually cleaner than exposing something like a method from PP that the DP hook can call to issue the PP comms.

Also, I'm not really opposed to adding this feature. I think it would be pretty easy to document how it works. Cc @kwen2501 wdyt?

@awgu
Collaborator Author
awgu commented Feb 25, 2025

Thanks @ngimel for pointing this out.

See the `previous_event` argument in the DeepEP APIs:

def dispatch_forward(x: Union[torch.Tensor, Tuple[torch.Tensor, torch.Tensor]],
                     topk_idx: torch.Tensor, topk_weights: torch.Tensor,
                     num_experts: int, previous_event: Optional[EventOverlap] = None) -> \
        Tuple[Union[torch.Tensor, Tuple[torch.Tensor, torch.Tensor]], torch.Tensor, torch.Tensor, List, Tuple, EventOverlap]:
    # NOTES: an optional `previous_event` is a captured CUDA event that you want to make a dependency
    # of the dispatch kernel; it may be useful for communication-computation overlap. For more information,
    # please refer to the docs of `Buffer.dispatch`
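A hedged usage sketch based only on the signature quoted above; the helper that captures the CUDA event into an `EventOverlap` is assumed here rather than taken from the DeepEP API:

# Sketch: capture an event right after producing `x`, then pass it as the only
# dependency of the dispatch so later unrelated work is not waited on.
prev = capture_event_overlap()   # assumed helper returning a DeepEP EventOverlap
recv_x, recv_topk_idx, recv_topk_weights, num_recv_list, handle, event = \
    dispatch_forward(x, topk_idx, topk_weights, num_experts, previous_event=prev)
# The returned `event` can in turn be chained as the previous_event of a later kernel.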

@kwen2501
Contributor

Hi @awgu, there are two user-side examples that might achieve a similar goal:
#147729 (comment)
#147729 (comment)

I wonder if they would suffice here too?

pytorchmergebot pushed a commit that referenced this issue Mar 9, 2025
…irely (#148590)

This PR has multiple changes to `ProcessGroupNCCL` (which unfortunately are related):
1. When async_op=False, we directly launch the collective on the "current" stream, instead of launching it on a trampoline stream and joining back.
- Resolves #147729
- Resolves #146881
- Also saves two event syncs (which have overhead in the case of HIP) and one pybind when we call `work.wait()` in distributed_c10d.py on behalf of the user.
2. Entirely remove `record_stream` and use CPU-side stashing to manage tensor lifetimes against premature recycling.
- Resolves #147168
3. Remove tensor life management when async_op=False; only use it when async_op=True.
4. To guard against the user not calling `work.wait()`, we ask the watchdog to unstash tensors after it detects completion of collectives, so that we do not hold references to tensors forever. This is a safety net, rather than a service guarantee; see the discussion [here](#147168 (comment)).
5. Profiles in async_op=False mode will look different -- collective kernels will show up on the same line as compute kernels.

Joint work with @cenzhaometa who wants to remove the event sync overhead.

Cc: @ngimel @awgu @Aidyn-A @skyw @wconstab @leonardo0lyj

Differential Revision: [D70835197](https://our.internmc.facebook.com/intern/diff/D70835197)
Pull Request resolved: #148590
Approved by: https://github.com/eqy, https://github.com/Aidyn-A, https://github.com/fduwjj
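To illustrate what change 1 above means on the user side, a small sketch of the described behavior (not new API):

import torch
import torch.distributed as dist

x = torch.ones(1024, device="cuda")

# async_op=False (default): the collective kernel is enqueued directly on the
# current stream, so normal stream ordering already serializes later kernels
# after it; no trampoline stream and no extra event sync inside work.wait().
dist.all_reduce(x)
y = x * 2   # ordered after the all_reduce on the same stream

# async_op=True keeps the previous behavior: an internal NCCL stream waits on
# the current stream, and the caller synchronizes via the returned work handle.
work = dist.all_reduce(x, async_op=True)
work.wait()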
pytorchmergebot pushed a commit that referenced this issue Apr 1, 2025
…irely (#150398)

Relanding #148590 due to merge conflict.

This PR has multiple changes to `ProcessGroupNCCL` (which unfortunately are related):
1. When async_op=False, we directly launch the collective on the "current" stream, instead of launching it on a trampoline stream and joining back.
- Resolves #147729
- Resolves #146881
- Also saves two event syncs (which have overhead in the case of HIP) and one pybind when we call `work.wait()` in distributed_c10d.py on behalf of the user.
2. Entirely remove `record_stream` and use CPU-side stashing to manage tensor lifetimes against premature recycling.
- Resolves #147168
3. Remove tensor life management when async_op=False; only use it when async_op=True.
4. To guard against the user not calling `work.wait()`, we ask the watchdog to unstash tensors after it detects completion of collectives, so that we do not hold references to tensors forever. This is a safety net, rather than a service guarantee; see the discussion [here](#147168 (comment)).
5. Profiles in async_op=False mode will look different -- collective kernels will show up on the same line as compute kernels.

Joint work with @cenzhaometa who wants to remove the event sync overhead.

Squashed contents:

* [ptd][nccl] use current-stream as nccl-stream under async=False mode (#147820)
PTD's current workflow:
- PTD creates its own dedicated `ncclStream` for comm operations
- it first adds a dependency on the current stream (typically the compute stream) to ensure tensors are ready before invoking the collective
Such stream synchronization becomes expensive in the inference world (CPU overhead: 70us vs. GPU kernel time: 160us).
This diff:
- async=False [default]: use the current stream as the nccl stream and avoid the stream-sync overhead
- async=True: retain the existing logic of creating a new nccl stream and letting it wait on the current stream to ensure tensors are ready
- pass `async` down from c10d to the NCCL PG
This helps shave off 50% of the CPU overhead **(70us -> 35us)**, which reduces the total CPU/GPU time from **230us to 195us (about 15%)**.

* [PGNCCL] Make avoid-record-stream default

* [c10d] Add asyncOp argument to Ops

* Change python side wait

* Pass asyncOp at ProcessGroup level

* Watchdog unstashing tensors as a safety net

* Stash tensors for reduce_scatter_v and all_gather_v
Pull Request approved: #149753

* [c10d] Move unstashing from watchdog to main thread
Pull Request approved: #150079

* [PGNCCL][BE] Merge mutex into TensorShelf for encapsulation
Pull Request approved: #150130

Pull Request resolved: #150398
Approved by: https://github.com/atalman