8000 feat: add MQTT transport messaging (#459) · jasonjlock/sdk-javascript@591d133 · GitHub
[go: up one dir, main page]

Skip to content

Commit 591d133

Browse files
authored
feat: add MQTT transport messaging (cloudevents#459)
Add MQTT as a `Message` format. This commit adds `MQTT` to the supported transport protocols by adding a `Binding` and the `MQTTMessage<T>` type, extending the base `Message` type, adding the MQTT fields for `payload`, `PUBLISH` and `User Properties`. The `payload` field directly maps to `Message#body`, while `User Properties` roughly maps to `Message#headers`, even though the properties here are not formatted with a `ce-` prefix like other transport protocols. This is per the spec. See: https://github.com/cloudevents/spec/blob/v1.0.1/mqtt-protocol-binding.md. Signed-off-by: Lance Ball <lball@redhat.com>
1 parent 5d1f744 commit 591d133

File tree

5 files changed

+466
-2
lines changed

5 files changed

+466
-2
lines changed

README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ There you will find Express.js, TypeScript and Websocket examples.
163163
| AMQP Protocol Binding | :x: | :x: |
164164
| HTTP Protocol Binding | :heavy_check_mark: | :heavy_check_mark: |
165165
| Kafka Protocol Binding | :x: | :heavy_check_mark: |
166-
| MQTT Protocol Binding | :x: | :x: |
166+
| MQTT Protocol Binding | :heavy_check_mark: < 8000 span class="pl-ml">| :x: |
167167
| NATS Protocol Binding | :x: | :x: |
168168

169169
---
@@ -176,6 +176,9 @@ There you will find Express.js, TypeScript and Websocket examples.
176176
| Kafka Binary | :heavy_check_mark: | :heavy_check_mark: |
177177
| Kafka Structured | :heavy_check_mark: | :heavy_check_mark: |
178178
| Kafka Batch | :heavy_check_mark: | :heavy_check_mark:
179+
| MQTT Binary | :heavy_check_mark: | :heavy_check_mark: |
180+
| MQTT Structured | :heavy_check_mark: | :heavy_check_mark: |
181+
179182
## Community
180183

181184
- There are bi-weekly calls immediately following the [Serverless/CloudEvents

src/index.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { CloudEventV1, CloudEventV1Attributes } from "./event/interfaces";
99

1010
import { Options, TransportFunction, EmitterFunction, emitterFor, Emitter } from "./transport/emitter";
1111
import {
12-
Headers, Mode, Binding, HTTP, Kafka, KafkaEvent, KafkaMessage, Message,
12+
Headers, Mode, Binding, HTTP, Kafka, KafkaEvent, KafkaMessage, Message, MQTT, MQTTMessage, MQTTMessageFactory,
1313
Serializer, Deserializer } from "./message";
1414

1515
import CONSTANTS from "./constants";
@@ -32,6 +32,9 @@ export {
3232
Kafka,
3333
KafkaEvent,
3434
KafkaMessage,
35+
MQTT,
36+
MQTTMessage,
37+
MQTTMessageFactory,
3538
// From transport
3639
TransportFunction,
3740
EmitterFunction,

src/message/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { CloudEventV1 } from "..";
99
// reexport the protocol bindings
1010
export * from "./http";
1111
export * from "./kafka";
12+
export * from "./mqtt";
1213

1314
/**
1415
* Binding is an interface for transport protocols to implement,

src/message/mqtt/index.ts

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
Copyright 2021 The CloudEvents Authors
3+
SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
import { Binding, Deserializer, CloudEvent, CloudEventV1, CONSTANTS, Message, ValidationError, Headers } from "../..";
7+
8+
export {
9+
MQTT,
10+
MQTTMessage,
11+
MQTTMessageFactory
12+
};
13+
14+
/**
15+
* Extends the base {@linkcode Message} interface to include MQTT attributes, some of which
16+
* are aliases of the {Message} attributes.
17+
*/
18+
interface MQTTMessage<T> extends Message<T> {
19+
/**
20+
* Identifies this message as a PUBLISH packet. MQTTMessages created with
21+
* the `binary` and `structured` Serializers will contain a "Content Type"
22+
* property in the PUBLISH record.
23+
* @see https://github.com/cloudevents/spec/blob/v1.0.1/mqtt-protocol-binding.md#3-mqtt-publish-message-mapping
24+
*/
25+
PUBLISH: Record<string, string | undefined> | undefined
26+
/**
27+
* Alias of {Message#body}
28+
*/
29+
payload: T | undefined,
30+
/**
31+
* Alias of {Message#headers}
32+
*/
33+
"User Properties": Headers | undefined
34+
}
35+
36+
/**
37+
* Binding for MQTT transport support
38+
* @implements @linkcode Binding
39+
*/
40+
const MQTT: Binding = {
41+
binary,
42+
structured,
43+
toEvent: toEvent as Deserializer,
44+
isEvent
45+
};
46+
47+
/**
48+
* Converts a CloudEvent into an MQTTMessage<T> with the event's data as the message payload
49+
* @param {CloudEventV1} event a CloudEvent
50+
* @returns {MQTTMessage<T>} the event serialized as an MQTTMessage<T> with binary encoding
51+
* @implements {Serializer}
52+
*/
53+
function binary<T>(event: CloudEventV1<T>): MQTTMessage<T> {
54+
let properties;
55+
if (event instanceof CloudEvent) {
56+
properties = event.toJSON();
57+
} else {
58+
properties = event;
59+
}
60+
const body = properties.data as T;
61+
delete properties.data;
62+
63+
return MQTTMessageFactory(event.datacontenttype as string, properties, body);
64+
}
65+
66+
/**
67+
* Converts a CloudEvent into an MQTTMessage<T> with the event as the message payload
68+
* @param {CloudEventV1} event a CloudEvent
69+
* @returns {MQTTMessage<T>} the event serialized as an MQTTMessage<T> with structured encoding
70+
* @implements {Serializer}
71+
*/
72+
function structured<T>(event: CloudEventV1<T>): MQTTMessage<T> {
73+
let body;
74+
if (event instanceof CloudEvent) {
75+
body = event.toJSON();
76+
} else {
77+
body = event;
78+
}
79+
return MQTTMessageFactory(CONSTANTS.DEFAULT_CE_CONTENT_TYPE, {}, body) as MQTTMessage<T>;
80+
}
81+
82+
/**
83+
* A helper function to create an MQTTMessage<T> object, with "User Properties" as an alias
84+
* for "headers" and "payload" an alias for body, and a "PUBLISH" record with a "Content Type"
85+
* property.
86+
* @param {string} contentType the "Content Type" attribute on PUBLISH
87+
* @param {Record<string, unknown>} headers the headers and "User Properties"
88+
* @param {T} body the message body/payload
89+
* @returns {MQTTMessage<T>} a message initialized with the provided attributes
90+
*/
91+
function MQTTMessageFactory<T>(contentType: string, headers: Record<string, unknown>, body: T): MQTTMessage<T> {
92+
return {
93+
PUBLISH: {
94+
"Content Type": contentType
95+
},
96+
body,
97+
get payload() {
98+
return this.body as T;
99+
},
100+
headers: headers as Headers,
101+
get "User Properties"() {
102+
return this.headers as any;
103+
}
104+
};
105+
}
106+
107+
/**
108+
* Converts an MQTTMessage<T> into a CloudEvent
109+
* @param {Message<T>} message the message to deserialize
110+
* @param {boolean} strict determines if a ValidationError will be thrown on bad input - defaults to false
111+
* @returns {CloudEventV1<T>} an event
112+
* @implements {Deserializer}
113+
*/
114+
function toEvent<T>(message: Message<T>, strict = false): CloudEventV1<T> | CloudEventV1<T>[] {
115+
if (strict && !isEvent(message)) {
116+
throw new ValidationError("No CloudEvent detected");
117+
}
118+
if (isStructuredMessage(message as MQTTMessage<T>)) {
119+
const evt = (typeof message.body === "string") ? JSON.parse(message.body): message.body;
120+
return new CloudEvent({
121+
...evt as CloudEventV1<T>
122+
}, false);
123+
} else {
124+
return new CloudEvent<T>({
125+
...message.headers,
126+
data: message.body as T,
127+
}, false);
128+
}
129+
}
130+
131+
/**
132+
* Determine if the message is a CloudEvent
133+
* @param {Message<T>} message an MQTTMessage
134+
* @returns {boolean} true if the message contains an event
135+
*/
136+
function isEvent<T>(message: Message<T>): boolean {
137+
return isBinaryMessage(message) || isStructuredMessage(message as MQTTMessage<T>);
138+
}
139+
140+
function isBinaryMessage<T>(message: Message<T>): boolean {
141+
return (!!message.headers.id && !!message.headers.source
142+
&& !! message.headers.type && !!message.headers.specversion);
143+
}
144+
145+
function isStructuredMessage<T>(message: MQTTMessage<T>): boolean {
146+
if (!message) { return false; }
147+
return (message.PUBLISH && message?.PUBLISH["Content Type"]?.startsWith(CONSTANTS.MIME_CE_JSON)) || false;
148+
}

0 commit comments

Comments
 (0)
0