8000 fix: use correct timestamp template for kafka and remove such fields from azure event hub producer by thalesmg · Pull Request #11513 · emqx/emqx · GitHub
[go: up one dir, main page]

Skip to content

Conversation

@thalesmg
Copy link
Contributor
@thalesmg thalesmg commented Aug 24, 2023

targeting release-52

Fixes https://emqx.atlassian.net/browse/EMQX-10847

Checking the whole Kafka message from AEH, it seems like the timestamp type is append,
which means that it’s the broker who controls the timestamp, and the timestamp defined by
the producer is ignored.

Ref: https://kafka.apache.org/documentation/#brokerconfigs_log.message.timestamp.type

Example message consumed from AEH:

%{
  "headers" => %{},
  "key" => "",
  "offset" => 4,
  "topic" => "test0",
  "ts" => 1692879703006,
  "ts_type" => "append",
  "value" => "{\"username\":\"undefined\",\"topic\":\"t/aeh/produ\",\"timestamp\":1692879692189,\"qos\":0,\"publish_received_at\":1692879692189,\"pub_props\":{\"User-Property\":{}},\"peerhost\":\"undefined\",\"payload\":\"aaaa\",\"node\":\"emqx@127.0.0.1\",\"metadata\":{\"rule_id\":\"rule_aehp\"},\"id\":\"000603AA44B34E08F4AF000006E30003\",\"flags\":{},\"event\":\"message.publish\",\"clientid\":\"undefined\"}"
}

Note the ts_type above is append.

Example message from a Kafka broker whose ts type is create:

%{
  "headers" => %{},
  "key" => "",
  "offset" => 4,
  "topic" => "test-topic-three-partitions",
  "ts" => 1692881883668,
  "ts_type" => "create",
  "value" => "{\"username\":\"undefined\",\"topic\":\"t/kafka/produ\",\"timestamp\":1692881883668,\"qos\":0,\"publish_received_at\":1692881883668,\"pub_props\":{\"User-Property\":{}},\"peerhost\":\"undefined\",\"payload\":\"aaaaaa\",\"node\":\"emqx@127.0.0.1\",\"id\":\"000603AAC7529FEEF4AC000007050000\",\"flags\":{},\"event\":\"message.publish\",\"clientid\":\"undefined\"}"
}

Unfortunately, I couldn’t find anywhere in AEH where that configuration could be changed.

Summary

🤖 Generated by Copilot at 1622b7d

This pull request adds a new feature to the emqx_bridge_azure_event_hub application to allow sending messages with custom fields to Azure Event Hub. It also fixes some bugs and improves the test cases for the emqx_bridge_azure_event_hub and emqx_bridge_kafka applications. The version number of the emqx_bridge_azure_event_hub application is incremented.

PR Checklist

Please convert it to a draft if any of the following conditions are not met. Reviewers may skip over until all the items are checked:

  • Added tests for the changes
  • [na] Added property-based tests for code which performs user input validation
  • Changed lines covered in coverage report
  • Change log has been added to changes/(ce|ee)/(feat|perf|fix)-<PR-id>.en.md files
  • For internal contributor: there is a jira ticket to track this change
  • [na] Created PR to emqx-docs if documentation update is required, or link to a follow-up jira ticket
  • [na] Schema changes are backward compatible

Checklist for CI (.github/workflows) changes

  • [na] If changed package build workflow, pass this action (manual trigger)
  • [na] Change log has been added to changes/ dir for user-facing artifacts update

@thalesmg thalesmg force-pushed the kafka-fix-ts-template-r52-20230824 branch 4 times, most recently from 0b1ccce to e4a2f20 Compare August 24, 2023 15:18
Fixes https://emqx.atlassian.net/browse/EMQX-10847

Checking the whole Kafka message from AEH, it seems like the timestamp type is append,
which means that it’s the broker who controls the timestamp, and the timestamp defined by
the producer is ignored.

Ref: https://kafka.apache.org/documentation/#brokerconfigs_log.message.timestamp.type

Example message consumed from AEH:
```
%{
  "headers" => %{},
  "key" => "",
  "offset" => 4,
  "topic" => "test0",
  "ts" => 1692879703006,
  "ts_type" => "append",
  "value" => "{\"username\":\"undefined\",\"topic\":\"t/aeh/produ\",\"timestamp\":1692879692189,\"qos\":0,\"publish_received_at\":1692879692189,\"pub_props\":{\"User-Property\":{}},\"peerhost\":\"undefined\",\"payload\":\"aaaa\",\"node\":\"emqx@127.0.0.1\",\"metadata\":{\"rule_id\":\"rule_aehp\"},\"id\":\"000603AA44B34E08F4AF000006E30003\",\"flags\":{},\"event\":\"message.publish\",\"clientid\":\"undefined\"}"
}
```

Note the ts_type above is append.

Example message from a Kafka broker whose ts type is create:
```
%{
  "headers" => %{},
  "key" => "",
  "offset" => 4,
  "topic" => "test-topic-three-partitions",
  "ts" => 1692881883668,
  "ts_type" => "create",
  "value" => "{\"username\":\"undefined\",\"topic\":\"t/kafka/produ\",\"timestamp\":1692881883668,\"qos\":0,\"publish_received_at\":1692881883668,\"pub_props\":{\"User-Property\":{}},\"peerhost\":\"undefined\",\"payload\":\"aaaaaa\",\"node\":\"emqx@127.0.0.1\",\"id\":\"000603AAC7529FEEF4AC000007050000\",\"flags\":{},\"event\":\"message.publish\",\"clientid\":\"undefined\"}"
}
```

Unfortunately, I couldn’t find anywhere in AEH where that configuration could be changed.
@thalesmg thalesmg force-pushed the kafka-fix-ts-template-r52-20230824 branch from e4a2f20 to 016ae05 Compare August 24, 2023 17:20
@thalesmg thalesmg marked this pull request as ready for review August 24, 2023 18:59
@thal
85BF
esmg thalesmg requested a review from a team as a code owner August 24, 2023 18:59
@thalesmg thalesmg merged commit d9a5a9e into emqx:release-52 Aug 24, 2023
@thalesmg thalesmg deleted the kafka-fix-ts-template-r52-20230824 branch August 24, 2023 19:26
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants

0