8000 [GarbageCollector] only store typeMeta and objectMeta in the gc store by caesarxuchao · Pull Request #28480 · kubernetes/kubernetes · GitHub
[go: up one dir, main page]

Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions cmd/kube-controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
"k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/controller/garbagecollector"
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
"k8s.io/kubernetes/pkg/controller/job"
namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace"
nodecontroller "k8s.io/kubernetes/pkg/controller/node"
Expand All @@ -69,6 +70,7 @@ import (
persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
"k8s.io/kubernetes/pkg/healthz"
quotainstall "k8s.io/kubernetes/pkg/quota/install"
"k8s.io/kubernetes/pkg/runtime/serializer"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/pkg/util/configz"
"k8s.io/kubernetes/pkg/util/crypto"
Expand Down Expand Up @@ -495,8 +497,12 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
if err != nil {
glog.Fatalf("Failed to get supported resources from server: %v", err)
}
clientPool := dynamic.NewClientPool(restclient.AddUserAgent(kubeconfig, "generic-garbage-collector"), dynamic.LegacyAPIPathResolverFunc)
garbageCollector, err := garbagecollector.NewGarbageCollector(clientPool, groupVersionResources)
config := restclient.AddUserAgent(kubeconfig, "generic-garbage-collector")
config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()}
metaOnlyClientPool := dynamic.NewClientPool(config, dynamic.LegacyAPIPathResolverFunc)
config.ContentConfig.NegotiatedSerializer = nil
clientPool := dynamic.NewClientPool(config, dynamic.LegacyAPIPathResolverFunc)
garbageCollector, err := garbagecollector.NewGarbageCollector(metaOnlyClientPool, clientPool, groupVersionResources)
if err != nil {
glog.Errorf("Failed to start the generic garbage collector: %v", err)
} else {
Expand Down
4 changes: 4 additions & 0 deletions pkg/apiserver/apiserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1911,6 +1911,7 @@ func TestDeleteWithOptions(t *testing.T) {
if simpleStorage.deleted != ID {
t.Errorf("Unexpected delete: %s, expected %s", simpleStorage.deleted, ID)
}
simpleStorage.deleteOptions.GetObjectKind().SetGroupVersionKind(unversioned.GroupVersionKind{})
Copy link
Contributor Author
@caesarxuchao caesarxuchao Aug 1, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes are needed because I change the UseOrCreateObject. The decoder uses the passed in into object instead of creating a new one. In turn the versioning decoder skips the conversion so the gvk is kept. I think this behavior is an innocuous bug.

if !api.Semantic.DeepEqual(simpleStorage.deleteOptions, item) {
t.Errorf("unexpected delete options: %s", diff.ObjectDiff(simpleStorage.deleteOptions, item))
}
Expand Down Expand Up @@ -2700,6 +2701,7 @@ func TestCreate(t *testing.T) {
t.Errorf("unexpected error: %v %#v", err, response)
}

itemOut.GetObjectKind().SetGroupVersionKind(unversioned.GroupVersionKind{})
if !reflect.DeepEqual(&itemOut, simple) {
t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simple, string(body))
}
Expand Down Expand Up @@ -2769,6 +2771,7 @@ func TestCreateYAML(t *testing.T) {
t.Fatalf("unexpected error: %v %#v", err, response)
}

itemOut.GetObjectKind().SetGroupVersionKind(unversioned.GroupVersionKind{})
if !reflect.DeepEqual(&itemOut, simple) {
t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simple, string(body))
}
Expand Down Expand Up @@ -2827,6 +2830,7 @@ func TestCreateInNamespace(t *testing.T) {
t.Fatalf("unexpected error: %v\n%s", err, data)
}

