1
- import { CloudEvent , CloudEventV03 , CloudEventV1 , CONSTANTS , Version } from "../.." ;
2
- import { Message , Sender , Headers } from ".." ;
1
+ import { CloudEvent , CloudEventV03 , CloudEventV1 , CONSTANTS , Mode , Version } from "../.." ;
2
+ import { Message , Headers } from ".." ;
3
3
4
- import { headersFor , v1binaryParsers , validate } from "./headers" ;
4
+ import { headersFor , sanitize , v03structuredParsers , v1binaryParsers , v1structuredParsers , validate } from "./headers" ;
5
5
import { asData , isBase64 , isString , isStringOrObjectOrThrow , ValidationError } from "../../event/validation" ;
6
6
import { validateCloudEvent } from "../../event/spec" ;
7
- import { MappedParser , parserByContentType } from "../../parsers" ;
8
-
9
- // Sender is a function that takes headers and body for transmission
<
F438
code>10
- // over HTTP. Users supply this function as a parameter to HTTP.emit()
11
- // Sends a message by invoking sender(). Implements Invoker
12
- export function invoke ( sender : Sender , message : Message ) : Promise < boolean > {
13
- return sender ( message . headers , message . body ) ;
14
- }
7
+ import { Base64Parser , JSONParser , MappedParser , Parser , parserByContentType } from "../../parsers" ;
15
8
16
9
// implements Serializer
17
10
export function binary ( event : CloudEvent ) : Message {
@@ -33,6 +26,69 @@ export function structured(event: CloudEvent): Message {
33
26
} ;
34
27
}
35
28
29
+ /**
30
+ * Converts a Message to a CloudEvent
31
+ *
32
+ * @param {Message } message the incoming message
33
+ * @return {CloudEvent } A new {CloudEvent} instance
34
+ */
35
+ export function deserialize ( message : Message ) : CloudEvent {
36
+ const cleanHeaders : Headers = sanitize ( message . headers ) ;
37
+ const mode : Mode = getMode ( cleanHeaders ) ;
38
+ let version = getVersion ( mode , cleanHeaders , message . body ) ;
39
+ if ( version !== Version . V03 && version !== Version . V1 ) {
40
+ console . error ( `Unknown spec version ${ version } . Default to ${ Version . V1 } ` ) ;
41
+ version = Version . V1 ;
42
+ }
43
+ switch ( mode ) {
44
+ case Mode . BINARY :
45
+ return parseBinary ( message , version ) ;
46
+ case Mode . STRUCTURED :
47
+ return parseStructured ( message , version ) ;
48
+ default :
49
+ throw new ValidationError ( "Unknown Message mode" ) ;
50
+ }
51
+ }
52
+
53
+ /**
54
+ * Determines the HTTP transport mode (binary or structured) based
55
+ * on the incoming HTTP headers.
56
+ * @param {Headers } headers the incoming HTTP headers
57
+ * @returns {Mode } the transport mode
58
+ */
59
+ function getMode ( headers : Headers ) : Mode {
60
+ const contentType = headers [ CONSTANTS . HEADER_CONTENT_TYPE ] ;
61
+ if ( contentType && contentType . startsWith ( CONSTANTS . MIME_CE ) ) {
62
+ return Mode . STRUCTURED ;
63
+ }
64
+ if ( headers [ CONSTANTS . CE_HEADERS . ID ] ) {
65
+ return Mode . BINARY ;
66
+ }
67
+ throw new ValidationError ( "no cloud event detected" ) ;
68
+ }
69
+
70
+ /**
71
+ * Determines the version of an incoming CloudEvent based on the
72
+ * HTTP headers or HTTP body, depending on transport mode.
73
+ * @param {Mode } mode the HTTP transport mode
74
+ * @param {Headers } headers the incoming HTTP headers
75
+ * @param {Record<string, unknown> } body the HTTP request body
76
+ * @returns {Version } the CloudEvent specification version
77
+ */
78
+ function getVersion ( mode : Mode , headers : Headers , body : string | Record < string , string > ) {
79
+ if ( mode === Mode . BINARY ) {
80
+ // Check the headers for the version
81
+ const versionHeader = headers [ CONSTANTS . CE_HEADERS . SPEC_VERSION ] ;
82
+ if ( versionHeader ) {
83
+ return versionHeader ;
84
+ }
85
+ } else {
86
+ // structured mode - the version is in the body
87
+ return typeof body === "string" ? JSON . parse ( body ) . specversion : ( body as CloudEvent ) . specversion ;
88
+ }
89
+ return Version . V1 ;
90
+ }
91
+
36
92
/**
37
93
* Parses an incoming HTTP Message, converting it to a {CloudEvent}
38
94
* instance if it conforms to the Cloud Event specification for this receiver.
@@ -42,7 +98,7 @@ export function structured(event: CloudEvent): Message {
42
98
* @returns {CloudEvent } an instance of CloudEvent representing the incoming request
43
99
* @throws {ValidationError } of the event does not conform to the spec
44
100
*/
45
- export function receive ( message : Message , version : Version = Version . V1 ) : CloudEvent {
101 + function parseBinary ( message : Message , version : Version ) : CloudEvent {
46
102
const headers = message . headers ;
47
103
let body = message . body ;
48
104
@@ -102,3 +158,72 @@ export function receive(message: Message, version: Version = Version.V1): CloudE
102
158
validateCloudEvent ( cloudevent ) ;
103
159
return cloudevent ;
104
160
}
161
+
162
+ /**
163
+ * Creates a new CloudEvent instance based on the provided payload and headers.
164
+ *
165
+ * @param {Message } message the incoming Message
166
+ * @param {Version } version the spec version of this message (v1 or v03)
167
+ * @returns {CloudEvent } a new CloudEvent instance for the provided headers and payload
168
+ * @throws {ValidationError } if the payload and header combination do not conform to the spec
169
+ */
170
+ function parseStructured ( message : Message , version : Version ) : CloudEvent {
171
+ let payload = message . body ;
172
+ const headers = message . headers ;
173
+
174
+ if ( ! payload ) throw new ValidationError ( "payload is null or undefined" ) ;
175
+ if ( ! headers ) throw new ValidationError ( "headers is null or undefined" ) ;
176
+ isStringOrObjectOrThrow ( payload , new ValidationError ( "payload must be an object or a string" ) ) ;
177
+
178
+ if (
179
+ headers [ CONSTANTS . CE_HEADERS . SPEC_VERSION ] &&
180
+ headers [ CONSTANTS . CE_HEADERS . SPEC_VERSION ] != Version . V03 &&
181
+ headers [ CONSTANTS . CE_HEADERS . SPEC_VERSION ] != Version . V1
182
+ ) {
183
+ throw new ValidationError ( `invalid spec version ${ headers [ CONSTANTS . CE_HEADERS . SPEC_VERSION ] } ` ) ;
184
+ }
185
+
186
+ payload = isString ( payload ) && isBase64 ( payload ) ? Buffer . from ( payload as string , "base64" ) . toString ( ) : payload ;
187
+
188
+ // Clone and low case all headers names
189
+ const sanitizedHeaders = sanitize ( headers ) ;
190
+
191
+ const contentType = sanitizedHeaders [ CONSTANTS . HEADER_CONTENT_TYPE ] ;
192
+ const parser : Parser = contentType ? parserByContentType [ contentType ] : new JSONParser ( ) ;
193
+ if ( ! parser ) throw new ValidationError ( `invalid content type ${ sanitizedHeaders [ CONSTANTS . HEADER_CONTENT_TYPE ] } ` ) ;
194
+ const incoming = { ...( parser . parse ( payload ) as Record < string , unknown > ) } ;
195
+
196
+ const eventObj : { [ key : string ] : unknown } = { } ;
197
+ const parserMap : Record < string , MappedParser > = version === Version . V1 ? v1structuredParsers : v03structuredParsers ;
198
+
199
+ for ( const key in parserMap ) {
200
+ const property = incoming [ key ] ;
201
+ if ( property ) {
202
+ const parser : MappedParser = parserMap [ key ] ;
203
+ eventObj [ parser . name ] = parser . parser . parse ( property as string ) ;
204
+ }
205
+ delete incoming [ key ] ;
206
+ }
207
+
208
+ // extensions are what we have left after processing all other properties
209
+ for ( const key in incoming ) {
210
+ eventObj [ key ] = incoming [ key ] ;
211
+ }
212
+
213
+ // ensure data content is correctly encoded
214
+ if ( eventObj . data && eventObj . datacontentencoding ) {
215
+ if ( eventObj . datacontentencoding === CONSTANTS . ENCODING_BASE64 && ! isBase64 ( eventObj . data ) ) {
216
+ throw new ValidationError ( "invalid payload" ) ;
217
+ } else if ( eventObj . datacontentencoding === CONSTANTS . ENCODING_BASE64 ) {
218
+ const dataParser = new Base64Parser ( ) ;
219
+ eventObj . data = JSON . parse ( dataParser . parse ( eventObj . data as string ) ) ;
220
+ delete eventObj . datacontentencoding ;
221
+ }
222
+ }
223
+
224
+ const cloudevent = new CloudEvent ( eventObj as CloudEventV1 | CloudEventV03 ) ;
225
+
226
+ // Validates the event
227
+ validateCloudEvent ( cloudevent ) ;
228
+ return cloudevent ;
229
+ }
0 commit comments