Update on "[WIP] sync and async torch.distributed.rpc for builtin operators" · pytorch/pytorch@c5cb2a1 · GitHub

Commit c5cb2a1

Update on "[WIP] sync and async torch.distributed.rpc for builtin operators"
Stack from [ghstack](https://github.com/ezyang/ghstack):

* **#23228 sync and async torch.distributed.rpc for builtin operators**

Features:

* sync and async RPC for builtin operators
* RpcAgent API
* ProcessGroupAgent implementation

Goals:

* have a minimum working and testable RPC implementation for #23110
* make sure the RpcAgent API is sufficient for future ThriftAgent and TensorPipeAgent implementations
  * A TensorPipe implementation might allocate multiple underlying communication channels of different types, and might also use streaming serialization/deserialization for large tensors. To support this requirement, the current implementation only converts a BuiltinOp into a Message, which contains a byte vector and a tensor table. It is up to the RpcAgent implementation to determine how it would like to serialize a Message object.
  * For ThriftAgent, since Thrift has its own request/response matching solution, the Message.id is no longer necessary and can be dropped during serialization. All it needs to do is pass the response Message object to the Future returned by send(...).
* support blocking and non-blocking RequestCallback
  * blocking means the callback will not return before sending out the response
  * non-blocking can be achieved by enqueuing the `(from, request, RpcAgent&)` tuple and using a different thread to process it. That is why there is an `RpcAgent&` arg in the param list.

Differential Revision: [D15194693](https://our.internmc.facebook.com/intern/diff/D15194693/)
1 parent c3c6013 commit c5cb2a1

File tree

2 files changed (+2, −2 lines)


torch/csrc/distributed/rpc/ProcessGroupAgent.cpp

Lines changed: 1 addition & 1 deletion

```diff
@@ -90,7 +90,7 @@ void ProcessGroupAgent::shutdown() {
   // 2. A GLOO process cannot send message to itself. (there is an ongoing
   // effort to fix this problem).
   int dst = (pg_->getRank() + 1) % pg_->getSize();
-  enqueue(SendWork(dst, std::move(Message({}, {}, MessageType::SHUTDOWN))));
+  enqueue(SendWork(dst, Message({}, {}, MessageType::SHUTDOWN)));
   std::unique_lock<std::mutex> lock(sendQueueMutex_);
   workConsumeCV_.wait(lock, [&] { return sendQueue_.empty(); });
   stop_ = true;
```

torch/csrc/distributed/rpc/init.cpp

Lines changed: 1 addition & 1 deletion

```diff
@@ -21,7 +21,7 @@ namespace {
 template <typename T>
 using shared_ptr_class_ = py::class_<T, std::shared_ptr<T>>;

-PyObject* rpc_init(PyObject* _unused) {
+PyObject* rpc_init(PyObject* /* unused */) {
   auto dist_module = THPObjectPtr(PyImport_ImportModule("torch.distributed"));
   if (!dist_module) {
     throw python_error();
```

0 commit comments