8000 use runtime scheme · kubernetes/kubernetes@41d92b1 · GitHub
[go: up one dir, main page]

Skip to content

Commit 41d92b1

Browse files
author
Chao Xu
committed
use runtime scheme
1 parent 62111f1 commit 41d92b1

File tree

20 files changed

+993
-178
lines changed

20 files changed

+993
-178
lines changed

cmd/kube-controller-manager/app/controllermanager.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ import (
7171
persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
7272
"k8s.io/kubernetes/pkg/healthz"
7373
quotainstall "k8s.io/kubernetes/pkg/quota/install"
74+
"k8s.io/kubernetes/pkg/runtime/serializer"
7475
"k8s.io/kubernetes/pkg/serviceaccount"
7576
"k8s.io/kubernetes/pkg/util/configz"
7677
"k8s.io/kubernetes/pkg/util/crypto"
@@ -488,9 +489,9 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
488489
if err != nil {
489490
glog.Fatalf("Failed to get supported resources from server: %v", err)
490491
}
491-
compressingClientPool := dynamic.NewClientPool(restclient.AddUserAgent(kubeconfig, "generic-garbage-collector"), dynamic.LegacyAPIPathResolverFunc, metaonly.MetaOnlyJSONScheme)
492+
metaOnlyClientPool := dynamic.NewClientPool(restclient.AddUserAgent(kubeconfig, "generic-garbage-collector"), dynamic.LegacyAPIPathResolverFunc, serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetaOnlyCodecFactory()})
492493
clientPool := dynamic.NewClientPool(restclient.AddUserAgent(kubeconfig, "generic-garbage-collector"), dynamic.LegacyAPIPathResolverFunc, nil)
493-
garbageCollector, err := garbagecollector.NewGarbageCollector(compressingClientPool, clientPool, groupVersionResources)
494+
garbageCollector, err := garbagecollector.NewGarbageCollector(metaOnlyClientPool, clientPool, groupVersionResources)
494495
if err != nil {
495496
glog.Errorf("Failed to start the generic garbage collector: %v", err)
496497
} else {

pkg/apiserver/apiserver_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1911,6 +1911,7 @@ func TestDeleteWithOptions(t *testing.T) {
19111911
if simpleStorage.deleted != ID {
19121912
t.Errorf("Unexpected delete: %s, expected %s", simpleStorage.deleted, ID)
19131913
}
1914+
simpleStorage.deleteOptions.GetObjectKind().SetGroupVersionKind(unversioned.GroupVersionKind{})
19141915
if !api.Semantic.DeepEqual(simpleStorage.deleteOptions, item) {
19151916
t.Errorf("unexpected delete options: %s", diff.ObjectDiff(simpleStorage.deleteOptions, item))
19161917
}
@@ -2700,6 +2701,7 @@ func TestCreate(t *testing.T) {
27002701
t.Errorf("unexpected error: %v %#v", err, response)
27012702
}
27022703

2704+
itemOut.GetObjectKind().SetGroupVersionKind(unversioned.GroupVersionKind{})
27032705
if !reflect.DeepEqual(&itemOut, simple) {
27042706
t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simple, string(body))
27052707
}
@@ -2769,6 +2771,7 @@ func TestCreateYAML(t *testing.T) {
27692771
t.Fatalf("unexpected error: %v %#v", err, response)
27702772
}
27712773

2774+
itemOut.GetObjectKind().SetGroupVersionKind(unversioned.GroupVersionKind{})
27722775
if !reflect.DeepEqual(&itemOut, simple) {
27732776
t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simple, string(body))
27742777
}
@@ -2827,6 +2830,7 @@ func TestCreateInNamespace(t *testing.T) {
28272830
t.Fatalf("unexpected error: %v\n%s", err, data)
28282831
}
28292832

2833+
itemOut.GetObjectKind().SetGroupVersionKind(unversioned.GroupVersionKind{})
28302834
if !reflect.DeepEqual(&itemOut, simple) {
28312835
t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simple, string(body))
28322836
}

