8000 Update on "sync and async torch.distributed.rpc for builtin operators" · pytorch/pytorch@3c642e8 · GitHub
[go: up one dir, main page]

Skip to content

Commit 3c642e8

Browse files
committed
Update on "sync and async torch.distributed.rpc for builtin operators"
Features: * sync and async RPC for builtin operators * RpcAgent API * ProcessGroupAgent implementation Goal: This is the first PR for #23110, and there will be many followup ones. So let's focus on the overall API and code structure. Details like efficiency and error handling can be improved in future PRs. * have a minimum working and testable RPC implementation. * make sure the RpcAgent API is sufficient for future ThriftAgent and TensorPipeAgent implementation * For tensor pipe implementation, it might allocate multiple underlying communication channels with different types, and might also use streaming serialization/deserialization for large tensors. To support this requirement, the current implementation only convert a BuiltinOp into a Message which contains a byte vector and a tensor table. It is up to the RpcAgent implementation to determine how it would like to serialize a Message object. * For ThriftAgent, as Thrift has it own request/response matching solution, the Message.id is no longer necessary. Hence the id can be dropped during serialization. All it needs to do is to pass the response Message object to the Future returned by send(...). * support blocking and non-blocking RequestCallback * blocking means the callback won't return before sending out the response * non-blocking can be achieved by enqueue the `(from, request, RpcAgent&)` tuple and use a different thread to process them. That is why there is an `RpcAgent&` arg in the param list. Differential Revision: [D15194693](https://our.internmc.facebook.com/intern/diff/D15194693/)
1 parent db0385b commit 3c642e8

File tree

2 files changed

+0
-9
lines changed

2 files changed

+0
-9
lines changed

torch/csrc/distributed/rpc/ProcessGroupAgent.cpp

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -77,13 +77,6 @@ ProcessGroupAgent::ProcessGroupAgent(
7777
listenerThread_ = std::thread(&ProcessGroupAgent::listenLoop, this);
7878
}
7979

80-
ProcessGroupAgent::~ProcessGroupAgent() {
81-
if (!stop_) {
82-
AT_ERROR(stop_, "Must call ProcessGroupAgent::shutdown before destructor");
83-
}
84-
}
85-
86-
8780
void ProcessGroupAgent::join() {
8881
// Every process i sends a SHUTDOWN message to process i + 1. This is
8982
// necessary for now because:

torch/csrc/distributed/rpc/ProcessGroupAgent.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@ class ProcessGroupAgent : public RpcAgent {
3030
std::unordered_map<std::string, int> nameMap,
3131
std::shared_ptr<c10d::ProcessGroup> pg);
3232

33-
~ProcessGroupAgent() override;
34-
3533
// This method wraps the destination information and the message into a
3634
// SendWork object, and put the SendWork into a queue. Another thread will
3735
// consume SendWork from the queue and send it out.

0 commit comments

Comments
 (0)
0