-
Notifications
You must be signed in to change notification settings - Fork 99
MINIFICPP-2587 Add Sparkplug B support for ConsumeMQTT processor #1990
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: MINIFICPP-2603
Are you sure you want to change the base?
Conversation
d3a2f4f to
4f99c2a
Compare
4f99c2a to
fcce3c4
Compare
CONTROLLERS.md
Outdated
|
|
||
| ### Description | ||
|
|
||
| Reads Sparkplug B messages and turns them into individual Record objects. The reader expects a single Sparkplug B payload in a read operation, which is a protobuf-encoded binary message. This reader is typically used with MQTT processors like ConsumeMQTT. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| Reads Sparkplug B messages and turns them into individual Record objects. The reader expects a single Sparkplug B payload in a read operation, which is a protobuf-encoded binary message. This reader is typically used with MQTT processors like ConsumeMQTT. | |
| Reads Sparkplug B messages and turns them into individual Record objects. The reader expects a single Sparkplug B payload in a read operation, which is a protobuf-encoded binary message. This reader is designed to be used with ConsumeMQTT, since Sparkplug B is an MQTT-based protocol. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated in 7e2cad5
37a1f4f to
cacb29e
Compare
66efb04 to
c151058
Compare
| | mqtt.broker | | URI of the sending broker | | ||
| | mqtt.topic | | Topic of the message | | ||
| | mqtt.topic.segment.n | | The nth topic segment of the message | | ||
| | mqtt.qos | | The quality of service for this message. | | ||
| | mqtt.isDuplicate | | Whether or not this message might be a duplicate of one which has already been received. | | ||
| | mqtt.isRetained | | Whether or not this message was from a current publisher, or was "retained" by the server as the last message published on the topic. | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm only seeing mqtt.broker in the flow file attributes with the record reader/writer set, but I'm seeing similar record attributes with an underscore prefix. We should make it clear which attribute ends up where.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added info on this in the description in 7e2cad5
| template<typename RecordSetIO> | ||
| std::shared_ptr<RecordSetIO> getRecordSetIO(core::ProcessContext& context, const core::PropertyReference& property, | ||
| const utils::Identifier& processor_uuid) { | ||
| std::string service_name = context.getProperty(property).value_or(""); | ||
| if (!IsNullOrEmpty(service_name)) { | ||
| auto record_set_io = std::dynamic_pointer_cast<RecordSetIO>(context.getControllerService(service_name, processor_uuid)); | ||
| if (!record_set_io) { | ||
| throw Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, fmt::format("'{}' property is set to invalid controller service '{}'", property.name, service_name)); | ||
| } | ||
| return record_set_io; | ||
| } | ||
| return nullptr; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we could use utils::parseOptionalControllerService() instead of this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My problem with that currently that we do not differentiate if the property was set, but it was invalid or if the property was not set at all. Maybe we should change the current utility to only return std::nullopt if the input is invalid, otherwise return nullptr, what do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Another option would be to throw on invalid and return either nullopt or nullptr if the property is not set.
EDIT: we agreed that I will change utils::parseOptionalControll
8000
erService() to throw (and also to not use optional) in a separate PR. For now, this can stay as it is, and we will rebase one or the other PR depending on which gets merged first.
cacb29e to
05314ac
Compare
7e2cad5 to
e216a53
Compare
7b4a571 to
1a35ef8
Compare
e216a53 to
153ef73
Compare
153ef73 to
7dd08d4
Compare
7dd08d4 to
8d75557
Compare
https://issues.apache.org/jira/browse/MINIFICPP-2587
Depends on #2000
Depends on #2004
Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:
For all changes:
Is there a JIRA ticket associated with this PR? Is it referenced
in the commit message?
Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
Has your PR been rebased against the latest commit within the target branch (typically main)?
Is your initial contribution a single, squashed commit?
For code changes:
For documentation related changes:
Note:
Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.