|
12 | 12 | # See the License for the specific language governing permissions and |
13 | 13 | # limitations under the License. |
14 | 14 |
|
15 | | -import functools |
| 15 | +import enum |
16 | 16 | import importlib.util |
| 17 | +import io |
| 18 | +import json |
17 | 19 | import os.path |
18 | 20 | import pathlib |
19 | 21 | import sys |
20 | 22 | import types |
21 | 23 |
|
| 24 | +import cloudevents.sdk |
| 25 | +import cloudevents.sdk.event |
| 26 | +import cloudevents.sdk.event.v1 |
| 27 | +import cloudevents.sdk.marshaller |
22 | 28 | import flask |
23 | 29 | import werkzeug |
24 | 30 |
|
|
35 | 41 | DEFAULT_SIGNATURE_TYPE = "http" |
36 | 42 |
|
37 | 43 |
|
| 44 | +class _EventType(enum.Enum): |
| 45 | + LEGACY = 1 |
| 46 | + CLOUDEVENT_BINARY = 2 |
| 47 | + CLOUDEVENT_STRUCTURED = 3 |
| 48 | + |
| 49 | + |
38 | 50 | class _Event(object): |
39 | 51 | """Event passed to background functions.""" |
40 | 52 |
|
@@ -67,38 +79,83 @@ def view_func(path): |
67 | 79 | return view_func |
68 | 80 |
|
69 | 81 |
|
70 | | -def _is_binary_cloud_event(request): |
71 | | - return ( |
| 82 | +def _get_cloudevent_version(): |
| 83 | + return cloudevents.sdk.event.v1.Event() |
| 84 | + |
| 85 | + |
| 86 | +def _run_legacy_event(function, request): |
| 87 | + event_data = request.get_json() |
| 88 | + if not event_data: |
| 89 | + flask.abort(400) |
| 90 | + event_object = _Event(**event_data) |
| 91 | + data = event_object.data |
| 92 | + context = Context(**event_object.context) |
| 93 | + function(data, context) |
| 94 | + |
| 95 | + |
| 96 | +def _run_binary_cloudevent(function, request, cloudevent_def): |
| 97 | + data = io.BytesIO(request.get_data()) |
| 98 | + http_marshaller = cloudevents.sdk.marshaller.NewDefaultHTTPMarshaller() |
| 99 | + event = http_marshaller.FromRequest( |
| 100 | + cloudevent_def, request.headers, data, json.load |
| 101 | + ) |
| 102 | + |
| 103 | + function(event) |
| 104 | + |
| 105 | + |
| 106 | +def _run_structured_cloudevent(function, request, cloudevent_def): |
| 107 | + data = io.StringIO(request.get_data(as_text=True)) |
| 108 | + m = cloudevents.sdk.marshaller.NewDefaultHTTPMarshaller() |
| 109 | + event = m.FromRequest(cloudevent_def, request.headers, data, json.loads) |
| 110 | + function(event) |
| 111 | + |
| 112 | + |
| 113 | +def _get_event_type(request): |
| 114 | + if ( |
72 | 115 | request.headers.get("ce-type") |
73 | 116 | and request.headers.get("ce-specversion") |
74 | 117 | and request.headers.get("ce-source") |
75 | 118 | and request.headers.get("ce-id") |
76 | | - ) |
| 119 | + ): |
| 120 | + return _EventType.CLOUDEVENT_BINARY |
| 121 | + elif request.headers.get("Content-Type") == "application/cloudevents+json": |
| 122 | + return _EventType.CLOUDEVENT_STRUCTURED |
| 123 | + else: |
| 124 | + return _EventType.LEGACY |
77 | 125 |
|
78 | 126 |
|
79 | 127 | def _event_view_func_wrapper(function, request): |
80 | 128 | def view_func(path): |
81 | | - if _is_binary_cloud_event(request): |
82 | | - # Support CloudEvents in binary content mode, with data being the |
83 | | - # whole request body and context attributes retrieved from request |
84 | | - # headers. |
85 | | - data = request.get_data() |
86 | | - context = Context( |
87 | | - eventId=request.headers.get("ce-eventId"), |
88 | | - timestamp=request.headers.get("ce-timestamp"), |
89 | | - eventType=request.headers.get("ce-eventType"), |
90 | | - resource=request.headers.get("ce-resource"), |
| 129 | + if _get_event_type(request) == _EventType.LEGACY: |
| 130 | + _run_legacy_event(function, request) |
| 131 | + else: |
| 132 | + # here for defensive backwards compatibility in case we make a mistake in rollout. |
| 133 | + flask.abort( |
| 134 | + 400, |
| 135 | + description="The FUNCTION_SIGNATURE_TYPE for this function is set to event " |
| 136 | + "but no Google Cloud Functions Event was given. If you are using CloudEvents set " |
| 137 | + "FUNCTION_SIGNATURE_TYPE=cloudevent", |
91 | 138 | ) |
92 | | - function(data, context) |
| 139 | + |
| 140 | + return "OK" |
| 141 | + |
| 142 | + return view_func |
| 143 | + |
| 144 | + |
| 145 | +def _cloudevent_view_func_wrapper(function, request): |
| 146 | + def view_func(path): |
| 147 | + cloudevent_def = _get_cloudevent_version() |
| 148 | + event_type = _get_event_type(request) |
| 149 | + if event_type == _EventType.CLOUDEVENT_STRUCTURED: |
| 150 | + _run_structured_cloudevent(function, request, cloudevent_def) |
| 151 | + elif event_type == _EventType.CLOUDEVENT_BINARY: |
| 152 | + _run_binary_cloudevent(function, request, cloudevent_def) |
93 | 153 | else: |
94 | | - # This is a regular CloudEvent |
95 | | - event_data = request.get_json() |
96 | | - if not event_data: |
97 | | - flask.abort(400) |
98 | | - event_object = _Event(**event_data) |
99 | | - data = event_object.data |
100 | | - context = Context(**event_object.context) |
101 | | - function(data, context) |
| 154 | + flask.abort( |
| 155 | + 400, |
| 156 | + description="Function was defined with FUNCTION_SIGNATURE_TYPE=cloudevent " |
| 157 | + " but it did not receive a cloudevent as a request.", |
| 158 | + ) |
102 | 159 |
|
103 | 160 | return "OK" |
104 | 161 |
|
@@ -193,19 +250,27 @@ def create_app(target=None, source=None, signature_type=None): |
193 | 250 | app.url_map.add(werkzeug.routing.Rule("/<path:path>", endpoint="run")) |
194 | 251 | app.view_functions["run"] = _http_view_func_wrapper(function, flask.request) |
195 | 252 | app.view_functions["error"] = lambda: flask.abort(404, description="Not Found") |
196 | | - elif signature_type == "event": |
| 253 | + elif signature_type == "event" or signature_type == "cloudevent": |
197 | 254 | app.url_map.add( |
198 | 255 | werkzeug.routing.Rule( |
199 | | - "/", defaults={"path": ""}, endpoint="run", methods=["POST"] |
| 256 | + "/", defaults={"path": ""}, endpoint=signature_type, methods=["POST"] |
200 | 257 | ) |
201 | 258 | ) |
202 | 259 | app.url_map.add( |
203 | | - werkzeug.routing.Rule("/<path:path>", endpoint="run", methods=["POST"]) |
| 260 | + werkzeug.routing.Rule( |
| 261 | + "/<path:path>", endpoint=signature_type, methods=["POST"] |
| 262 | + ) |
204 | 263 | ) |
205 | | - app.view_functions["run"] = _event_view_func_wrapper(function, flask.request) |
| 264 | + |
206 | 265 | # Add a dummy endpoint for GET / |
207 | 266 | app.url_map.add(werkzeug.routing.Rule("/", endpoint="get", methods=["GET"])) |
208 | 267 | app.view_functions["get"] = lambda: "" |
| 268 | + |
| 269 | + # Add the view functions |
| 270 | + app.view_functions["event"] = _event_view_func_wrapper(function, flask.request) |
| 271 | + app.view_functions["cloudevent"] = _cloudevent_view_func_wrapper( |
| 272 | + function, flask.request |
| 273 | + ) |
209 | 274 | else: |
210 | 275 | raise FunctionsFrameworkException( |
211 | 276 | "Invalid signature type: {signature_type}".format( |
|
0 commit comments