pkg/client/restclient/request.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1090,7 +1090,7 @@ func (r Result) Into(obj runtime.Object) error {
10901090
return runtime.DecodeInto(r.decoder, r.body, obj)
10911091
}
10921092

1093-
// Decode decodes the result.
1093+
// Decode decodes the result to a runtime.Object as determined by the decoder.
10941094
func (r Result) Decode() (obj runtime.Object, gvk *unversioned.GroupVersionKind, err error) {
10951095
if r.err != nil {
10961096
return nil, nil, r.err

pkg/client/typed/dynamic/client.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ func NewClient(conf *restclient.Config) (*Client, error) {
5656

5757
// TODO: it's questionable that this should be using anything other than unstructured schema and JSON
5858
conf.ContentType = runtime.ContentTypeJSON
59+
conf.AcceptContentTypes = runtime.ContentTypeJSON
5960

6061
if conf.APIPath == "" {
6162
conf.APIPath = "/api"
@@ -66,7 +67,7 @@ func NewClient(conf *restclient.Config) (*Client, error) {
6667
}
6768
if conf.NegotiatedSerializer == nil {
6869
streamingInfo, _ := api.Codecs.StreamingSerializerForMediaType("application/json;stream=watch", nil)
69-
conf.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(runtime.SerializerInfo{Serializer: Codec{}}, streamingInfo)
70+
conf.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(runtime.SerializerInfo{Serializer: dynamicCodec{}}, streamingInfo)
7071
}
7172

7273
cl, err := restclient.RESTClientFor(conf)
@@ -224,11 +225,11 @@ func (rc *ResourceClient) Patch(name string, pt api.PatchType, data []byte) (*ru
224225
return result, err
225226
}
226227

227-
// Codec is a codec that wraps the standard unstructured codec
228+
// dynamicCodec is a codec that wraps the standard unstructured codec
228229
// with special handling for Status objects.
229-
type Codec struct{}
230+
type dynamicCodec struct{}
230231

231-
func (Codec) Decode(data []byte, gvk *unversioned.GroupVersionKind, obj runtime.Object) (runtime.Object, *unversioned.GroupVersionKind, error) {
232+
func (dynamicCodec) Decode(data []byte, gvk *unversioned.GroupVersionKind, obj runtime.Object) (runtime.Object, *unversioned.GroupVersionKind, error) {
232233
obj, gvk, err := runtime.UnstructuredJSONScheme.Decode(data, gvk, obj)
233234
if err != nil {
234235
return nil, nil, err
@@ -245,7 +246,7 @@ func (Codec) Decode(data []byte, gvk *unversioned.GroupVersionKind, obj runtime.
245246
return obj, gvk, nil
246247
}
247248

248-
func (Codec) Encode(obj runtime.Object, w io.Writer) error {
249+
func (dynamicCodec) Encode(obj runtime.Object, w io.Writer) error {
249250
return runtime.UnstructuredJSONScheme.Encode(obj, w)
250251
}
251252

pkg/client/typed/dynamic/client_pool.go

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -45,21 +45,21 @@ func LegacyAPIPathResolverFunc(groupVersion unversioned.GroupVersion) string {
4545

4646
// clientPoolImpl implements Factory
4747
type clientPoolImpl struct {
48-
lock sync.RWMutex
49-
config *restclient.Config
50-
clients map[unversion AB46 ed.GroupVersion]*Client
51-
apiPathResolverFunc APIPathResolverFunc
52-
codec runtime.Codec
48+
lock sync.RWMutex
49+
config *restclient.Config
50+
clients map[unversioned.GroupVersion]*Client
51+
apiPathResolverFunc APIPathResolverFunc
52+
negotiatedSerializer runtime.NegotiatedSerializer
5353
}
5454

5555
// NewClientPool returns a ClientPool from the specified config
56-
func NewClientPool(config *restclient.Config, apiPathResolverFunc APIPathResolverFunc, codec runtime.Codec) ClientPool {
56+
func NewClientPool(config *restclient.Config, apiPathResolverFunc APIPathResolverFunc, negotiatedSerializer runtime.NegotiatedSerializer) ClientPool {
5757
confCopy := *config
5858
return &clientPoolImpl{
59-
config: &confCopy,
60-
clients: map[unversioned.GroupVersion]*Client{},
61-
apiPathResolverFunc: apiPathResolverFunc,
62-
codec: codec,
59+
config: &confCopy,
60+
clients: map[unversioned.GroupVersion]*Client{},
61+
apiPathResolverFunc: apiPathResolverFunc,
62+
negotiatedSerializer: negotiatedSerializer,
6363
}
6464
}
6565

@@ -83,15 +83,12 @@ func (c *clientPoolImpl) ClientForGroupVersion(groupVersion unversioned.GroupVer
8383
// we need to make a client
8484
conf.GroupVersion = &groupVersion
8585

86-
var codec runtime.Codec
87-
if c.codec != nil {
88-
codec = c.codec
86+
if c.negotiatedSerializer != nil {
87+
conf.NegotiatedSerializer = c.negotiatedSerializer
8988
} else {
90-
codec = Codec{}
89+
streamingInfo, _ := api.Codecs.StreamingSerializerForMediaType("application/json;stream=watch", nil)
90+
conf.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(runtime.SerializerInfo{Serializer: dynamicCodec{}}, streamingInfo)
9191
}
92-
conf.AcceptContentTypes = runtime.ContentTypeJSON
93-
streamingInfo, _ := api.Codecs.StreamingSerializerForMediaType("application/json;stream=watch", nil)
94-
conf.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(runtime.SerializerInfo{Serializer: codec}, streamingInfo)
9592

9693
dynamicClient, err := NewClient(conf)
9794
if err != nil {

pkg/client/typed/dynamic/client_test.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
"k8s.io/kubernetes/pkg/api/v1"
3131
"k8s.io/kubernetes/pkg/client/restclient"
3232
"k8s.io/kubernetes/pkg/runtime"
33-
"k8s.io/kubernetes/pkg/runtime/serializer"
3433
"k8s.io/kubernetes/pkg/runtime/serializer/streaming"
3534
"k8s.io/kubernetes/pkg/watch"
3635
"k8s.io/kubernetes/pkg/watch/versioned"
@@ -60,14 +59,9 @@ func getObject(version, kind, name string) *runtime.Unstructured {
6059

6160
func getClientServer(gv *unversioned.GroupVersion, h func(http.ResponseWriter, *http.Request)) (*Client, *httptest.Server, error) {
6261
srv := httptest.NewServer(http.HandlerFunc(h))
63-
streamingInfo, _ := api.Codecs.StreamingSerializerForMediaType("application/json;stream=watch", nil)
64-
negotiatedSerializer := serializer.NegotiatedSerializerWrapper(runtime.SerializerInfo{Serializer: Codec{}}, streamingInfo)
6562
cl, err := NewClient(&restclient.Config{
66-
Host: srv.URL,
67-
ContentConfig: restclient.ContentConfig{
68-
GroupVersion: gv,
69-
NegotiatedSerializer: negotiatedSerializer,
70-
},
63+
Host: srv.URL,
64+
ContentConfig: restclient.ContentConfig{GroupVersion: gv},
7165
})
7266
if err != nil {
7367
srv.Close()
@@ -463,7 +457,7 @@ func TestWatch(t *testing.T) {
463457
t.Errorf("Watch(%q) got path %s. wanted %s", tc.name, r.URL.Path, tc.path)
464458
}
465459

466-
enc := versioned.NewEncoder(streaming.NewEncoder(w, Codec{}), Codec{})
460+
enc := versioned.NewEncoder(streaming.NewEncoder(w, dynamicCodec{}), dynamicCodec{})
467461
for _, e := range tc.events {
468462
enc.Encode(&e)
469463
}

pkg/controller/garbagecollector/garbagecollector.go

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -429,10 +429,10 @@ func (p *Propagator) processEvent() {
429429
// DeleteOptions.OrphanDependents=true.
430430
type GarbageCollector struct {
431431
restMapper meta.RESTMapper
432-
// compressingClientPool uses the compressing codec, which removes fields
433-
// except for apiVersion, kind, and metadata when decode.
434-
compressingClientPool dynamic.ClientPool
435-
// clientPool uses the regular dynamic.Codec. We need it to update
432+
// metaOnlyClientPool uses a special codec, which removes fields except for
433+
// apiVersion, kind, and metadata during decoding.
434+
metaOnlyClientPool dynamic.ClientPool
435+
// clientPool uses the regular dynamicCodec. We need it to update
436436
// finalizers. It can be removed if we support patching finalizers.
437437
clientPool dynamic.ClientPool
438438
dirtyQueue *workqueue.Type
@@ -466,28 +466,38 @@ func gcListWatcher(client *dynamic.Client, resource unversioned.GroupVersionReso
466466
}
467467
}
468468

469-
func monitorFor(p *Propagator, clientPool dynamic.ClientPool, resource unversioned.GroupVersionResource) (monitor, error) {
469+
func monitorFor(p *Propagator, clientPool dynamic.ClientPool, resource unversioned.GroupVersionResource, kind unversioned.GroupVersionKind) (monitor, error) {
470470
// TODO: consider store in one storage.
471471
glog.V(6).Infof("create storage for resource %s", resource)
472472
var monitor monitor
473-
client, err := p.gc.compressingClientPool.ClientForGroupVersion(resource.GroupVersion())
473+
client, err := p.gc.metaOnlyClientPool.ClientForGroupVersion(resource.GroupVersion())
474474
if err != nil {
475475
return monitor, err
476476
}
477+
setObjectTypeMeta := func(obj interface{}) {
478+
runtimeObject, ok := obj.(runtime.Object)
479+
if !ok {
480+
utilruntime.HandleError(fmt.Errorf("expected runtime.Object, got %#v", obj))
481+
}
482+
runtimeObject.GetObjectKind().SetGroupVersionKind(kind)
483+
}
477484
monitor.store, monitor.controller = framework.NewInformer(
478485
gcListWatcher(client, resource),
479486
nil,
480487
ResourceResyncTime,
481488
framework.ResourceEventHandlerFuncs{
482489
// add the event to the propagator's eventQueue.
483490
AddFunc: func(obj interface{}) {
491+
setObjectTypeMeta(obj)
484492
event := event{
485493
eventType: addEvent,
486494
obj: obj,
487495
}
488496
p.eventQueue.Add(event)
489497
},
490498
UpdateFunc: func(oldObj, newObj interface{}) {
499+
setObjectTypeMeta(newObj)
500+
setObjectTypeMeta(oldObj)
491501
event := event{updateEvent, newObj, oldObj}
492502
p.eventQueue.Add(event)
493503
},
@@ -496,6 +506,7 @@ func monitorFor(p *Propagator, clientPool dynamic.ClientPool, resource unversion
496506
if deletedFinalStateUnknown, ok := obj.(cache.DeletedFinalStateUnknown); ok {
497507
obj = deletedFinalStateUnknown.Obj
498508
}
509+
setObjectTypeMeta(obj)
499510
event := event{
500511
eventType: deleteEvent,
501512
obj: obj,
@@ -514,12 +525,12 @@ var ignoredResources = map[unversioned.GroupVersionResource]struct{}{
514525
unversioned.GroupVersionResource{Group: "", Version: "v1", Resource: "events"}: {},
515526
}
516527

517-
func NewGarbageCollector(compressingClientPool dynamic.ClientPool, clientPool dynamic.ClientPool, resources []unversioned.GroupVersionResource) (*GarbageCollector, error) {
528+
func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynamic.ClientPool, resources []unversioned.GroupVersionResource) (*GarbageCollector, error) {
518529
gc := &GarbageCollector{
519-
compressingClientPool: compressingClientPool,
520-
clientPool: clientPool,
521-
dirtyQueue: workqueue.New(),
522-
orphanQueue: workqueue.New(),
530+
metaOnlyClientPool: metaOnlyClientPool,
531+
clientPool: clientPool,
532+
dirtyQueue: workqueue.New(),
533+
orphanQueue: workqueue.New(),
523534
// TODO: should use a dynamic RESTMapper built from the discovery results.
524535
restMapper: registered.RESTMapper(),
525536
}
@@ -536,7 +547,11 @@ func NewGarbageCollector(compressingClientPool dynamic.ClientPool, clientPool dy
536547
glog.V(6).Infof("ignore resource %#v", resource)
537548
continue
538549
}
539-
monitor, err := monitorFor(gc.propagator, gc.clientPool, resource)
550+
kind, err := gc.restMapper.KindFor(resource)
551+
if err != nil {
552+
return nil, err
553+
}
554+
monitor, err := monitorFor(gc.propagator, gc.clientPool, resource, kind)
540555
if err != nil {
541556
return nil, err
542557
}

pkg/controller/garbagecollector/garbagecollector_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
_ "k8s.io/kubernetes/pkg/api/install"
2727
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
28+
"k8s.io/kubernetes/pkg/runtime/serializer"
2829

2930
"github.com/stretchr/testify/assert"
3031
"k8s.io/kubernetes/pkg/api"
@@ -40,10 +41,10 @@ import (
4041
)
4142

4243
func TestNewGarbageCollector(t *testing.T) {
43-
compressingClientPool := dynamic.NewClientPool(&restclient.Config{}, dynamic.LegacyAPIPathResolverFunc, metaonly.MetaOnlyJSONScheme)
44+
metaOnlyClientPool := dynamic.NewClientPool(&restclient.Config{}, dynamic.LegacyAPIPathResolverFunc, serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetaOnlyCodecFactory()})
4445
clientPool := dynamic.NewClientPool(&restclient.Config{}, dynamic.LegacyAPIPathResolverFunc, nil)
4546
podResource := []unversioned.GroupVersionResource{{Version: "v1", Resource: "pods"}}
46-
gc, err := NewGarbageCollector(compressingClientPool, clientPool, podResource)
47+
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, podResource)
4748
if err != nil {
4849
t.Fatal(err)
4950
}
@@ -143,9 +144,9 @@ func TestProcessItem(t *testing.T) {
143144
podResource := []unversioned.GroupVersionResource{{Version: "v1", Resource: "pods"}}
144145
srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP)
145146
defer srv.Close()
146-
compressingClientPool := dynamic.NewClientPool(clientConfig, dynamic.LegacyAPIPathResolverFunc, metaonly.MetaOnlyJSONScheme)
147+
metaOnlyClientPool := dynamic.NewClientPool(clientConfig, dynamic.LegacyAPIPathResolverFunc, serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetaOnlyCodecFactory()})
147148
clientPool := dynamic.NewClientPool(clientConfig, dynamic.LegacyAPIPathResolverFunc, nil)
148-
gc, err := NewGarbageCollector(compressingClientPool, clientPool, podResource)
149+
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, podResource)
149150
if err != nil {
150151
t.Fatal(err)
151152
}
@@ -296,10 +297,10 @@ func TestProcessEvent(t *testing.T) {
296297
// TestDependentsRace relies on golang's data race detector to check if there is
297298
// data race among in the dependents field.
298299
func TestDependentsRace(t *testing.T) {
299-
compressingClientPool := dynamic.NewClientPool(&restclient.Config{}, dynamic.LegacyAPIPathResolverFunc, metaonly.MetaOnlyJSONScheme)
300+
metaOnlyClientPool := dynamic.NewClientPool(&restclient.Config{}, dynamic.LegacyAPIPathResolverFunc, serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetaOnlyCodecFactory()})
300301
clientPool := dynamic.NewClientPool(&restclient.Config{}, dynamic.LegacyAPIPathResolverFunc, nil)
301302
podResource := []unversioned.GroupVersionResource{{Version: "v1", Resource: "pods"}}
302-
gc, err := NewGarbageCollector(compressingClientPool, clientPool, podResource)
303+
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, podResource)
303304
if err != nil {
304305
t.Fatal(err)
305306
}

0 commit comments

Comments
 (0)
0