Skip to content

Commit f94ad89

Browse files
Merge pull request #2478 from rh-roman/backport-api-stream
[release-4.19] CNTRLPLANE-1609: Backport StreamingCollectionEncoding for JSON and protobuf
2 parents 9fa6215 + cfbb772 commit f94ad89

File tree

19 files changed

+2370
-34
lines changed

19 files changed

+2370
-34
lines changed

pkg/features/kube_features.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -702,6 +702,14 @@ const (
702702
// Enables support for the StorageVersionMigrator controller.
703703
StorageVersionMigrator featuregate.Feature = "StorageVersionMigrator"
704704

705+
// owner: @serathius
706+
// Allow API server JSON encoder to encode collections item by item, instead of all at once.
707+
StreamingCollectionEncodingToJSON featuregate.Feature = "StreamingCollectionEncodingToJSON"
708+
709+
// owner: serathius
710+
// Allow API server Protobuf encoder to encode collections item by item, instead of all at once.
711+
StreamingCollectionEncodingToProtobuf featuregate.Feature = "StreamingCollectionEncodingToProtobuf"
712+
705713
// owner: @robscott
706714
// kep: https://kep.k8s.io/2433
707715
//

pkg/features/versioned_kube_features.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -756,6 +756,14 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
756756
{Version: version.MustParse("1.30"), Default: false, PreRelease: featuregate.Alpha},
757757
},
758758

759+
StreamingCollectionEncodingToJSON: {
760+
{Version: version.MustParse("1.32"), Default: false, PreRelease: featuregate.Beta},
761+
},
762+
763+
StreamingCollectionEncodingToProtobuf: {
764+
{Version: version.MustParse("1.32"), Default: false, PreRelease: featuregate.Beta},
765+
},
766+
759767
SupplementalGroupsPolicy: {
760768
{Version: version.MustParse("1.31"), Default: false, PreRelease: featuregate.Alpha},
761769
},

pkg/registry/core/rest/storage_core_generic.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,18 @@ func (c *GenericConfig) NewRESTStorage(apiResourceConfigSource serverstorage.API
7373
ParameterCodec: legacyscheme.ParameterCodec,
7474
NegotiatedSerializer: legacyscheme.Codecs,
7575
}
76+
opts := []serializer.CodecFactoryOptionsMutator{}
7677
if utilfeature.DefaultFeatureGate.Enabled(features.CBORServingAndStorage) {
77-
apiGroupInfo.NegotiatedSerializer = serializer.NewCodecFactory(legacyscheme.Scheme, serializer.WithSerializer(cbor.NewSerializerInfo))
78+
opts = append(opts, serializer.WithSerializer(cbor.NewSerializerInfo))
79+
}
80+
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) {
81+
opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON())
82+
}
83+
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToProtobuf) {
84+
opts = append(opts, serializer.WithStreamingCollectionEncodingToProtobuf())
85+
}
86+
if len(opts) != 0 {
87+
apiGroupInfo.NegotiatedSerializer = serializer.NewCodecFactory(legacyscheme.Scheme, opts...)
7888
}
7989

8090
eventStorage, err := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds()))

staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -854,6 +854,7 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
854854
clusterScoped := crd.Spec.Scope == apiextensionsv1.ClusterScoped
855855

