Merge pull request #38301 from cyphar/waitgroup-limits · moby/moby@2cb26cf · GitHub

Commit 2cb26cf

Merge pull request #38301 from cyphar/waitgroup-limits
daemon: switch to semaphore-gated WaitGroup for startup tasks
2 parents a07fbfb + 5a52917 commit 2cb26cf
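
The pattern named in the title -- a sync.WaitGroup whose goroutines are each gated by a weighted semaphore from golang.org/x/sync/semaphore -- can be sketched standalone as follows. The container IDs, the Println call, and the omission of the RLIMIT_NOFILE clamp are illustrative assumptions, not code from this commit:

    package main

    import (
        "context"
        "fmt"
        "runtime"
        "sync"

        "golang.org/x/sync/semaphore"
    )

    func main() {
        ids := []string{"ctr-a", "ctr-b", "ctr-c"} // stand-ins for on-disk container dirs

        // Proposed concurrency: 128 goroutines per CPU, the multiplier this commit
        // uses. The RLIMIT_NOFILE clamp (adjustParallelLimit) is omitted here.
        limit := int64(128 * runtime.NumCPU())
        sem := semaphore.NewWeighted(limit)

        var group sync.WaitGroup
        for _, id := range ids {
            group.Add(1)
            go func(id string) {
                defer group.Done()
                // Acquire with context.Background() cannot be cancelled, so the
                // returned error can be ignored, as the daemon code does.
                _ = sem.Acquire(context.Background(), 1)
                defer sem.Release(1)

                fmt.Println("loading", id) // per-item startup work goes here
            }(id)
        }
        group.Wait()
    }

All goroutines are spawned immediately, but at most `limit` of them do work at any moment; the semaphore bounds concurrency while the WaitGroup provides the join point.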

File tree

3 files changed: +170 -64 lines changed

  daemon/daemon.go
  daemon/daemon_unix.go
  daemon/daemon_windows.go

daemon/daemon.go

Lines changed: 130 additions & 64 deletions
@@ -66,6 +66,7 @@ import (
     "github.com/docker/libnetwork/cluster"
     nwconfig "github.com/docker/libnetwork/config"
     "github.com/pkg/errors"
+    "golang.org/x/sync/semaphore"
 )

 // ContainersNamespace is the name of the namespace used for users containers

@@ -197,6 +198,7 @@ func (daemon *Daemon) NewResolveOptionsFunc() resolver.ResolveOptionsFunc {
 }

 func (daemon *Daemon) restore() error {
+    var mapLock sync.Mutex
     containers := make(map[string]*container.Container)

     logrus.Info("Loading containers: start.")

@@ -206,68 +208,99 @@ func (daemon *Daemon) restore() error {
         return err
     }

+    // parallelLimit is the maximum number of parallel startup jobs that we
+    // allow (this is the limited used for all startup semaphores). The multipler
+    // (128) was chosen after some fairly significant benchmarking -- don't change
+    // it unless you've tested it significantly (this value is adjusted if
+    // RLIMIT_NOFILE is small to avoid EMFILE).
+    parallelLimit := adjustParallelLimit(len(dir), 128*runtime.NumCPU())
+
+    // Re-used for all parallel startup jobs.
+    var group sync.WaitGroup
+    sem := semaphore.NewWeighted(int64(parallelLimit))
+
     for _, v := range dir {
-        id := v.Name()
-        container, err := daemon.load(id)
-        if err != nil {
-            logrus.Errorf("Failed to load container %v: %v", id, err)
-            continue
-        }
-        if !system.IsOSSupported(container.OS) {
-            logrus.Errorf("Failed to load container %v: %s (%q)", id, system.ErrNotSupportedOperatingSystem, container.OS)
-            continue
-        }
-        // Ignore the container if it does not support the current driver being used by the graph
-        currentDriverForContainerOS := daemon.graphDrivers[container.OS]
-        if (container.Driver == "" && currentDriverForContainerOS == "aufs") || container.Driver == currentDriverForContainerOS {
-            rwlayer, err := daemon.imageService.GetLayerByID(container.ID, container.OS)
+        group.Add(1)
+        go func(id string) {
+            defer group.Done()
+            _ = sem.Acquire(context.Background(), 1)
+            defer sem.Release(1)
+
+            container, err := daemon.load(id)
             if err != nil {
-                logrus.Errorf("Failed to load container mount %v: %v", id, err)
-                continue
+                logrus.Errorf("Failed to load container %v: %v", id, err)
+                return
             }
-            container.RWLayer = rwlayer
-            logrus.Debugf("Loaded container %v, isRunning: %v", container.ID, container.IsRunning())
+            if !system.IsOSSupported(container.OS) {
+                logrus.Errorf("Failed to load container %v: %s (%q)", id, system.ErrNotSupportedOperatingSystem, container.OS)
+                return
+            }
+            // Ignore the container if it does not support the current driver being used by the graph
+            currentDriverForContainerOS := daemon.graphDrivers[container.OS]
+            if (container.Driver == "" && currentDriverForContainerOS == "aufs") || container.Driver == currentDriverForContainerOS {
+                rwlayer, err := daemon.imageService.GetLayerByID(container.ID, container.OS)
+                if err != nil {
+                    logrus.Errorf("Failed to load container mount %v: %v", id, err)
+                    return
+                }
+                container.RWLayer = rwlayer
+                logrus.Debugf("Loaded container %v, isRunning: %v", container.ID, container.IsRunning())

-            containers[container.ID] = container
-        } else {
-            logrus.Debugf("Cannot load container %s because it was created with another graph driver.", container.ID)
-        }
+                mapLock.Lock()
+                containers[container.ID] = container
+                mapLock.Unlock()
+            } else {
+                logrus.Debugf("Cannot load container %s because it was created with another graph driver.", container.ID)
+            }
+        }(v.Name())
     }
+    group.Wait()

     removeContainers := make(map[string]*container.Container)
     restartContainers := make(map[*container.Container]chan struct{})
     activeSandboxes := make(map[string]interface{})
+
     for id, c := range containers {
-        if err := daemon.registerName(c); err != nil {
-            logrus.Errorf("Failed to register container name %s: %s", c.ID, err)
-            delete(containers, id)
-            continue
-        }
-        if err := daemon.Register(c); err != nil {
-            logrus.Errorf("Failed to register container %s: %s", c.ID, err)
-            delete(containers, id)
-            continue
-        }
+        group.Add(1)
+        go func(c *container.Container) {
+            defer group.Done()
+            _ = sem.Acquire(context.Background(), 1)
+            defer sem.Release(1)

-        // The LogConfig.Type is empty if the container was created before docker 1.12 with default log driver.
-        // We should rewrite it to use the daemon defaults.
-        // Fixes https://github.com/docker/docker/issues/22536
-        if c.HostConfig.LogConfig.Type == "" {
-            if err := daemon.mergeAndVerifyLogConfig(&c.HostConfig.LogConfig); err != nil {
-                logrus.Errorf("Failed to verify log config for container %s: %q", c.ID, err)
-                continue
+            if err := daemon.registerName(c); err != nil {
+                logrus.Errorf("Failed to register container name %s: %s", c.ID, err)
+                mapLock.Lock()
+                delete(containers, id)
+                mapLock.Unlock()
+                return
             }
-        }
+            if err := daemon.Register(c); err != nil {
+                logrus.Errorf("Failed to register container %s: %s", c.ID, err)
+                mapLock.Lock()
+                delete(containers, id)
+                mapLock.Unlock()
+                return
+            }
+
+            // The LogConfig.Type is empty if the container was created before docker 1.12 with default log driver.
+            // We should rewrite it to use the daemon defaults.
+            // Fixes https://github.com/docker/docker/issues/22536
+            if c.HostConfig.LogConfig.Type == "" {
+                if err := daemon.mergeAndVerifyLogConfig(&c.HostConfig.LogConfig); err != nil {
+                    logrus.Errorf("Failed to verify log config for container %s: %q", c.ID, err)
+                }
+            }
+        }(c)
     }
+    group.Wait()

-    var (
-        wg      sync.WaitGroup
-        mapLock sync.Mutex
-    )
     for _, c := range containers {
-        wg.Add(1)
+        group.Add(1)
         go func(c *container.Container) {
-            defer wg.Done()
+            defer group.Done()
+            _ = sem.Acquire(context.Background(), 1)
+            defer sem.Release(1)
+
             daemon.backportMountSpec(c)
             if err := daemon.checkpointAndSave(c); err != nil {
                 logrus.WithError(err).WithField("container", c.ID).Error("error saving backported mountspec to disk")

@@ -414,26 +447,33 @@ func (daemon *Daemon) restore() error {
             c.Unlock()
         }(c)
     }
-    wg.Wait()
+    group.Wait()
+
     daemon.netController, err = daemon.initNetworkController(daemon.configStore, activeSandboxes)
     if err != nil {
         return fmt.Errorf("Error initializing network controller: %v", err)
     }

     // Now that all the containers are registered, register the links
     for _, c := range containers {
-        if err := daemon.registerLinks(c, c.HostConfig); err != nil {
-            logrus.Errorf("failed to register link for container %s: %v", c.ID, err)
-        }
+        group.Add(1)
+        go func(c *container.Container) {
+            _ = sem.Acquire(context.Background(), 1)
+
+            if err := daemon.registerLinks(c, c.HostConfig); err != nil {
+                logrus.Errorf("failed to register link for container %s: %v", c.ID, err)
+            }
+
+            sem.Release(1)
+            group.Done()
+        }(c)
     }
+    group.Wait()

-    group := sync.WaitGroup{}
     for c, notifier := range restartContainers {
         group.Add(1)
-
         go func(c *container.Container, chNotify chan struct{}) {
-            defer group.Done()
-
+            _ = sem.Acquire(context.Background(), 1)
             logrus.Debugf("Starting container %s", c.ID)

             // ignore errors here as this is a best effort to wait for children to be

@@ -455,22 +495,27 @@ func (daemon *Daemon) restore() error {
                logrus.Errorf("Failed to start container %s: %s", c.ID, err)
            }
            close(chNotify)
-        }(c, notifier)

+            sem.Release(1)
+            group.Done()
+        }(c, notifier)
     }
     group.Wait()

-    removeGroup := sync.WaitGroup{}
     for id := range removeContainers {
-        removeGroup.Add(1)
+        group.Add(1)
         go func(cid string) {
+            _ = sem.Acquire(context.Background(), 1)
+
            if err := daemon.ContainerRm(cid, &types.ContainerRmConfig{ForceRemove: true, RemoveVolume: true}); err != nil {
                logrus.Errorf("Failed to remove container %s: %s", cid, err)
            }
-            removeGroup.Done()
+
+            sem.Release(1)
+            group.Done()
         }(id)
     }
-    removeGroup.Wait()
+    group.Wait()

     // any containers that were started above would already have had this done,
     // however we need to now prepare the mountpoints for the rest of the containers as well.

@@ -491,13 +536,16 @@ func (daemon *Daemon) restore() error {

         group.Add(1)
         go func(c *container.Container) {
-            defer group.Done()
+            _ = sem.Acquire(context.Background(), 1)
+
            if err := daemon.prepareMountPoints(c); err != nil {
                logrus.Error(err)
            }
+
+            sem.Release(1)
+            group.Done()
         }(c)
     }
-
     group.Wait()

     logrus.Info("Loading containers: done.")

@@ -508,7 +556,18 @@ func (daemon *Daemon) restore() error {
 // RestartSwarmContainers restarts any autostart container which has a
 // swarm endpoint.
 func (daemon *Daemon) RestartSwarmContainers() {
-    group := sync.WaitGroup{}
+    ctx := context.Background()
+
+    // parallelLimit is the maximum number of parallel startup jobs that we
+    // allow (this is the limited used for all startup semaphores). The multipler
+    // (128) was chosen after some fairly significant benchmarking -- don't change
+    // it unless you've tested it significantly (this value is adjusted if
+    // RLIMIT_NOFILE is small to avoid EMFILE).
+    parallelLimit := adjustParallelLimit(len(daemon.List()), 128*runtime.NumCPU())
+
+    var group sync.WaitGroup
+    sem := semaphore.NewWeighted(int64(parallelLimit))
+
     for _, c := range daemon.List() {
         if !c.IsRunning() && !c.IsPaused() {
             // Autostart all the containers which has a

@@ -517,14 +576,21 @@ func (daemon *Daemon) RestartSwarmContainers() {
             if daemon.configStore.AutoRestart && c.ShouldRestart() && c.NetworkSettings.HasSwarmEndpoint && c.HasBeenStartedBefore {
                 group.Add(1)
                 go func(c *container.Container) {
-                    defer group.Done()
+                    if err := sem.Acquire(ctx, 1); err != nil {
+                        // ctx is done.
+                        group.Done()
+                        return
+                    }
+
                     if err := daemon.containerStart(c, "", "", true); err != nil {
                         logrus.Error(err)
                     }
+
+                    sem.Release(1)
+                    group.Done()
                 }(c)
             }
         }
-
     }
     group.Wait()
 }
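
A side effect of loading containers concurrently (the first restore() hunk above) is that the shared containers map is now written from many goroutines, which is why the diff introduces mapLock: unsynchronized concurrent map writes panic at runtime in Go. A minimal standalone illustration of that guard, using a placeholder map rather than the daemon's container type:

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        var (
            mapLock sync.Mutex
            loaded  = make(map[string]int) // stand-in for the daemon's containers map
            group   sync.WaitGroup
        )

        for i := 0; i < 100; i++ {
            group.Add(1)
            go func(i int) {
                defer group.Done()
                id := fmt.Sprintf("ctr-%d", i)
                // Only the map write is serialized; the expensive per-container
                // loading that would precede it stays fully parallel.
                mapLock.Lock()
                loaded[id] = i
                mapLock.Unlock()
            }(i)
        }
        group.Wait()
        fmt.Println(len(loaded)) // 100
    }

Holding the mutex only around the write keeps the contention window tiny, which is the same trade-off the daemon code makes.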

daemon/daemon_unix.go

Lines changed: 35 additions & 0 deletions
@@ -257,6 +257,41 @@ func getBlkioThrottleDevices(devs []*blkiodev.ThrottleDevice) ([]specs.LinuxThro
     return throttleDevices, nil
 }

+// adjustParallelLimit takes a number of objects and a proposed limit and
+// figures out if it's reasonable (and adjusts it accordingly). This is only
+// used for daemon startup, which does a lot of parallel loading of containers
+// (and if we exceed RLIMIT_NOFILE then we're in trouble).
+func adjustParallelLimit(n int, limit int) int {
+    // Rule-of-thumb overhead factor (how many files will each goroutine open
+    // simultaneously). Yes, this is ugly but to be frank this whole thing is
+    // ugly.
+    const overhead = 2
+
+    // On Linux, we need to ensure that parallelStartupJobs doesn't cause us to
+    // exceed RLIMIT_NOFILE. If parallelStartupJobs is too large, we reduce it
+    // and give a warning (since in theory the user should increase their
+    // ulimits to the largest possible value for dockerd).
+    var rlim unix.Rlimit
+    if err := unix.Getrlimit(unix.RLIMIT_NOFILE, &rlim); err != nil {
+        logrus.Warnf("Couldn't find dockerd's RLIMIT_NOFILE to double-check startup parallelism factor: %v", err)
+        return limit
+    }
+    softRlimit := int(rlim.Cur)
+
+    // Much fewer containers than RLIMIT_NOFILE. No need to adjust anything.
+    if softRlimit > overhead*n {
+        return limit
+    }
+
+    // RLIMIT_NOFILE big enough, no need to adjust anything.
+    if softRlimit > overhead*limit {
+        return limit
+    }
+
+    logrus.Warnf("Found dockerd's open file ulimit (%v) is far too small -- consider increasing it significantly (at least %v)", softRlimit, overhead*limit)
+    return softRlimit / overhead
+}
+
 func checkKernel() error {
     // Check for unsupported kernel versions
     // FIXME: it would be cleaner to not test for specific versions, but rather
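
To make the clamping logic above concrete, here is a small sketch that restates the adjustParallelLimit arithmetic with the unix.Getrlimit lookup replaced by an explicit argument; the host numbers (8 CPUs, 10000 containers, a soft ulimit of 1024) are hypothetical:

    package main

    import "fmt"

    // Re-statement of the adjustParallelLimit arithmetic with the rlimit lookup
    // replaced by an explicit softRlimit argument, so the example is deterministic
    // and portable.
    func adjustParallelLimit(n, limit, softRlimit int) int {
        const overhead = 2
        if softRlimit > overhead*n {
            return limit // far more file descriptors than containers
        }
        if softRlimit > overhead*limit {
            return limit // the rlimit comfortably covers the proposed parallelism
        }
        return softRlimit / overhead
    }

    func main() {
        // Hypothetical host: 8 CPUs (proposed limit 128*8 = 1024), 10000 containers
        // to load, and a soft RLIMIT_NOFILE of 1024. Neither early return fires, so
        // the parallelism is clamped to 1024/2 = 512 startup jobs.
        fmt.Println(adjustParallelLimit(10000, 128*8, 1024)) // 512
    }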

daemon/daemon_windows.go

Lines changed: 5 additions & 0 deletions
@@ -40,6 +40,11 @@ const (
     windowsMaxCPUPercent = 100
 )

+// Windows doesn't really have rlimits.
+func adjustParallelLimit(n int, limit int) int {
+    return limit
+}
+
 // Windows has no concept of an execution state directory. So use config.Root here.
 func getPluginExecRoot(root string) string {
     return filepath.Join(root, "plugins")

0 commit comments

Comments
 (0)
0