8000 fix(kafka producer): kick off recovery from disk for fixed topics, migrate old replayq directories by thalesmg · Pull Request #14015 · emqx/emqx · GitHub
[go: up one dir, main page]

Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{application, emqx_bridge_azure_event_hub, [
{description, "EMQX Enterprise Azure Event Hub Bridge"},
{vsn, "0.2.0"},
{vsn, "0.2.1"},
{registered, []},
{applications, [
kernel,
Expand Down
8000
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ bridge_v2_overrides() ->
parameters =>
mk(ref(producer_kafka_opts), #{
required => true,
validator => fun emqx_bridge_kafka:producer_strategy_key_validator/1
validator => fun emqx_bridge_kafka:producer_parameters_validator/1
}),
ssl => mk(ref(ssl_client_opts), #{default => #{<<"enable">> => true}}),
type => mk(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,3 +410,16 @@ t_dynamic_topics(Config) ->
]
),
ok.

t_disallow_disk_mode_for_dynamic_topic(Config) ->
ActionConfig = ?config(action_config, Config),
ok =
emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME(
[
{type, ?BRIDGE_TYPE_BIN},
{connector_name, ?config(connector_name, Config)},
{connector_config, ?config(connector_config, Config)},
{action_config, ActionConfig}
]
),
ok.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{application, emqx_bridge_confluent, [
{description, "EMQX Enterprise Confluent Connector and Action"},
{vsn, "0.2.0"},
{vsn, "0.2.1"},
{registered, []},
{applications, [
kernel,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ bridge_v2_overrides() ->
parameters =>
mk(ref(producer_kafka_opts), #{
required => true,
validator => fun emqx_bridge_kafka:producer_strategy_key_validator/1
validator => fun emqx_bridge_kafka:producer_parameters_validator/1
}),
ssl => mk(ref(ssl_client_opts), #{
default => #{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,3 +419,16 @@ t_dynamic_topics(Config) ->
]
),
ok.

t_disallow_disk_mode_for_dynamic_topic(Config) ->
ActionConfig = ?config(action_config, Config),
ok =
emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME(
[
{type, ?ACTION_TYPE_BIN},
{connector_name, ?config(connector_name, Config)},
{connector_config, ?config(connector_config, Config)},
{action_config, ActionConfig}
]
),
ok.
2 changes: 1 addition & 1 deletion apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_bridge_kafka, [
{description, "EMQX Enterprise Kafka Bridge"},
{vsn, "0.5.0"},
{vsn, "0.5.1"},
{registered, [emqx_bridge_kafka_consumer_sup]},
{applications, [
kernel,
Expand Down
37 changes: 27 additions & 10 deletions apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,14 @@
%%--------------------------------------------------------------------
-module(emqx_bridge_kafka).

-feature(maybe_expr, enable).

-behaviour(emqx_connector_examples).

-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").

%% allow atoms like scram_sha_256 and scram_sha_512
%% i.e. the _256 part does not start with a-z
-elvis([
{elvis_style, atom_naming_convention, #{
regex => "^([a-z][a-z0-9]*_?)([a-z0-9]*_?)*$",
enclosed_atoms => ".*"
}}
]).
-elvis([{elvis_style, atom_naming_convention, disable}]).
-import(hoconsc, [mk/2, enum/1, ref/2]).

-export([
Expand All @@ -40,7 +35,9 @@
-export([
kafka_connector_config_fields/0,
kafka_producer_converter/2,
producer_strategy_key_validator/1
producer_strategy_key_validator/1,
producer_buffer_mode_validator/1,
producer_parameters_validator/1
]).

-define(CONNECTOR_TYPE, kafka_producer).
Expand Down Expand Up @@ -721,7 +718,7 @@ parameters_field(ActionOrBridgeV1) ->
required => true,
aliases => [Alias],
desc => ?DESC(producer_kafka_opts),
validator => fun producer_strategy_key_validator/1
validator => fun producer_parameters_validator/1
})}.

%% -------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -781,6 +778,26 @@ consumer_topic_mapping_validator(TopicMapping0 = [_ | _]) ->
{error, "Kafka topics must not be repeated in a bridge"}
end.

producer_parameters_validator(Conf) ->
maybe
ok ?= producer_strategy_key_validator(Conf),
ok ?= producer_buffer_mode_validator(Conf)
end.

producer_buffer_mode_validator(#{buffer := _} = Conf) ->
producer_buffer_mode_validator(emqx_utils_maps:binary_key_map(Conf));
producer_buffer_mode_validator(#{<<"buffer">> := #{<<"mode">> := disk}, <<"topic">> := Topic}) ->
Template = emqx_template:parse(Topic),
case emqx_template:placeholders(Template) of
[] ->
ok;
[_ | _] ->
{error, <<"disk-mode buffering is disallowed when using dynamic topics">>}
end;
producer_buffer_mode_validator(_) ->
%% `buffer' field is not required
ok.

producer_strategy_key_validator(
#{
partition_strategy := _,
Expand Down
83 changes: 80 additions & 3 deletions apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
handle_telemetry_event/4
]).

-ifdef(TEST).
-export([replayq_dir/2]).
-endif.

-include_lib("emqx/include/logger.hrl").

%% Allocatable resources
Expand Down Expand Up @@ -159,11 +163,30 @@ create_producers_for_bridge_v2(
#{name := BridgeName} = emqx_bridge_v2:parse_id(ActionResId),
IsDryRun = emqx_resource:is_dry_run(ActionResId),
ok = check_topic_and_leader_connections(ActionResId, ClientId, MKafkaTopic, MaxPartitions),
WolffProducerConfig = producers_config(
BridgeType, BridgeName, KafkaConfig, IsDryRun, ActionResId
),
WolffProducerConfig =
#{replayq_dir := ReplayqDir} = producers_config(
BridgeType, BridgeName, KafkaConfig, IsDryRun, ActionResId
),
maybe_migrate_old_replayq_dir(ReplayqDir, ActionResId, TopicType, MKafkaTopic),
case wolff:ensure_supervised_dynamic_producers(ClientId, WolffProducerConfig) of
{ok, Producers} ->
case add_fixed_topic(TopicType, MKafkaTopic, Producers) of
ok ->
ok;
{error, Reason} ->
?SLOG(error, #{
msg => "kafka_producer_failed_to_add_fixed_topic",
instance_id => ConnResId,
kafka_client_id => ClientId,
kafka_topic => MKafkaTopic,
reason => Reason
}),
wolff:stop_and_delete_supervised_producers(Producers),
throw(
"Failed to start producers. Please check the logs for errors and check"
" the configuration parameters."
)
end,
ok = emqx_resource:allocate_resource(
ConnResId, {?kafka_producers, ActionResId}, Producers
),
Expand Down Expand Up @@ -811,6 +834,60 @@ replayq_dir(BridgeType, BridgeName) ->
]),
filename:join([emqx:data_dir(), "kafka", DirName]).

%% new (wolff >= 2.0.0):
%% Dir = filename:join([BaseDir, PathSegment, integer_to_list(Partition)]),
%% old:
%% Dir = f 5C94 ilename:join([BaseDir, Topic, integer_to_list(Partition)]),
maybe_migrate_old_replayq_dir(false, _ActionResId, _TopicType, _Topic) ->
ok;
maybe_migrate_old_replayq_dir(ReplayqDir, ActionResId, fixed = _TopicType, Topic) ->
OldWolffDir = filename:join([ReplayqDir, Topic]),
maybe
true ?= is_old_replayq_dir(OldWolffDir),
NewWolffDir = filename:join([ReplayqDir, <<ActionResId/binary, $_, Topic/binary>>]),
?tp(info, "migrating_old_wolff_dirs", #{
action_id => ActionResId,
from => OldWolffDir,
to => NewWolffDir
}),
ok = file:rename(OldWolffDir, NewWolffDir),
ok
else
_ -> ok
end;
maybe_migrate_old_replayq_dir(_ReplayqDir, _ActionResId, _TopicType, _Topic) ->
ok.

%% new (wolff >= 2.0.0):
is_old_replayq_dir(OldWolffDir) ->
maybe
true ?= filelib:is_dir(OldWolffDir),
{ok, Files} ?= file:list_dir_all(OldWolffDir),
%% Each partition number has a sub-directory.
PartitionDirs = lists:filtermap(
fun(File) ->
PartitionDir = filename:join([OldWolffDir, File]),
IsDir = filelib:is_dir(PartitionDir),
case IsDir andalso string:to_integer(File) of
{_Int, Rest} when Rest =:= ""; Rest =:= <<>> ->
{true, PartitionDir};
_ ->
false
end
end,
Files
),
[_ | _] ?= PartitionDirs,
true
else
_ -> false
end.

add_fixed_topic(fixed, Topic, Producers) ->
wolff:add_topic(Producers, Topic);
add_fixed_topic(_TopicType, _TopicTemplate, _Producers) ->
ok.

%% To avoid losing queued data on disk, we must use the same directory as the old v1
%% bridges, if any. Among the Kafka-based bridges that exist since v1, only Kafka changed
%% its type name. Other bridges are either unchanged, or v2-only, and should use their v2
Expand Down
Loading
Loading
0