From 04c6a08c9b98a0abdfc99f09aa5de516c27959f1 Mon Sep 17 00:00:00 2001 From: k8s-infra-cherrypick-robot <90416843+k8s-infra-cherrypick-robot@users.noreply.github.com> Date: Tue, 9 Sep 2025 17:25:58 -0700 Subject: [PATCH 1/5] =?UTF-8?q?[release-0.22]=20=F0=9F=90=9BPanic=20when?= =?UTF-8?q?=20trying=20to=20build=20more=20than=20one=20instance=20of=20fa?= =?UTF-8?q?ke.ClientBuilder=20(#3315)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * panic when trying to build more than one instance of fake.ClientBuilder Signed-off-by: Troy Connor * pr feedback Signed-off-by: Troy Connor --------- Signed-off-by: Troy Connor Co-authored-by: Troy Connor --- pkg/client/fake/client.go | 5 +++++ pkg/client/fake/client_test.go | 9 +++++++++ 2 files changed, 14 insertions(+) diff --git a/pkg/client/fake/client.go b/pkg/client/fake/client.go index 45f9e00e18..f88a44edd2 100644 --- a/pkg/client/fake/client.go +++ b/pkg/client/fake/client.go @@ -141,6 +141,7 @@ type ClientBuilder struct { interceptorFuncs *interceptor.Funcs typeConverters []managedfields.TypeConverter returnManagedFields bool + isBuilt bool // indexes maps each GroupVersionKind (GVK) to the indexes registered for that GVK. // The inner map maps from index name to IndexerFunc. @@ -267,6 +268,9 @@ func (f *ClientBuilder) WithReturnManagedFields() *ClientBuilder { // Build builds and returns a new fake client. func (f *ClientBuilder) Build() client.WithWatch { + if f.isBuilt { + panic("Build() must not be called multiple times when creating a ClientBuilder") + } if f.scheme == nil { f.scheme = scheme.Scheme } @@ -344,6 +348,7 @@ func (f *ClientBuilder) Build() client.WithWatch { result = interceptor.NewClient(result, *f.interceptorFuncs) } + f.isBuilt = true return result } diff --git a/pkg/client/fake/client_test.go b/pkg/client/fake/client_test.go index 72c20fd56f..21c5f21dd7 100644 --- a/pkg/client/fake/client_test.go +++ b/pkg/client/fake/client_test.go @@ -3159,4 +3159,13 @@ var _ = Describe("Fake client builder", func() { Expect(err).NotTo(HaveOccurred()) Expect(called).To(BeTrue()) }) + + It("should panic when calling build more than once", func() { + cb := NewClientBuilder() + anotherCb := cb + cb.Build() + Expect(func() { + anotherCb.Build() + }).To(Panic()) + }) }) From f3b9e4f96392b66ba2067f53e1d2ab77a0410c82 Mon Sep 17 00:00:00 2001 From: dongjiang Date: Thu, 11 Sep 2025 16:15:35 +0800 Subject: [PATCH 2/5] Bump to k8s.io/* v0.34.1 Signed-off-by: dongjiang --- examples/scratch-env/go.mod | 8 ++++---- examples/scratch-env/go.sum | 16 ++++++++-------- go.mod | 12 ++++++------ go.sum | 24 ++++++++++++------------ tools/setup-envtest/go.mod | 2 +- tools/setup-envtest/go.sum | 4 ++-- 6 files changed, 33 insertions(+), 33 deletions(-) diff --git a/examples/scratch-env/go.mod b/examples/scratch-env/go.mod index a92a25b7d8..546c7c39ee 100644 --- a/examples/scratch-env/go.mod +++ b/examples/scratch-env/go.mod @@ -54,10 +54,10 @@ require ( gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/api v0.34.0 // indirect - k8s.io/apiextensions-apiserver v0.34.0 // indirect - k8s.io/apimachinery v0.34.0 // indirect - k8s.io/client-go v0.34.0 // indirect + k8s.io/api v0.34.1 // indirect + k8s.io/apiextensions-apiserver v0.34.1 // indirect + k8s.io/apimachinery v0.34.1 // indirect + k8s.io/client-go v0.34.1 // indirect k8s.io/klog/v2 v2.130.1 // indirect k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b // indirect k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 // indirect diff --git a/examples/scratch-env/go.sum b/examples/scratch-env/go.sum index 703b352e28..012b88f447 100644 --- a/examples/scratch-env/go.sum +++ b/examples/scratch-env/go.sum @@ -173,14 +173,14 @@ gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -k8s.io/api v0.34.0 h1:L+JtP2wDbEYPUeNGbeSa/5GwFtIA662EmT2YSLOkAVE= -k8s.io/api v0.34.0/go.mod h1:YzgkIzOOlhl9uwWCZNqpw6RJy9L2FK4dlJeayUoydug= -k8s.io/apiextensions-apiserver v0.34.0 h1:B3hiB32jV7BcyKcMU5fDaDxk882YrJ1KU+ZSkA9Qxoc= -k8s.io/apiextensions-apiserver v0.34.0/go.mod h1:hLI4GxE1BDBy9adJKxUxCEHBGZtGfIg98Q+JmTD7+g0= -k8s.io/apimachinery v0.34.0 h1:eR1WO5fo0HyoQZt1wdISpFDffnWOvFLOOeJ7MgIv4z0= -k8s.io/apimachinery v0.34.0/go.mod h1:/GwIlEcWuTX9zKIg2mbw0LRFIsXwrfoVxn+ef0X13lw= -k8s.io/client-go v0.34.0 h1:YoWv5r7bsBfb0Hs2jh8SOvFbKzzxyNo0nSb0zC19KZo= -k8s.io/client-go v0.34.0/go.mod h1:ozgMnEKXkRjeMvBZdV1AijMHLTh3pbACPvK7zFR+QQY= +k8s.io/api v0.34.1 h1:jC+153630BMdlFukegoEL8E/yT7aLyQkIVuwhmwDgJM= +k8s.io/api v0.34.1/go.mod h1:SB80FxFtXn5/gwzCoN6QCtPD7Vbu5w2n1S0J5gFfTYk= +k8s.io/apiextensions-apiserver v0.34.1 h1:NNPBva8FNAPt1iSVwIE0FsdrVriRXMsaWFMqJbII2CI= +k8s.io/apiextensions-apiserver v0.34.1/go.mod h1:hP9Rld3zF5Ay2Of3BeEpLAToP+l4s5UlxiHfqRaRcMc= +k8s.io/apimachinery v0.34.1 h1:dTlxFls/eikpJxmAC7MVE8oOeP1zryV7iRyIjB0gky4= +k8s.io/apimachinery v0.34.1/go.mod h1:/GwIlEcWuTX9zKIg2mbw0LRFIsXwrfoVxn+ef0X13lw= +k8s.io/client-go v0.34.1 h1:ZUPJKgXsnKwVwmKKdPfw4tB58+7/Ik3CrjOEhsiZ7mY= +k8s.io/client-go v0.34.1/go.mod h1:kA8v0FP+tk6sZA0yKLRG67LWjqufAoSHA2xVGKw9Of8= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b h1:MloQ9/bdJyIu9lb1PzujOPolHyvO06MXG5TUIj2mNAA= diff --git a/go.mod b/go.mod index 36bce9c9e5..4d998fe2fc 100644 --- a/go.mod +++ b/go.mod @@ -21,11 +21,11 @@ require ( golang.org/x/sys v0.31.0 gomodules.xyz/jsonpatch/v2 v2.4.0 gopkg.in/evanphx/json-patch.v4 v4.12.0 // Using v4 to match upstream - k8s.io/api v0.34.0 - k8s.io/apiextensions-apiserver v0.34.0 - k8s.io/apimachinery v0.34.0 - k8s.io/apiserver v0.34.0 - k8s.io/client-go v0.34.0 + k8s.io/api v0.34.1 + k8s.io/apiextensions-apiserver v0.34.1 + k8s.io/apimachinery v0.34.1 + k8s.io/apiserver v0.34.1 + k8s.io/client-go v0.34.1 k8s.io/klog/v2 v2.130.1 k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 sigs.k8s.io/structured-merge-diff/v6 v6.3.0 @@ -95,7 +95,7 @@ require ( google.golang.org/protobuf v1.36.5 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/component-base v0.34.0 // indirect + k8s.io/component-base v0.34.1 // indirect k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b // indirect sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2 // indirect sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 // indirect diff --git a/go.sum b/go.sum index 102a137d04..d6278d8a7d 100644 --- a/go.sum +++ b/go.sum @@ -229,18 +229,18 @@ gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -k8s.io/api v0.34.0 h1:L+JtP2wDbEYPUeNGbeSa/5GwFtIA662EmT2YSLOkAVE= -k8s.io/api v0.34.0/go.mod h1:YzgkIzOOlhl9uwWCZNqpw6RJy9L2FK4dlJeayUoydug= -k8s.io/apiextensions-apiserver v0.34.0 h1:B3hiB32jV7BcyKcMU5fDaDxk882YrJ1KU+ZSkA9Qxoc= -k8s.io/apiextensions-apiserver v0.34.0/go.mod h1:hLI4GxE1BDBy9adJKxUxCEHBGZtGfIg98Q+JmTD7+g0= -k8s.io/apimachinery v0.34.0 h1:eR1WO5fo0HyoQZt1wdISpFDffnWOvFLOOeJ7MgIv4z0= -k8s.io/apimachinery v0.34.0/go.mod h1:/GwIlEcWuTX9zKIg2mbw0LRFIsXwrfoVxn+ef0X13lw= -k8s.io/apiserver v0.34.0 h1:Z51fw1iGMqN7uJ1kEaynf2Aec1Y774PqU+FVWCFV3Jg= -k8s.io/apiserver v0.34.0/go.mod h1:52ti5YhxAvewmmpVRqlASvaqxt0gKJxvCeW7ZrwgazQ= -k8s.io/client-go v0.34.0 h1:YoWv5r7bsBfb0Hs2jh8SOvFbKzzxyNo0nSb0zC19KZo= -k8s.io/client-go v0.34.0/go.mod h1:ozgMnEKXkRjeMvBZdV1AijMHLTh3pbACPvK7zFR+QQY= -k8s.io/component-base v0.34.0 h1:bS8Ua3zlJzapklsB1dZgjEJuJEeHjj8yTu1gxE2zQX8= -k8s.io/component-base v0.34.0/go.mod h1:RSCqUdvIjjrEm81epPcjQ/DS+49fADvGSCkIP3IC6vg= +k8s.io/api v0.34.1 h1:jC+153630BMdlFukegoEL8E/yT7aLyQkIVuwhmwDgJM= +k8s.io/api v0.34.1/go.mod h1:SB80FxFtXn5/gwzCoN6QCtPD7Vbu5w2n1S0J5gFfTYk= +k8s.io/apiextensions-apiserver v0.34.1 h1:NNPBva8FNAPt1iSVwIE0FsdrVriRXMsaWFMqJbII2CI= +k8s.io/apiextensions-apiserver v0.34.1/go.mod h1:hP9Rld3zF5Ay2Of3BeEpLAToP+l4s5UlxiHfqRaRcMc= +k8s.io/apimachinery v0.34.1 h1:dTlxFls/eikpJxmAC7MVE8oOeP1zryV7iRyIjB0gky4= +k8s.io/apimachinery v0.34.1/go.mod h1:/GwIlEcWuTX9zKIg2mbw0LRFIsXwrfoVxn+ef0X13lw= +k8s.io/apiserver v0.34.1 h1:U3JBGdgANK3dfFcyknWde1G6X1F4bg7PXuvlqt8lITA= +k8s.io/apiserver v0.34.1/go.mod h1:eOOc9nrVqlBI1AFCvVzsob0OxtPZUCPiUJL45JOTBG0= +k8s.io/client-go v0.34.1 h1:ZUPJKgXsnKwVwmKKdPfw4tB58+7/Ik3CrjOEhsiZ7mY= +k8s.io/client-go v0.34.1/go.mod h1:kA8v0FP+tk6sZA0yKLRG67LWjqufAoSHA2xVGKw9Of8= +k8s.io/component-base v0.34.1 h1:v7xFgG+ONhytZNFpIz5/kecwD+sUhVE6HU7qQUiRM4A= +k8s.io/component-base v0.34.1/go.mod h1:mknCpLlTSKHzAQJJnnHVKqjxR7gBeHRv0rPXA7gdtQ0= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b h1:MloQ9/bdJyIu9lb1PzujOPolHyvO06MXG5TUIj2mNAA= diff --git a/tools/setup-envtest/go.mod b/tools/setup-envtest/go.mod index 5cb31d8bf2..15c64f8b57 100644 --- a/tools/setup-envtest/go.mod +++ b/tools/setup-envtest/go.mod @@ -10,7 +10,7 @@ require ( github.com/spf13/afero v1.12.0 github.com/spf13/pflag v1.0.6 go.uber.org/zap v1.27.0 - k8s.io/apimachinery v0.34.0 + k8s.io/apimachinery v0.34.1 sigs.k8s.io/yaml v1.6.0 ) diff --git a/tools/setup-envtest/go.sum b/tools/setup-envtest/go.sum index c9dcc6499b..dfc8e7cce2 100644 --- a/tools/setup-envtest/go.sum +++ b/tools/setup-envtest/go.sum @@ -46,7 +46,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -k8s.io/apimachinery v0.34.0 h1:eR1WO5fo0HyoQZt1wdISpFDffnWOvFLOOeJ7MgIv4z0= -k8s.io/apimachinery v0.34.0/go.mod h1:/GwIlEcWuTX9zKIg2mbw0LRFIsXwrfoVxn+ef0X13lw= +k8s.io/apimachinery v0.34.1 h1:dTlxFls/eikpJxmAC7MVE8oOeP1zryV7iRyIjB0gky4= +k8s.io/apimachinery v0.34.1/go.mod h1:/GwIlEcWuTX9zKIg2mbw0LRFIsXwrfoVxn+ef0X13lw= sigs.k8s.io/yaml v1.6.0 h1:G8fkbMSAFqgEFgh4b1wmtzDnioxFCUgTZhlbj5P9QYs= sigs.k8s.io/yaml v1.6.0/go.mod h1:796bPqUfzR/0jLAl6XjHl3Ck7MiyVv8dbTdyT3/pMf4= From d04f428ec56cd59349a7337c39d4e32e4da7a461 Mon Sep 17 00:00:00 2001 From: Stefan Bueringer Date: Sun, 5 Oct 2025 11:40:55 +0200 Subject: [PATCH 3/5] Don't block on Get when queue is shutdown (2nd try) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Stefan Büringer buringerst@vmware.com --- pkg/controller/priorityqueue/priorityqueue.go | 15 ++++++++--- .../priorityqueue/priorityqueue_test.go | 26 +++++++++++++++++++ 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index 49942186c0..f702600fc9 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -290,9 +290,18 @@ func (w *priorityqueue[T]) GetWithPriority() (_ T, priority int, shutdown bool) w.notifyItemOrWaiterAdded() - item := <-w.get - - return item.Key, item.Priority, w.shutdown.Load() + select { + case <-w.done: + // Return if the queue was shutdown while we were already waiting for an item here. + // For example controller workers are continuously calling GetWithPriority and + // GetWithPriority is blocking the workers if there are no items in the queue. + // If the controller and accordingly the queue is then shut down, without this code + // branch the controller workers remain blocked here and are unable to shut down. + var zero T + return zero, 0, true + case item := <-w.get: + return item.Key, item.Priority, w.shutdown.Load() + } } func (w *priorityqueue[T]) Get() (item T, shutdown bool) { diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go index 13cf59b7e8..884844efab 100644 --- a/pkg/controller/priorityqueue/priorityqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -321,6 +321,32 @@ var _ = Describe("Controllerworkqueue", func() { Expect(isShutDown).To(BeTrue()) }) + It("Get from priority queue should get unblocked when the priority queue is shut down", func() { + q, _ := newQueue() + + getUnblocked := make(chan struct{}) + + go func() { + defer GinkgoRecover() + defer close(getUnblocked) + + item, priority, isShutDown := q.GetWithPriority() + Expect(item).To(Equal("")) + Expect(priority).To(Equal(0)) + Expect(isShutDown).To(BeTrue()) + }() + + // Verify the go routine above is now waiting for an item. + Eventually(q.waiters.Load).Should(Equal(int64(1))) + Consistently(getUnblocked).ShouldNot(BeClosed()) + + // shut down + q.ShutDown() + + // Verify the shutdown unblocked the go routine. + Eventually(getUnblocked).Should(BeClosed()) + }) + It("items are included in Len() and the queueDepth metric once they are ready", func() { q, metrics := newQueue() defer q.ShutDown() From 6d368ce0f7e7218c8e1ce8ddceff43354760d535 Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Sun, 5 Oct 2025 13:44:02 -0400 Subject: [PATCH 4/5] Rebase priorityqueue shutdown fix for release-0.22 --- pkg/controller/priorityqueue/priorityqueue_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go index 884844efab..6127cd99ba 100644 --- a/pkg/controller/priorityqueue/priorityqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -337,7 +337,7 @@ var _ = Describe("Controllerworkqueue", func() { }() // Verify the go routine above is now waiting for an item. - Eventually(q.waiters.Load).Should(Equal(int64(1))) + Eventually(q.(*priorityqueue[string]).waiters.Load).Should(Equal(int64(1))) Consistently(getUnblocked).ShouldNot(BeClosed()) // shut down From 7fb34b509fcf92c7e775261c4ea1999fbace5463 Mon Sep 17 00:00:00 2001 From: k8s-infra-cherrypick-robot <90416843+k8s-infra-cherrypick-robot@users.noreply.github.com> Date: Mon, 6 Oct 2025 09:02:59 -0700 Subject: [PATCH 5/5] =?UTF-8?q?[release-0.22]=20=F0=9F=90=9B=20Fix=20a=20b?= =?UTF-8?q?ug=20where=20the=20priorityqueue=20would=20sometimes=20not=20re?= =?UTF-8?q?turn=20high-priority=20items=20first=20(#3340)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: adjust priority queue order and spin Co-authored-by: kstiehl * fix: do not hand out item during metrics ascend Co-authored-by: kstiehl * test: add test case Co-authored-by: kstiehl * rm async from test * rm metricsAscend flag * fix test Co-authored-by: kstiehl * add comments Co-authored-by: kstiehl * Update pkg/controller/priorityqueue/priorityqueue.go Co-authored-by: Alvaro Aleman * Rebase for releasebranch without newQueueWithTimeForwarder --------- Co-authored-by: moritzmoe <51033452+moritzmoe@users.noreply.github.com> Co-authored-by: kstiehl Co-authored-by: Alvaro Aleman --- pkg/controller/priorityqueue/priorityqueue.go | 95 ++++++++++++------- .../priorityqueue/priorityqueue_test.go | 41 ++++++++ 2 files changed, 104 insertions(+), 32 deletions(-) diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index f702600fc9..98df84c56b 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -1,6 +1,7 @@ package priorityqueue import ( + "math" "sync" "sync/atomic" "time" @@ -206,6 +207,7 @@ func (w *priorityqueue[T]) spin() { blockForever := make(chan time.Time) var nextReady <-chan time.Time nextReady = blockForever + var nextItemReadyAt time.Time for { select { @@ -213,10 +215,10 @@ func (w *priorityqueue[T]) spin() { return case <-w.itemOrWaiterAdded: case <-nextReady: + nextReady = blockForever + nextItemReadyAt = time.Time{} } - nextReady = blockForever - func() { w.lock.Lock() defer w.lock.Unlock() @@ -227,39 +229,67 @@ func (w *priorityqueue[T]) spin() { // manipulating the tree from within Ascend might lead to panics, so // track what we want to delete and do it after we are done ascending. var toDelete []*item[T] - w.queue.Ascend(func(item *item[T]) bool { - if item.ReadyAt != nil { - if readyAt := item.ReadyAt.Sub(w.now()); readyAt > 0 { - nextReady = w.tick(readyAt) - return false + + var key T + + // Items in the queue tree are sorted first by priority and second by readiness, so + // items with a lower priority might be ready further down in the queue. + // We iterate through the priorities high to low until we find a ready item + pivot := item[T]{ + Key: key, + AddedCounter: 0, + Priority: math.MaxInt, + ReadyAt: nil, + } + + for { + pivotChange := false + + w.queue.AscendGreaterOrEqual(&pivot, func(item *item[T]) bool { + // Item is locked, we can not hand it out + if w.locked.Has(item.Key) { + return true } - if !w.becameReady.Has(item.Key) { - w.metrics.add(item.Key, item.Priority) - w.becameReady.Insert(item.Key) + + if item.ReadyAt != nil { + if readyAt := item.ReadyAt.Sub(w.now()); readyAt > 0 { + if nextItemReadyAt.After(*item.ReadyAt) || nextItemReadyAt.IsZero() { + nextReady = w.tick(readyAt) + nextItemReadyAt = *item.ReadyAt + } + + // Adjusting the pivot item moves the ascend to the next lower priority + pivot.Priority = item.Priority - 1 + pivotChange = true + return false + } + if !w.becameReady.Has(item.Key) { + w.metrics.add(item.Key, item.Priority) + w.becameReady.Insert(item.Key) + } } - } - if w.waiters.Load() == 0 { - // Have to keep iterating here to ensure we update metrics - // for further items that became ready and set nextReady. - return true - } + if w.waiters.Load() == 0 { + // Have to keep iterating here to ensure we update metrics + // for further items that became ready and set nextReady. + return true + } - // Item is locked, we can not hand it out - if w.locked.Has(item.Key) { - return true - } + w.metrics.get(item.Key, item.Priority) + w.locked.Insert(item.Key) + w.waiters.Add(-1) + delete(w.items, item.Key) + toDelete = append(toDelete, item) + w.becameReady.Delete(item.Key) + w.get <- *item - w.metrics.get(item.Key, item.Priority) - w.locked.Insert(item.Key) - w.waiters.Add(-1) - delete(w.items, item.Key) - toDelete = append(toDelete, item) - w.becameReady.Delete(item.Key) - w.get <- *item + return true + }) - return true - }) + if !pivotChange { + break + } + } for _, item := range toDelete { w.queue.Delete(item) @@ -387,6 +417,9 @@ func (w *priorityqueue[T]) logState() { } func less[T comparable](a, b *item[T]) bool { + if a.Priority != b.Priority { + return a.Priority > b.Priority + } if a.ReadyAt == nil && b.ReadyAt != nil { return true } @@ -396,9 +429,6 @@ func less[T comparable](a, b *item[T]) bool { if a.ReadyAt != nil && b.ReadyAt != nil && !a.ReadyAt.Equal(*b.ReadyAt) { return a.ReadyAt.Before(*b.ReadyAt) } - if a.Priority != b.Priority { - return a.Priority > b.Priority - } return a.AddedCounter < b.AddedCounter } @@ -426,4 +456,5 @@ type bTree[T any] interface { ReplaceOrInsert(item T) (_ T, _ bool) Delete(item T) (T, bool) Ascend(iterator btree.ItemIteratorG[T]) + AscendGreaterOrEqual(pivot T, iterator btree.ItemIteratorG[T]) } diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go index 6127cd99ba..d0cc51f7c5 100644 --- a/pkg/controller/priorityqueue/priorityqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -197,6 +197,47 @@ var _ = Describe("Controllerworkqueue", func() { Expect(metrics.retries["test"]).To(Equal(1)) }) + It("returns high priority item that became ready before low priority item", func() { + q, metrics := newQueue() + defer q.ShutDown() + + now := time.Now().Round(time.Second) + nowLock := sync.Mutex{} + tick := make(chan time.Time) + + cwq := q.(*priorityqueue[string]) + cwq.now = func() time.Time { + nowLock.Lock() + defer nowLock.Unlock() + return now + } + tickSetup := make(chan any) + cwq.tick = func(d time.Duration) <-chan time.Time { + Expect(d).To(Equal(time.Second)) + close(tickSetup) + return tick + } + + lowPriority := -100 + highPriority := 0 + q.AddWithOpts(AddOpts{After: 0, Priority: &lowPriority}, "foo") + q.AddWithOpts(AddOpts{After: time.Second, Priority: &highPriority}, "prio") + + Eventually(tickSetup).Should(BeClosed()) + + nowLock.Lock() + now = now.Add(time.Second) + nowLock.Unlock() + tick <- now + key, prio, _ := q.GetWithPriority() + + Expect(key).To(Equal("prio")) + Expect(prio).To(Equal(0)) + Expect(metrics.depth["test"]).To(Equal(map[int]int{-100: 1, 0: 0})) + Expect(metrics.adds["test"]).To(Equal(2)) + Expect(metrics.retries["test"]).To(Equal(1)) + }) + It("returns an item to a waiter as soon as it has one", func() { q, metrics := newQueue() defer q.ShutDown()