10000 MINIFICPP-2587 Add Sparkplug B support for ConsumeMQTT processor by lordgamez · Pull Request #1990 · apache/nifi-minifi-cpp · GitHub
[go: up one dir, main page]

Skip to content

Conversation

@lordgamez
Copy link
Contributor
@lordgamez lordgamez commented Jul 11, 2025

https://issues.apache.org/jira/browse/MINIFICPP-2587

Depends on #2000
Depends on #2004


Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

For all changes:

  • Is there a JIRA ticket associated with this PR? Is it referenced
    in the commit message?

  • Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.

  • Has your PR been rebased against the latest commit within the target branch (typically main)?

  • Is your initial contribution a single, squashed commit?

For code changes:

  • If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?
  • If applicable, have you updated the LICENSE file?
  • If applicable, have you updated the NOTICE file?

For documentation related changes:

  • Have you ensured that format looks appropriate for the output in which it is rendered?

Note:

Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.

@lordgamez lordgamez marked this pull request as ready for review July 11, 2025 15:11
@lordgamez lordgamez changed the title MINIFICPP-2587 Add Sparkplug support for ConsumeMQTT processor MINIFICPP-2587 Add Sparkplug B support for ConsumeMQTT processor Jul 21, 2025
CONTROLLERS.md Outdated

### Description

Reads Sparkplug B messages and turns them into individual Record objects. The reader expects a single Sparkplug B payload in a read operation, which is a protobuf-encoded binary message. This reader is typically used with MQTT processors like ConsumeMQTT.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Reads Sparkplug B messages and turns them into individual Record objects. The reader expects a single Sparkplug B payload in a read operation, which is a protobuf-encoded binary message. This reader is typically used with MQTT processors like ConsumeMQTT.
Reads Sparkplug B messages and turns them into individual Record objects. The reader expects a single Sparkplug B payload in a read operation, which is a protobuf-encoded binary message. This reader is designed to be used with ConsumeMQTT, since Sparkplug B is an MQTT-based protocol.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in 7e2cad5

Comment on lines 441 to 448
| mqtt.broker | | URI of the sending broker |
| mqtt.topic | | Topic of the message |
| mqtt.topic.segment.n | | The nth topic segment of the message |
| mqtt.qos | | The quality of service for this message. |
| mqtt.isDuplicate | | Whether or not this message might be a duplicate of one which has already been received. |
| mqtt.isRetained | | Whether or not this message was from a current publisher, or was "retained" by the server as the last message published on the topic. |
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm only seeing mqtt.broker in the flow file attributes with the record reader/writer set, but I'm seeing similar record attributes with an underscore prefix. We should make it clear which attribute ends up where.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added info on this in the description in 7e2cad5

Comment on lines 35 to 47
8000
template<typename RecordSetIO>
std::shared_ptr<RecordSetIO> getRecordSetIO(core::ProcessContext& context, const core::PropertyReference& property,
const utils::Identifier& processor_uuid) {
std::string service_name = context.getProperty(property).value_or("");
if (!IsNullOrEmpty(service_name)) {
auto record_set_io = std::dynamic_pointer_cast<RecordSetIO>(context.getControllerService(service_name, processor_uuid));
if (!record_set_io) {
throw Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, fmt::format("'{}' property is set to invalid controller service '{}'", property.name, service_name));
}
return record_set_io;
}
return nullptr;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could use utils::parseOptionalControllerService() instead of this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My problem with that currently that we do not differentiate if the property was set, but it was invalid or if the property was not set at all. Maybe we should change the current utility to only return std::nullopt if the input is invalid, otherwise return nullptr, what do you think?

Copy link
Contributor
@fgerlits fgerlits Aug 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Another option would be to throw on invalid and return either nullopt or nullptr if the property is not set.

EDIT: we agreed that I will change utils::parseOptionalControll 8000 erService() to throw (and also to not use optional) in a separate PR. For now, this can stay as it is, and we will rebase one or the other PR depending on which gets merged first.

@lordgamez lordgamez changed the base branch from MINIFICPP-2567 to sparkplug_base August 6, 2025 08:31
@lordgamez lordgamez marked this pull request as draft August 6, 2025 08:31
@lordgamez lordgamez changed the base branch from sparkplug_base to MINIFICPP-2603 October 20, 2025 15:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants

0