@@ -66,6 +66,7 @@ import (
66
66
"github.com/docker/libnetwork/cluster"
67
67
nwconfig "github.com/docker/libnetwork/config"
68
68
"github.com/pkg/errors"
69
+ "golang.org/x/sync/semaphore"
69
70
)
70
71
71
72
// ContainersNamespace is the name of the namespace used for users containers
@@ -197,6 +198,7 @@ func (daemon *Daemon) NewResolveOptionsFunc() resolver.ResolveOptionsFunc {
197
198
}
198
199
199
200
func (daemon * Daemon ) restore () error {
201
+ var mapLock sync.Mutex
200
202
containers := make (map [string ]* container.Container )
201
203
202
204
logrus .Info ("Loading containers: start." )
@@ -206,68 +208,99 @@ func (daemon *Daemon) restore() error {
206
208
return err
207
209
}
208
210
211
+ // parallelLimit is the maximum number of parallel startup jobs that we
212
+ // allow (this is the limited used for all startup semaphores). The multipler
213
+ // (128) was chosen after some fairly significant benchmarking -- don't change
214
+ // it unless you've tested it significantly (this value is adjusted if
215
+ // RLIMIT_NOFILE is small to avoid EMFILE).
216
+ parallelLimit := adjustParallelLimit (len (dir ), 128 * runtime .NumCPU ())
217
+
218
+ // Re-used for all parallel startup jobs.
219
+ var group sync.WaitGroup
220
+ sem := semaphore .NewWeighted (int64 (parallelLimit ))
221
+
209
222
for _ , v := range dir {
210
- id := v .Name ()
211
- container , err := daemon .load (id )
212
- if err != nil {
213
- logrus .Errorf ("Failed to load container %v: %v" , id , err )
214
- continue
215
- }
216
- if ! system .IsOSSupported (container .OS ) {
217
- logrus .Errorf ("Failed to load container %v: %s (%q)" , id , system .ErrNotSupportedOperatingSystem , container .OS )
218
- continue
219
- }
220
- // Ignore the container if it does not support the current driver being used by the graph
221
- currentDriverForContainerOS := daemon .graphDrivers [container .OS ]
222
- if (container .Driver == "" && currentDriverForContainerOS == "aufs" ) || container .Driver == currentDriverForContainerOS {
223
- rwlayer , err := daemon .imageService .GetLayerByID (container .ID , container .OS )
223
+ group .Add (1 )
224
+ go func (id string ) {
225
+ defer group .Done ()
226
+ _ = sem .Acquire (context .Background (), 1 )
227
+ defer sem .Release (1 )
228
+
229
+ container , err := daemon .load (id )
224
230
if err != nil {
225
- logrus .Errorf ("Failed to load container mount %v: %v" , id , err )
226
- continue
231
+ logrus .Errorf ("Failed to load container %v: %v" , id , err )
232
+ return
227
233
}
228
- container .RWLayer = rwlayer
229
- logrus .Debugf ("Loaded container %v, isRunning: %v" , container .ID , container .IsRunning ())
234
+ if ! system .IsOSSupported (container .OS ) {
235
+ logrus .Errorf ("Failed to load container %v: %s (%q)" , id , system .ErrNotSupportedOperatingSystem , container .OS )
236
+ return
237
+ }
238
+ // Ignore the container if it does not support the current driver being used by the graph
239
+ currentDriverForContainerOS := daemon .graphDrivers [container .OS ]
240
+ if (container .Driver == "" && currentDriverForContainerOS == "aufs" ) || container .Driver == currentDriverForContainerOS {
241
+ rwlayer , err := daemon .imageService .GetLayerByID (container .ID , container .OS )
242
+ if err != nil {
243
+ logrus .Errorf ("Failed to load container mount %v: %v" , id , err )
244
+ return
245
+ }
246
+ container .RWLayer = rwlayer
247
+ logrus .Debugf ("Loaded container %v, isRunning: %v" , container .ID , container .IsRunning ())
230
248
231
- containers [container .ID ] = container
232
- } else {
233
- logrus .Debugf ("Cannot load container %s because it was created with another graph driver." , container .ID )
234
- }
249
+ mapLock .Lock ()
250
+ containers [container .ID ] = container
251
+ mapLock .Unlock ()
252
+ } else {
253
+ logrus .Debugf ("Cannot load container %s because it was created with another graph driver." , container .ID )
254
+ }
255
+ }(v .Name ())
235
256
}
257
+ group .Wait ()
236
258
237
259
removeContainers := make (map [string ]* container.Container )
238
260
restartContainers := make (map [* container.Container ]chan struct {})
239
261
activeSandboxes := make (map [string ]interface {})
262
+
240
263
for id , c := range containers {
241
- if err := daemon .registerName (c ); err != nil {
242
- logrus .Errorf ("Failed to register container name %s: %s" , c .ID , err )
243
- delete (containers , id )
244
- continue
245
- }
246
- if err := daemon .Register (c ); err != nil {
247
- logrus .Errorf ("Failed to register container %s: %s" , c .ID , err )
248
- delete (containers , id )
249
- continue
250
- }
264
+ group .Add (1 )
265
+ go func (c * container.Container ) {
266
+ defer group .Done ()
267
+ _ = sem .Acquire (context .Background (), 1 )
268
+ defer sem .Release (1 )
251
269
252
- // The LogConfig.Type is empty if the container was created before docker 1.12 with default log driver.
253
- // We should rewrite it to use the daemon defaults.
254
- // Fixes https://github.com/docker/docker/issues/22536
255
- if c .HostConfig .LogConfig .Type == "" {
256
- if err := daemon .mergeAndVerifyLogConfig (& c .HostConfig .LogConfig ); err != nil {
257
- logrus .Errorf ("Failed to verify log config for container %s: %q" , c .ID , err )
258
- continue
270
+ if err := daemon .registerName (c ); err != nil {
271
+ logrus .Errorf ("Failed to register container name %s: %s" , c .ID , err )
272
+ mapLock .Lock ()
273
+ delete (containers , id )
274
+ mapLock .Unlock ()
275
+ return
259
276
}
260
- }
277
+ if err := daemon .Register (c ); err != nil {
278
+ logrus .Errorf ("Failed to register container %s: %s" , c .ID , err )
279
+ mapLock .Lock ()
280
+ delete (containers , id )
281
+ mapLock .Unlock ()
282
+ return
283
+ }
284
+
285
+ // The LogConfig.Type is empty if the container was created before docker 1.12 with default log driver.
286
+ // We should rewrite it to use the daemon defaults.
287
+ // Fixes https://github.com/docker/docker/issues/22536
288
+ if c .HostConfig .LogConfig .Type == "" {
289
+ if err := daemon .mergeAndVerifyLogConfig (& c .HostConfig .LogConfig ); err != nil {
290
+ logrus .Errorf ("Failed to verify log config for container %s: %q" , c .ID , err )
291
+ }
292
+ }
293
+ }(c )
261
294
}
295
+ group .Wait ()
262
296
263
- var (
264
- wg sync.WaitGroup
265
- mapLock sync.Mutex
266
- )
267
297
for _ , c := range containers {
268
- wg .Add (1 )
298
+ group .Add (1 )
269
299
go func (c * container.Container ) {
270
- defer wg .Done ()
300
+ defer group .Done ()
301
+ _ = sem .Acquire (context .Background (), 1 )
302
+ defer sem .Release (1 )
303
+
271
304
daemon .backportMountSpec (c )
272
305
if err := daemon .checkpointAndSave (c ); err != nil {
273
306
logrus .WithError (err ).WithField ("container" , c .ID ).Error ("error saving backported mountspec to disk" )
@@ -414,26 +447,33 @@ func (daemon *Daemon) restore() error {
414
447
c .Unlock ()
415
448
}(c )
416
449
}
417
- wg .Wait ()
450
+ group .Wait ()
451
+
418
452
daemon .netController , err = daemon .initNetworkController (daemon .configStore , activeSandboxes )
419
453
if err != nil {
420
454
return fmt .Errorf ("Error initializing network controller: %v" , err )
421
455
}
422
456
423
457
// Now that all the containers are registered, register the links
424
458
for _ , c := range containers {
425
- if err := daemon .registerLinks (c , c .HostConfig ); err != nil {
426
- logrus .Errorf ("failed to register link for container %s: %v" , c .ID , err )
427
- }
459
+ group .Add (1 )
460
+ go func (c * container.Container ) {
461
+ _ = sem .Acquire (context .Background (), 1 )
462
+
463
+ if err := daemon .registerLinks (c , c .HostConfig ); err != nil {
464
+ logrus .Errorf ("failed to register link for container %s: %v" , c .ID , err )
465
+ }
466
+
467
+ sem .Release (1 )
468
+ group .Done ()
469
+ }(c )
428
470
}
471
+ group .Wait ()
429
472
430
- group := sync.WaitGroup {}
431
473
for c , notifier := range restartContainers {
432
474
group .Add (1 )
433
-
434
475
go func (c * container.Container , chNotify chan struct {}) {
435
- defer group .Done ()
436
-
476
+ _ = sem .Acquire (context .Background (), 1 )
437
477
logrus .Debugf ("Starting container %s" , c .ID )
438
478
439
479
// ignore errors here as this is a best effort to wait for children to be
@@ -455,22 +495,27 @@ func (daemon *Daemon) restore() error {
455
495
logrus .Errorf ("Failed to start container %s: %s" , c .ID , err )
456
496
}
457
497
close (chNotify )
458
- }(c , notifier )
459
498
499
+ sem .Release (1 )
500
+ group .Done ()
501
+ }(c , notifier )
460
502
}
461
503
group .Wait ()
462
504
463
- removeGroup := sync.WaitGroup {}
464
505
for id := range removeContainers {
465
- removeGroup .Add (1 )
506
+ group .Add (1 )
466
507
go func (cid string ) {
508
+ _ = sem .Acquire (context .Background (), 1 )
509
+
467
510
if err := daemon .ContainerRm (cid , & types.ContainerRmConfig {ForceRemove : true , RemoveVolume : true }); err != nil {
468
511
logrus .Errorf ("Failed to remove container %s: %s" , cid , err )
469
512
}
470
- removeGroup .Done ()
513
+
514
+ sem .Release (1 )
515
+ group .Done ()
471
516
}(id )
472
517
}
473
- removeGroup .Wait ()
518
+ group .Wait ()
474
519
475
520
// any containers that were started above would already have had this done,
476
521
// however we need to now prepare the mountpoints for the rest of the containers as well.
@@ -491,13 +536,16 @@ func (daemon *Daemon) restore() error {
491
536
492
537
group .Add (1 )
493
538
go func (c * container.Container ) {
494
- defer group .Done ()
539
+ _ = sem .Acquire (context .Background (), 1 )
540
+
495
541
if err := daemon .prepareMountPoints (c ); err != nil {
496
542
logrus .Error (err )
497
543
}
544
+
545
+ sem .Release (1 )
546
+ group .Done ()
498
547
}(c )
499
548
}
500
-
501
549
group .Wait ()
502
550
503
551
logrus .Info ("Loading containers: done." )
@@ -508,7 +556,18 @@ func (daemon *Daemon) restore() error {
508
556
// RestartSwarmContainers restarts any autostart container which has a
509
557
// swarm endpoint.
510
558
func (daemon * Daemon ) RestartSwarmContainers () {
511
- group := sync.WaitGroup {}
559
+ ctx := context .Background ()
560
+
561
+ // parallelLimit is the maximum number of parallel startup jobs that we
562
+ // allow (this is the limited used for all startup semaphores). The multipler
563
+ // (128) was chosen after some fairly significant benchmarking -- don't change
564
+ // it unless you've tested it significantly (this value is adjusted if
565
+ // RLIMIT_NOFILE is small to avoid EMFILE).
566
+ parallelLimit := adjustParallelLimit (len (daemon .List ()), 128 * runtime .NumCPU ())
567
+
568
+ var group sync.WaitGroup
569
+ sem := semaphore .NewWeighted (int64 (parallelLimit ))
570
+
512
571
for _ , c := range daemon .List () {
513
572
if ! c .IsRunning () && ! c .IsPaused () {
514
573
// Autostart all the containers which has a
@@ -517,14 +576,21 @@ func (daemon *Daemon) RestartSwarmContainers() {
517
576
if daemon .configStore .AutoRestart && c .ShouldRestart () && c .NetworkSettings .HasSwarmEndpoint && c .HasBeenStartedBefore {
518
577
group .Add (1 )
519
578
go func (c * container.Container ) {
520
- defer group .Done ()
579
+ if err := sem .Acquire (ctx , 1 ); err != nil {
580
+ // ctx is done.
581
+ group .Done ()
582
+ return
583
+ }
584
+
521
585
if err := daemon .containerStart (c , "" , "" , true ); err != nil {
522
586
logrus .Error (err )
523
587
}
588
+
589
+ sem .Release (1 )
590
+ group .Done ()
524
591
}(c )
525
592
}
526
593
}
527
-
528
594
}
529
595
group .Wait ()
530
596
}
0 commit comments