@@ -85,6 +85,10 @@ std::vector<at::Tensor> extractTensors(const c10::IValue& result) {
   return result.toTensorVector();
 }

+ bool should_ddp_set_last_bucket_as_small() {
+   return getCvarString({"DDP_SET_LAST_BUCKET_CAP"}, "N/A") == "1";
+ }
+
 } // namespace

 Reducer::Reducer(
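
Note: should_ddp_set_last_bucket_as_small() simply centralizes the existing DDP_SET_LAST_BUCKET_CAP environment-variable check. A minimal usage sketch (the call site here is illustrative; rebuild_buckets() further down adopts the helper in the same way):

    // Hypothetical call site; rebuild_buckets() below performs exactly this check.
    bool ddp_set_last_bucket_as_small = should_ddp_set_last_bucket_as_small();
    if (ddp_set_last_bucket_as_small) {
      // DDP_SET_LAST_BUCKET_CAP=1 was set in the environment.
    }
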
@@ -126,7 +130,7 @@ Reducer::Reducer(
       use_python_reducer_(use_python_reducer) {
   C10_LOG_API_USAGE_ONCE("torch.distributed.ddp.reducer");
   TORCH_INTERNAL_ASSERT(!params_.empty(), "Expected at least one parameter.");
-
+  std::cout << "hello from c++ reducer" << std::endl;
   if (ddp_debug_level_ != c10d::DebugLevel::Off) {
     LOG(INFO) << "Reducer initialized with bucket_bytes_cap: "
               << bucket_bytes_cap_
@@ -174,6 +178,7 @@ Reducer::Reducer(
   // can be marked as ready for reduction.
   {
     const auto variable_count = params_.size();
+    std::cout << "reducer found " << variable_count << " variables" << std::endl;
     grad_accumulators_.resize(variable_count);
     for (const auto variable_index : c10::irange(variable_count)) {
       auto& variable = params_[variable_index];
@@ -198,13 +203,124 @@ Reducer::Reducer(
        this->rpc_context_.set(
            ThreadLocalDistAutogradContext::getContextPtr());
#endif
+       std::cout << "marking variable " << variable_index << " as ready" << std::endl;
        this->autograd_hook(variable_index);
        return outputs;
      },
-     [this](torch::autograd::CompiledNodeArgs& args) {
-       TORCH_CHECK(
-           this->use_python_reducer_,
-           "Compiled autograd is not compatible with C++ DDP Reducer, please use torch._dynamo.config.optimize_ddp=\"python_reducer\".");
+     [this, variable_index](torch::autograd::CompiledNodeArgs& args) {
+       std::cout << "collecting the post hook on variable_index=" << variable_index << std::endl;
+       if (use_python_reducer_) {
+         return;
+       }
+
+       // filters out unsupported DDP arguments
+       auto str =
+           "Compiled autograd is not compatible with C++ DDP Reducer, please use torch._dynamo.config.optimize_ddp=\"python_reducer\".";
+       // std::cout << "mixed precision" << std::endl;
+       TORCH_CHECK(!mixed_precision_param_dtype_.has_value(), str);
+       // std::cout << "find unused" << std::endl;
+       TORCH_CHECK(!find_unused_parameters_, str);
+       // std::cout << "ddp debug level" << std::endl;
+       TORCH_CHECK(ddp_debug_level_ == c10d::DebugLevel::Off, str);
+       // std::cout << "rpc" << std::endl;
+       TORCH_CHECK(rpc_context_.context_ptr.load() == nullptr, str);
+
+       // TODO: if not expect autograd hooks, means no sync
+       // std::cout << "expect hooks" << std::endl;
+       TORCH_CHECK(expect_autograd_hooks_, str);
+
+       // std::cout << "expect sparse" << std::endl;
+       for (bool b : expect_sparse_gradients_) {
+         TORCH_CHECK(!b, str);
+       }
+       // std::cout << "bucket view" << std::endl;
+       TORCH_CHECK(!gradient_as_bucket_view_, str);
+       // std::cout << "comm hook non nullptr" << std::endl;
+       TORCH_CHECK(comm_hook_ == nullptr, str);
+       // std::cout << "not use static world size" << std::endl;
+       TORCH_CHECK(forwardPassWorkHandle_.useStaticWorldSize, str);
+       TORCH_CHECK(!should_ddp_set_last_bucket_as_small(), str);
+       // ignore param_names_
+       // todo: skip create_graph with ddp message
+       if (static_graph_) {
+         TORCH_WARN_ONCE(
+             "static_graph ignored, compiled autograd always rebuilds buckets when param ready order changes.");
+       }
+       int div_factor = process_group_->getSize();
+       args.collect(div_factor);
+       args.collect_ddp_param_index(variable_index);
+       // collect size limit etc.
+       // Rewrite C++ Reducer
+
+       // temp validation
+       if (args.retrieve_ddp_param_index_order().size() == params_.size()) {
+         std::cout << std::endl;
+         std::cout << "first_bucket_bytes_cap_=" << first_bucket_bytes_cap_ << ", bucket_bytes_cap_=" << bucket_bytes_cap_ << std::endl;
+         std::cout << "ALL PARAMS GOT HOOKS" << std::endl;
+         auto [buckets, bucket_size_limits] = compute_bucket_assignment_by_size(
+             params_,
+             {static_cast<size_t>(first_bucket_bytes_cap_), static_cast<size_t>(bucket_bytes_cap_)},
+             /* expect_sparse_gradient */ {},
+             /* tensor_indices */ args.retrieve_ddp_param_index_order(),
+             /* logger */ {});
+
+         std::cout << "param order: ";
+         for (auto index : args.retrieve_ddp_param_index_order()) {
+           auto tensor = params_[index];
+           size_t size_bytes = tensor.numel() * tensor.element_size();
+           std::cout << index << " (" << size_bytes << " bytes), ";
+         }
+         std::cout << std::endl;
+
+         std::string bucket_size_limits_str = "";
+         for (auto limit : bucket_size_limits) {
+           bucket_size_limits_str += (std::to_string(limit) + ", ");
+         }
+         std::cout << "limits per bucket: " << bucket_size_limits_str << std::endl;
+         for (size_t i = 0; i < buckets.size(); i++) {
+           std::cout << "bucket " << i << ": " << std::endl;
+           for (auto& index : buckets[i]) {
+             std::cout << index << ", ";
+           }
+           std::cout << std::endl;
+         }
+         std::cout << std::endl;
+       }
+
+     },
+     [this, variable_index](
+         torch::autograd::Variable& variable,
+         torch::autograd::SwapSavedVariables& saved) {
+       bool is_first_hook = true;
+       if (is_first_hook) {
+         auto [buckets, _] = compute_bucket_assignment_by_size(
+             params_,
+             {static_cast<size_t>(first_bucket_bytes_cap_), static_cast<size_t>(bucket_bytes_cap_)},
+             /* expect_sparse_gradient */ {},
+             /* tensor_indices */ saved.retrieve_ddp_param_index_order(),
+             /* logger */ {});
+       }
+       // TODO: NOTHING IS CALLING THIS rn
+       at::Tensor& param = get_param_from_index(variable_index);
+       saved.before(param);
+       int div_factor = process_group_->getSize();
+       // need to swap the param to its proxy
+       // then we can call the bucket with the proxies.
+       // and when bucket size cap reached, launch
+       bool should_issue = true;
+       if (should_issue) {
+         // should issue bucket
+         const auto& pyinterface =
+             torch::dynamo::autograd::getPyCompilerInterface();
+         pyinterface->call_unpack(
+             saved.get_py_compiler(), 0, div_factor);
+       } else {
+         // // should bucket
+         // saved.state.ddp_bucket.emplace_back(param);
+       }
+       saved.after(param);
      })),
      grad_accumulator);
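
For context, a minimal sketch of how the bucket-assignment call in the validation block above is used; the ready_order values are made up for illustration, only the call signature and return shape come from this file:

    // Assumed inputs: params_ holds the module parameters, ready_order is the
    // order in which their gradients became ready (illustrative values).
    std::vector<size_t> ready_order = {3, 2, 1, 0};
    auto [buckets, limits] = compute_bucket_assignment_by_size(
        params_,
        {static_cast<size_t>(first_bucket_bytes_cap_),
         static_cast<size_t>(bucket_bytes_cap_)},
        /* expect_sparse_gradient */ {},
        /* tensor_indices */ ready_order,
        /* logger */ {});
    // buckets[i] lists the parameter indices grouped into bucket i;
    // limits[i] is the byte cap that was in effect when bucket i was formed.
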
@@ -537,7 +653,7 @@ void Reducer::push_rebuilt_params_for_all_indices() {

 void Reducer::push_rebuilt_params(const size_t& index) {
   rebuilt_params_.push_back(params_[index]);
-  rebuilt_param_indices_.push_back(static_cast<int64_t>(index));
+  rebuilt_param_indices_.push_back(index);
 }

 void Reducer::set_divide_factor() {
@@ -1678,6 +1794,9 @@ void Reducer::finalize_backward() {
          "currently only support to skip all reduce for unused params "
          "when skip_all_reduce_unused_params_ is true.");
      continue;
+   } else {
+     std::cout << "skipping bucket work" << std::endl;
+     continue;
    }

    bucket.future_work->wait();
@@ -1892,8 +2011,7 @@ bool Reducer::rebuild_buckets() {
   std::vector<size_t> bucket_size_limits;
   bucket_size_limits.push_back(first_bucket_bytes_cap_);
   bucket_size_limits.push_back(bucket_bytes_cap_);
-  auto ddp_set_last_bucket_as_small =
-      (getCvarString({"DDP_SET_LAST_BUCKET_CAP"}, "N/A") == "1");
+  bool ddp_set_last_bucket_as_small = should_ddp_set_last_bucket_as_small();

   if (ddp_set_last_bucket_as_small) {
     // Reverse so that first_bucket_bytes_cap_ (smaller bucket) becomes the last
@@ -2166,7 +2284,7 @@ compute_bucket_assignment_by_size(
     const std::vector<at::Tensor>& tensors,
     const std::vector<size_t>& bucket_size_limits,
     const std::vector<bool>& expect_sparse_gradient,
-    const std::vector<int64_t>& tensor_indices,
+    const std::vector<size_t>& tensor_indices,
     const std::optional<std::weak_ptr<c10d::Logger>>& logger) {
   // Either expect_sparse_gradient is not specified or it has as many elements
   // as the vector with tensors.
@@ -2284,6 +2402,20 @@ compute_bucket_assignment_by_size(
     bucket_indices.emplace_back(std::get<0>(bucket_indices_with_size));
     per_bucket_size_limits.emplace_back(std::get<1>(bucket_indices_with_size));
   }
+
+  std::cout << std::endl;
+  std::cout << std::endl;
+  std::cout << "Finished computing bucket assignment" << std::endl;
+  for (size_t i = 0; i < bucket_indices.size(); i++) {
+    std::cout << "bucket[" << i << "]: ";
+    for (const auto& variable_index : bucket_indices[i]) {
+      std::cout << variable_index << ", ";
+    }
+    std::cout << std::endl;
+  }
+  std::cout << std::endl;
+  std::cout << std::endl;
+
   return std::make_tuple(bucket_indices, per_bucket_size_limits);
 }
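
With the loop above, the added diagnostics print one line per bucket; the output looks roughly like the following (bucket contents are illustrative):

    Finished computing bucket assignment
    bucket[0]: 7, 6, 5,
    bucket[1]: 4, 3,
    bucket[2]: 2, 1, 0,
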
@@ -2401,6 +2533,7 @@ void verify_params_across_processes(
 }

 void Reducer::remove_autograd_hooks() {
+  std::cout << "===========================REMOVING AUTOGRAD HOOKS======================" << std::endl;
   // Remove all hooks on variables registered by this Reducer. This is necessary
   // to make DDP failure recoverable. Otherwise, multiple Reducer instances
   // (from recoveries) will add their hooks to the original model, and those