itemOut.GetObjectKind().SetGroupVersionKind(unversioned.GroupVersionKind{})
if !reflect.DeepEqual(&itemOut, simple) {
t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simple, string(body))
}
Expand Down
16 changes: 7 additions & 9 deletions pkg/client/typed/dynamic/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,9 @@ func NewClient(conf *restclient.Config) (*Client, error) {
confCopy := *conf
conf = &confCopy

codec := dynamicCodec{}

// TODO: it's questionable that this should be using anything other than unstructured schema and JSON
conf.ContentType = runtime.ContentTypeJSON
conf.AcceptContentTypes = runtime.ContentTypeJSON
streamingInfo, _ := api.Codecs.StreamingSerializerForMediaType("application/json;stream=watch", nil)
conf.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(runtime.SerializerInfo{Serializer: codec}, streamingInfo)

if conf.APIPath == "" {
conf.APIPath = "/api"
Expand All @@ -69,6 +65,10 @@ func NewClient(conf *restclient.Config) (*Client, error) {
if len(conf.UserAgent) == 0 {
conf.UserAgent = restclient.DefaultKubernetesUserAgent()
}
if conf.NegotiatedSerializer == nil {
streamingInfo, _ := api.Codecs.StreamingSerializerForMediaType("application/json;stream=watch", nil)
conf.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(runtime.SerializerInfo{Serializer: dynamicCodec{}}, streamingInfo)
}

cl, err := restclient.RESTClientFor(conf)
if err != nil {
Expand Down Expand Up @@ -119,19 +119,17 @@ type ResourceClient struct {
}

// List returns a list of objects for this resource.
func (rc *ResourceClient) List(opts runtime.Object) (*runtime.UnstructuredList, error) {
result := new(runtime.UnstructuredList)
func (rc *ResourceClient) List(opts runtime.Object) (runtime.Object, error) {
parameterEncoder := rc.parameterCodec
if parameterEncoder == nil {
parameterEncoder = defaultParameterEncoder
}
err := rc.cl.Get().
return rc.cl.Get().
NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(rc.resource.Name).
VersionedParams(opts, parameterEncoder).
Do().
Into(result)
return result, err
Get()
}

// Get gets the resource with the specified name.
Expand Down
9 changes: 9 additions & 0 deletions pkg/client/typed/dynamic/client_pool.go
"k8s.io/kubernetes/pkg/api"
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ package dynamic
import (
"sync"

"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer"
)

// ClientPool manages a pool of dynamic clients.
Expand Down Expand Up @@ -77,6 +80,12 @@ func (c *clientPoolImpl) ClientForGroupVersion(groupVersion unversioned.GroupVer

// we need to make a client
conf.GroupVersion = &groupVersion

if conf.NegotiatedSerializer == nil {
streamingInfo, _ := api.Codecs.StreamingSerializerForMediaType("application/json;stream=watch", nil)
conf.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(runtime.SerializerInfo{Serializer: dynamicCodec{}}, streamingInfo)
}

dynamicClient, err := NewClient(conf)
if err != nil {
return nil, err
Expand Down
68 changes: 55 additions & 13 deletions pkg/controller/garbagecollector/garbagecollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/typed/dynamic"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
Expand Down Expand Up @@ -154,6 +155,7 @@ func (p *Propagator) addDependentToOwners(n *node, owners []metatypes.OwnerRefer
dependentsLock: &sync.RWMutex{},
dependents: make(map[*node]struct{}),
}
glog.V(6).Infof("add virtual node.identity: %s\n\n", ownerNode.identity)
p.uidToNode.Write(ownerNode)
p.gc.dirtyQueue.Add(ownerNode)
}
Expand Down Expand Up @@ -426,16 +428,19 @@ func (p *Propagator) processEvent() {
// removing ownerReferences from the dependents if the owner is deleted with
// DeleteOptions.OrphanDependents=true.
type GarbageCollector struct {
restMapper meta.RESTMapper
restMapper meta.RESTMapper
// metaOnlyClientPool uses a special codec, which removes fields except for
// apiVersion, kind, and metadata during decoding.
metaOnlyClientPool dynamic.ClientPool
// clientPool uses the regular dynamicCodec. We need it to update
// finalizers. It can be removed if we support patching finalizers.
clientPool dynamic.ClientPool
dirtyQueue *workqueue.Type
orphanQueue *workqueue.Type
monitors []monitor
propagator *Propagator
}

// TODO: make special List and Watch function that removes fields other than
// ObjectMeta.
func gcListWatcher(client *dynamic.Client, resource unversioned.GroupVersionResource) *cache.ListWatch {
return &cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
Expand All @@ -461,28 +466,38 @@ func gcListWatcher(client *dynamic.Client, resource unversioned.GroupVersionReso
}
}

func monitorFor(p *Propagator, clientPool dynamic.ClientPool, resource unversioned.GroupVersionResource) (monitor, error) {
func monitorFor(p *Propagator, clientPool dynamic.ClientPool, resource unversioned.GroupVersionResource, kind unversioned.GroupVersionKind) (monitor, error) {
// TODO: consider store in one storage.
glog.V(6).Infof("create storage for resource %s", resource)
var monitor monitor
client, err := clientPool.ClientForGroupVersion(resource.GroupVersion())
client, err := p.gc.metaOnlyClientPool.ClientForGroupVersion(resource.GroupVersion())
if err != nil {
return monitor, err
}
setObjectTypeMeta := func(obj interface{}) {
runtimeObject, ok := obj.(runtime.Object)
if !ok {
utilruntime.HandleError(fmt.Errorf("expected runtime.Object, got %#v", obj))
}
runtimeObject.GetObjectKind().SetGroupVersionKind(kind)
}
monitor.store, monitor.controller = framework.NewInformer(
gcListWatcher(client, resource),
nil,
ResourceResyncTime,
framework.ResourceEventHandlerFuncs{
// add the event to the propagator's eventQueue.
AddFunc: func(obj interface{}) {
setObjectTypeMeta(obj)
event := event{
eventType: addEvent,
obj: obj,
}
p.eventQueue.Add(event)
},
UpdateFunc: func(oldObj, newObj interface{}) {
setObjectTypeMeta(newObj)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest just making your new codec do this step?

Copy link
Contributor Author
@caesarxuchao caesarxuchao Aug 4, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a codec. One benefit of registering the MetaOnly to the scheme is utilizing the existing codecs, like the DirectCodec. I think it's neater to do it here unless we see other use cases then we can put a generalized codec. [edit] Also our a List doesn't set the gvk of items, we would need special decoder to workaround that as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Chao convinced me IRL :(

On Thu, Aug 4, 2016 at 2:26 PM, Chao Xu notifications@github.com wrote:

In pkg/controller/garbagecollector/garbagecollector.go
#28480 (comment)
:

            event := event{
                eventType: addEvent,
                obj:       obj,
            }
            p.eventQueue.Add(event)
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
  •           setObjectTypeMeta(newObj)
    

I don't have a codec. One benefit of registering the MetaOnly to the
scheme is utilizing the existing codecs, like the DirectCodec. I think it's
neater to do it here unless we see other use cases then we can put a
generalized codec.


You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
https://github.com/kubernetes/kubernetes/pull/28480/files/41d92b1b3fe4883311ae3037fb816f858d565fbd#r73605893,
or mute the thread
https://github.com/notifications/unsubscribe-auth/AAngltTGxnes6BT_iSGGuyLD-HbyUuwZks5qclkWgaJpZM4JEvyT
.

setObjectTypeMeta(oldObj)
event := event{updateEvent, newObj, oldObj}
p.eventQueue.Add(event)
},
Expand All @@ -491,6 +506,7 @@ func monitorFor(p *Propagator, clientPool dynamic.ClientPool, resource unversion
if deletedFinalStateUnknown, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = deletedFinalStateUnknown.Obj
}
setObjectTypeMeta(obj)
event := event{
eventType: deleteEvent,
obj: obj,
Expand All @@ -511,11 +527,12 @@ var ignoredResources = map[unversioned.GroupVersionResource]struct{}{
unversioned.GroupVersionResource{Group: "authorization.k8s.io", Version: "v1beta1", Resource: "subjectaccessreviews"}: {},
}

func NewGarbageCollector(clientPool dynamic.ClientPool, resources []unversioned.GroupVersionResource) (*GarbageCollector, error) {
func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynamic.ClientPool, resources []unversioned.GroupVersionResource) (*GarbageCollector, error) {
gc := &GarbageCollector{
clientPool: clientPool,
dirtyQueue: workqueue.New(),
orphanQueue: workqueue.New(),
metaOnlyClientPool: metaOnlyClientPool,
clientPool: clientPool,
dirtyQueue: workqueue.New(),
orphanQueue: workqueue.New(),
// TODO: should use a dynamic RESTMapper built from the discovery results.
restMapper: registered.RESTMapper(),
}
Expand All @@ -532,7 +549,11 @@ func NewGarbageCollector(clientPool dynamic.ClientPool, resources []unversioned.
glog.V(6).Infof("ignore resource %#v", resource)
continue
}
monitor, err := monitorFor(gc.propagator, gc.clientPool, resource)
kind, err := gc.restMapper.KindFor(resource)
if err != nil {
return nil, err
}
monitor, err := monitorFor(gc.propagator, gc.clientPool, resource, kind)
if err != nil {
return nil, err
}
Expand All @@ -549,7 +570,7 @@ func (gc *GarbageCollector) worker() {
defer gc.dirtyQueue.Done(key)
err := gc.processItem(key.(*node))
if err != nil {
utilruntime.HandleError(fmt.Errorf("Error syncing item %v: %v", key, err))
utilruntime.HandleError(fmt.Errorf("Error syncing item %#v: %v", key, err))
}
}

Expand Down Expand Up @@ -623,6 +644,20 @@ func objectReferenceToUnstructured(ref objectReference) *runtime.Unstructured {
return ret
}

func objectReferenceToMetadataOnlyObject(ref objectReference) *metaonly.MetadataOnlyObject {
return &metaonly.MetadataOnlyObject{
TypeMeta: unversioned.TypeMeta{
APIVersion: ref.APIVersion,
Kind: ref.Kind,
},
ObjectMeta: v1.ObjectMeta{
Namespace: ref.Namespace,
UID: ref.UID,
Name: ref.Name,
},
}
}

func (gc *GarbageCollector) processItem(item *node) error {
// G 684D et the latest item from the API server
latest, err := gc.getObject(item.identity)
Expand All @@ -634,15 +669,22 @@ func (gc *GarbageCollector) processItem(item *node) error {
glog.V(6).Infof("item %v not found, generating a virtual delete event", item.identity)
event := event{
eventType: deleteEvent,
obj: objectReferenceToUnstructured(item.identity),
obj: objectReferenceToMetadataOnlyObject(item.identity),
}
glog.V(6).Infof("generating virtual delete event for %s\n\n", event.obj)
gc.propagator.eventQueue.Add(event)
return nil
}
return err
}
if latest.GetUID() != item.identity.UID {
glog.V(6).Infof("UID doesn't match, item %v not found, ignore it", item.identity)
glog.V(6).Infof("UID doesn't match, item %v not found, generating a virtual delete event", item.identity)
event := event{
eventType: deleteEvent,
obj: objectReferenceToMetadataOnlyObject(item.identity),
}
glog.V(6).Infof("generating virtual delete event for %s\n\n", event.obj)
gc.propagator.eventQueue.Add(event)
return nil
}
ownerReferences := latest.GetOwnerReferences()
Expand Down
23 changes: 18 additions & 5 deletions pkg/controller/garbagecollector/garbagecollector_test.go
65CE
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"testing"

_ "k8s.io/kubernetes/pkg/api/install"
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
"k8s.io/kubernetes/pkg/runtime/serializer"

"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/api"
Expand All @@ -39,9 +41,13 @@ import (
)

func TestNewGarbageCollector(t *testing.T) {
clientPool := dynamic.NewClientPool(&restclient.Config{}, dynamic.LegacyAPIPathResolverFunc)
config := &restclient.Config{}
config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()}
metaOnlyClientPool := dynamic.NewClientPool(config, dynamic.LegacyAPIPathResolverFunc)
config.ContentConfig.NegotiatedSerializer = nil
clientPool := dynamic.NewClientPool(config, dynamic.LegacyAPIPathResolverFunc)
podResource := []unversioned.GroupVersionResource{{Version: "v1", Resource: "pods"}}
gc, err := NewGarbageCollector(clientPool, podResource)
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, podResource)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -141,8 +147,11 @@ func TestProcessItem(t *testing.T) {
podResource := []unversioned.GroupVersionResource{{Version: "v1", Resource: "pods"}}
srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP)
defer srv.Close()
clientConfig.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()}
metaOnlyClientPool := dynamic.NewClientPool(clientConfig, dynamic.LegacyAPIPathResolverFunc)
clientConfig.ContentConfig.NegotiatedSerializer = nil
clientPool := dynamic.NewClientPool(clientConfig, dynamic.LegacyAPIPathResolverFunc)
gc, err := NewGarbageCollector(clientPool, podResource)
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, podResource)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -293,9 +302,13 @@ func TestProcessEvent(t *testing.T) {
// TestDependentsRace relies on golang's data race detector to check if there is
// data race among in the dependents field.
func TestDependentsRace(t *testing.T) {
clientPool := dynamic.NewClientPool(&restclient.Config{}, dynamic.LegacyAPIPathResolverFunc)
config := &restclient.Config{}
config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()}
metaOnlyClientPool := dynamic.NewClientPool(config, dynamic.LegacyAPIPathResolverFunc)
config.ContentConfig.NegotiatedSerializer = nil
clientPool := dynamic.NewClientPool(config, dynamic.LegacyAPIPathResolverFunc)
podResource := []unversioned.GroupVersionResource{{Version: "v1", Resource: "pods"}}
gc, err := NewGarbageCollector(clientPool, podResource)
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, podResource)
if err != nil {
t.Fatal(err)
}
Expand Down
Loading
0