856856
// CRDs explicitly do not support protobuf, but some objects returned by the API server do
857+
streamingCollections := utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON)
857858
negotiatedSerializer := unstructuredNegotiatedSerializer{
858859
typer: typer,
859860
creator: creator,
@@ -867,10 +868,11 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
867868
MediaTypeType: "application",
868869
MediaTypeSubType: "json",
869870
EncodesAsText: true,
870-
Serializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{}),
871+
Serializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{StreamingCollectionsEncoding: streamingCollections}),
871872
PrettySerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{Pretty: true}),
872873
StrictSerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{
873-
Strict: true,
874+
Strict: true,
875+
StreamingCollectionsEncoding: streamingCollections,
874876
}),
875877
StreamSerializer: &runtime.StreamSerializerInfo{
876878
EncodesAsText: true,
@@ -893,7 +895,9 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
893895
MediaType: "application/vnd.kubernetes.protobuf",
894896
MediaTypeType: "application",
895897
MediaTypeSubType: "vnd.kubernetes.protobuf",
896-
Serializer: protobuf.NewSerializer(creator, typer),
898+
Serializer: protobuf.NewSerializerWithOptions(creator, typer, protobuf.SerializerOptions{
899+
StreamingCollectionsEncoding: utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToProtobuf),
900+
}),
897901
StreamSerializer: &runtime.StreamSerializerInfo{
898902
Serializer: protobuf.NewRawSerializer(creator, typer),
899903
Framer: protobuf.LengthDelimitedFramer,
@@ -973,6 +977,12 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
973977
if utilfeature.DefaultFeatureGate.Enabled(features.CBORServingAndStorage) {
974978
opts = append(opts, serializer.WithSerializer(cbor.NewSerializerInfo))
975979
}
980+
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) {
981+
opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON())
982+
}
983+
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToProtobuf) {
984+
opts = append(opts, serializer.WithStreamingCollectionEncodingToProtobuf())
985+
}
976986
scaleScope.Serializer = serializer.NewCodecFactory(scaleConverter.Scheme(), opts...)
977987
scaleScope.Kind = autoscalingv1.SchemeGroupVersion.WithKind("Scale")
978988
scaleScope.Namer = handlers.ContextBasedNaming{

staging/src/k8s.io/apimachinery/pkg/api/meta/help.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,9 @@ func extractList(obj runtime.Object, allocNew bool) ([]runtime.Object, error) {
221221
if err != nil {
222222
return nil, err
223223
}
224+
if items.IsNil() {
225+
return nil, nil
226+
}
224227
list := make([]runtime.Object, items.Len())
225228
if len(list) == 0 {
226229
return list, nil

staging/src/k8s.io/apimachinery/pkg/runtime/serializer/codec_factory.go

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import (
2828
func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory, options CodecFactoryOptions) []runtime.SerializerInfo {
2929
jsonSerializer := json.NewSerializerWithOptions(
3030
mf, scheme, scheme,
31-
json.SerializerOptions{Yaml: false, Pretty: false, Strict: options.Strict},
31+
json.SerializerOptions{Yaml: false, Pretty: false, Strict: options.Strict, StreamingCollectionsEncoding: options.StreamingCollectionsEncodingToJSON},
3232
)
3333
jsonSerializerType := runtime.SerializerInfo{
3434
MediaType: runtime.ContentTypeJSON,
@@ -38,7 +38,7 @@ func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory, option
3838
Serializer: jsonSerializer,
3939
StrictSerializer: json.NewSerializerWithOptions(
4040
mf, scheme, scheme,
41-
json.SerializerOptions{Yaml: false, Pretty: false, Strict: true},
41+
json.SerializerOptions{Yaml: false, Pretty: false, Strict: true, StreamingCollectionsEncoding: options.StreamingCollectionsEncodingToJSON},
4242
),
4343
StreamSerializer: &runtime.StreamSerializerInfo{
4444
EncodesAsText: true,
@@ -61,7 +61,9 @@ func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory, option
6161
mf, scheme, scheme,
6262
json.SerializerOptions{Yaml: true, Pretty: false, Strict: true},
6363
)
64-
protoSerializer := protobuf.NewSerializer(scheme, scheme)
64+
protoSerializer := protobuf.NewSerializerWithOptions(scheme, scheme, protobuf.SerializerOptions{
65+
StreamingCollectionsEncoding: options.StreamingCollectionsEncodingToProtobuf,
66+
})
6567
protoRawSerializer := protobuf.NewRawSerializer(scheme, scheme)
6668

6769
serializers := []runtime.SerializerInfo{
@@ -113,6 +115,9 @@ type CodecFactoryOptions struct {
113115
// Pretty includes a pretty serializer along with the non-pretty one
114116
Pretty bool
115117

118+
StreamingCollectionsEncodingToJSON bool
119+
StreamingCollectionsEncodingToProtobuf bool
120+
116121
serializers []func(runtime.ObjectCreater, runtime.ObjectTyper) runtime.SerializerInfo
117122
}
118123

@@ -147,6 +152,18 @@ func WithSerializer(f func(runtime.ObjectCreater, runtime.ObjectTyper) runtime.S
147152
}
148153
}
149154

155+
func WithStreamingCollectionEncodingToJSON() CodecFactoryOptionsMutator {
156+
return func(options *CodecFactoryOptions) {
157+
options.StreamingCollectionsEncodingToJSON = true
158+
}
159+
}
160+
161+
func WithStreamingCollectionEncodingToProtobuf() CodecFactoryOptionsMutator {
162+
return func(options *CodecFactoryOptions) {
163+
options.StreamingCollectionsEncodingToProtobuf = true
164+
}
165+
}
166+
150167
// NewCodecFactory provides methods for retrieving serializers for the supported wire formats
151168
// and conversion wrappers to define preferred internal and external versions. In the future,
152169
// as the internal version is used less, callers may instead use a defaulting serializer and
Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package json
18+
19+
import (
20+
"encoding/json"
21+
"fmt"
22+
"io"
23+
"maps"
24+
"slices"
25+
"sort"
26+
27+
"k8s.io/apimachinery/pkg/api/meta"
28+
"k8s.io/apimachinery/pkg/conversion"
29+
30+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
32+
"k8s.io/apimachinery/pkg/runtime"
33+
)
34+
35+
func streamEncodeCollections(obj runtime.Object, w io.Writer) (bool, error) {
36+
list, ok := obj.(*unstructured.UnstructuredList)
37+
if ok {
38+
return true, streamingEncodeUnstructuredList(w, list)
39+
}
40+
if _, ok := obj.(json.Marshaler); ok {
41+
return false, nil
42+
}
43+
typeMeta, listMeta, items, err := getListMeta(obj)
44+
if err == nil {
45+
return true, streamingEncodeList(w, typeMeta, listMeta, items)
46+
}
47+
return false, nil
48+
}
49+
50+
// getListMeta implements list extraction logic for json stream serialization.
51+
//
52+
// Reason for a custom logic instead of reusing accessors from meta package:
53+
// * Validate json tags to prevent incompatibility with json standard package.
54+
// * ListMetaAccessor doesn't distinguish empty from nil value.
55+
// * TypeAccessort reparsing "apiVersion" and serializing it with "{group}/{version}"
56+
func getListMeta(list runtime.Object) (metav1.TypeMeta, metav1.ListMeta, []runtime.Object, error) {
57+
listValue, err := conversion.EnforcePtr(list)
58+
if err != nil {
59+
return metav1.TypeMeta{}, metav1.ListMeta{}, nil, err
60+
}
61+
listType := listValue.Type()
62+
if listType.NumField() != 3 {
63+
return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf("expected ListType to have 3 fields")
64+
}
65+
// TypeMeta
66+
typeMeta, ok := listValue.Field(0).Interface().(metav1.TypeMeta)
67+
if !ok {
68+
return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf("expected TypeMeta field to have TypeMeta type")
69+
}
70+
if listType.Field(0).Tag.Get("json") != ",inline" {
71+
return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected TypeMeta json field tag to be ",inline"`)
72+
}
73+
// ListMeta
74+
listMeta, ok := listValue.Field(1).Interface().(metav1.ListMeta)
75+
if !ok {
76+
return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf("expected ListMeta field to have ListMeta type")
77+
}
78+
if listType.Field(1).Tag.Get("json") != "metadata,omitempty" {
79+
return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected ListMeta json field tag to be "metadata,omitempty"`)
80+
}
81+
// Items
82+
items, err := meta.ExtractList(list)
83+
if err != nil {
84+
return metav1.TypeMeta{}, metav1.ListMeta{}, nil, err
85+
}
86+
if listType.Field(2).Tag.Get("json") != "items" {
87+
return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected Items json field tag to be "items"`)
88+
}
89+
return typeMeta, listMeta, items, nil
90+
}
91+
92+
func streamingEncodeList(w io.Writer, typeMeta metav1.TypeMeta, listMeta metav1.ListMeta, items []runtime.Object) error {
93+
// Start
94+
if _, err := w.Write([]byte(`{`)); err != nil {
95+
return err
96+
}
97+
98+
// TypeMeta
99+
if typeMeta.Kind != "" {
100+
if err := encodeKeyValuePair(w, "kind", typeMeta.Kind, []byte(",")); err != nil {
101+
return err
102+
}
103+
}
104+
if typeMeta.APIVersion != "" {
105+
if err := encodeKeyValuePair(w, "apiVersion", typeMeta.APIVersion, []byte(",")); err != nil {
106+
return err
107+
}
108+
}
109+
110+
// ListMeta
111+
if err := encodeKeyValuePair(w, "metadata", listMeta, []byte(",")); err != nil {
112+
return err
113+
}
114+
115+
// Items
116+
if err := encodeItemsObjectSlice(w, items); err != nil {
117+
return err
118+
}
119+
120+
// End
121+
_, err := w.Write([]byte("}\n"))
122+
return err
123+
}
124+
125+
func encodeItemsObjectSlice(w io.Writer, items []runtime.Object) (err error) {
126+
if items == nil {
127+
err := encodeKeyValuePair(w, "items", nil, nil)
128+
return err
129+
}
130+
_, err = w.Write([]byte(`"items":[`))
131+
if err != nil {
132+
return err
133+
}
134+
suffix := []byte(",")
135+
for i, item := range items {
136+
if i == len(items)-1 {
137+
suffix = nil
138+
}
139+
err := encodeValue(w, item, suffix)
140+
if err != nil {
141+
return err
142+
}
143+
}
144+
_, err = w.Write([]byte("]"))
145+
if err != nil {
146+
return err
147+
}
148+
return err
149+
}
150+
151+
func streamingEncodeUnstructuredList(w io.Writer, list *unstructured.UnstructuredList) error {
152+
_, err := w.Write([]byte(`{`))
153+
if err != nil {
154+
return err
155+
}
156+
keys := slices.Collect(maps.Keys(list.Object))
157+
if _, exists := list.Object["items"]; !exists {
158+
keys = append(keys, "items")
159+
}
160+
sort.Strings(keys)
161+
162+
suffix := []byte(",")
163+
for i, key := range keys {
164+
if i == len(keys)-1 {
165+
suffix = nil
166+
}
167+
if key == "items" {
168+
err = encodeItemsUnstructuredSlice(w, list.Items, suffix)
169+
} else {
170+
err = encodeKeyValuePair(w, key, list.Object[key], suffix)
171+
}
172+
if err != nil {
173+
return err
174+
}
175+
}
176+
_, err = w.Write([]byte("}\n"))
177+
return err
178+
}
179+
180+
func encodeItemsUnstructuredSlice(w io.Writer, items []unstructured.Unstructured, suffix []byte) (err error) {
181+
_, err = w.Write([]byte(`"items":[`))
182+
if err != nil {
183+
return err
184+
}
185+
comma := []byte(",")
186+
for i, item := range items {
187+
if i == len(items)-1 {
188+
comma = nil
189+
}
190+
err := encodeValue(w, item.Object, comma)
191+
if err != nil {
192+
return err
193+
}
194+
}
195+
_, err = w.Write([]byte("]"))
196+
if err != nil {
197+
return err
198+
}
199+
if len(suffix) > 0 {
200+
_, err = w.Write(suffix)
201+
}
202+
return err
203+
}
204+
205+
func encodeKeyValuePair(w io.Writer, key string, value any, suffix []byte) (err error) {
206+
err = encodeValue(w, key, []byte(":"))
207+
if err != nil {
208+
return err
209+
}
210+
err = encodeValue(w, value, suffix)
211+
if err != nil {
212+
return err
213+
}
214+
return err
215+
}
216+
217+
func encodeValue(w io.Writer, value any, suffix []byte) error {
218+
data, err := json.Marshal(value)
219+
if err != nil {
220+
return err
221+
}
222+
_, err = w.Write(data)
223+
if err != nil {
224+
return err
225+
}
226+
if len(suffix) > 0 {
227+
_, err = w.Write(suffix)
228+
}
229+
return err
230+
}

0 commit comments

Comments
 (0)