Skip to content

Commit 1f3c09a

Browse files
Refactor repair_amqqueue_nodes
- Only query node uids via RPC for missing members
- Update nodes in state to a map if the feature flag (FF) is enabled
- Properly return whether the queue was repaired or not

Co-authored-by: Péter Gömöri <[email protected]>
1 parent 40d8fea commit 1f3c09a

File tree

2 files changed

+151
-30
lines changed

2 files changed

+151
-30
lines changed

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 87 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -724,43 +724,100 @@ repair_amqqueue_nodes(Q0) ->
724724
{Name, _} = amqqueue:get_pid(Q0),
725725
Members = ra_leaderboard:lookup_members(Name),
726726
RaNodes = [N || {_, N} <- Members],
727-
Nodes = get_nodes(Q0),
728-
case lists:sort(RaNodes) =:= lists:sort(Nodes) of
727+
case rabbit_feature_flags:is_enabled(track_qq_members_uids) of
728+
false ->
729+
Nodes = get_nodes(Q0),
730+
case lists:sort(RaNodes) =:= lists:sort(Nodes) of
731+
true ->
732+
%% up to date
733+
ok;
734+
false ->
735+
%% update amqqueue record
736+
Fun = fun (Q) ->
737+
TS0 = amqqueue:get_type_state(Q),
738+
TS = TS0#{nodes => RaNodes},
739+
amqqueue:set_type_state(Q, TS)
740+
end,
741+
_ = rabbit_amqqueue:update(QName, Fun),
742+
repaired
743+
end;
744+
true ->
745+
{ok, Q0} = rabbit_amqqueue:lookup(QName),
746+
OldTypeState = amqqueue:get_type_state(Q0),
747+
case OldTypeState of
748+
#{nodes := List} when is_list(List) ->
749+
repair_with_list_nodes(QName, Name, RaNodes, OldTypeState);
750+
#{nodes := Map} when is_map(Map) ->
751+
repair_with_map_nodes(QName, Name, RaNodes, Map)
752+
end
753+
end.
754+
755+
%% @doc Repair path for a type state whose `nodes' value is still a list.
%% Asks gather_node_uids/3 for the uid of every member in `RaNodes' and
%% stores the resulting node => uid map under `nodes'.
%% The record is written, and `repaired' returned, only when the error
%% list is empty; otherwise the record is left untouched and `ok' is
%% returned.
repair_with_list_nodes(QName, Name, RaNodes, _OldTypeState) ->
    case gather_node_uids(QName, Name, RaNodes) of
        {UidsByNode, []} ->
            %% No lookup failed, so the whole membership can be stored.
            StoreUids =
                fun(Queue) ->
                        TypeState = amqqueue:get_type_state(Queue),
                        amqqueue:set_type_state(
                          Queue, TypeState#{nodes => UidsByNode})
                end,
            _ = rabbit_amqqueue:update(QName, StoreUids),
            repaired;
        _ ->
            %% At least one uid could not be fetched: do not touch the
            %% queue record.
            ok
    end.
773+
774+
%% @doc Repair logic when the stored type state already has a
%% node => uid map as its `nodes' value.
%%
%% `RaNodes' is the current member list and `PreviousUidsMap' the stored
%% map. Entries for nodes that are not in `RaNodes' are dropped. Nodes in
%% `RaNodes' that are missing from the stored map are looked up with
%% gather_node_uids/3 and added when a valid uid comes back; nodes whose
%% lookup fails are left out.
%%
%% Returns `repaired' only when the stored map actually changed. Returns
%% `ok' when the membership already matches, or when nothing could be
%% changed (no member removed and no uid fetched for the missing ones).
repair_with_map_nodes(QName, Name, RaNodes, PreviousUidsMap) ->
    PrevNodes = maps:keys(PreviousUidsMap),
    case lists:sort(PrevNodes) =:= lists:sort(RaNodes) of
        true ->
            %% up to date
            ok;
        false ->
            %% Only the members missing from the stored map need an RPC.
            NodesToAdd = RaNodes -- PrevNodes,
            {AddedNodesUids, _ErrorList} =
                gather_node_uids(QName, Name, NodesToAdd),
            RemainingNodesUids = maps:with(RaNodes, PreviousUidsMap),
            NewNodes = maps:merge(RemainingNodesUids, AddedNodesUids),
            case NewNodes =:= PreviousUidsMap of
                true ->
                    %% Nothing was removed and no uid could be fetched:
                    %% writing would be a no-op, so do not claim a repair.
                    ok;
                false ->
                    Fun = fun (Q) ->
                                  Ts0 = amqqueue:get_type_state(Q),
                                  Ts = Ts0#{nodes => NewNodes},
                                  amqqueue:set_type_state(Q, Ts)
                          end,
                    _ = rabbit_amqqueue:update(QName, Fun),
                    repaired
            end
    end.
763794

795+
%% Calls ra_directory:uid_of/2 on every node in `RaNodes' through
%% erpc:multicall/5 and pairs each node with its reply.
%%
%% Returns `{UidsByNode, Failures}':
%%   UidsByNode - map of node => uid for replies of the form `{ok, UId}'
%%                with `UId' other than `undefined';
%%   Failures   - list of `{Node, Reply}' for every other reply, in the
%%                order of `RaNodes'. Each one is logged as a warning.
gather_node_uids(QName, Name, RaNodes) ->
    Replies = erpc:multicall(RaNodes, ra_directory, uid_of,
                             [?RA_SYSTEM, Name], ?RPC_TIMEOUT),

    %% foldr keeps both result lists in the same order as RaNodes.
    {UidPairs, Failures} =
        lists:foldr(
          fun({Node, {ok, UId}}, {Good, Bad}) when UId =/= undefined ->
                  {[{Node, UId} | Good], Bad};
             (Failed, {Good, Bad}) ->
                  {Good, [Failed | Bad]}
          end, {[], []}, lists:zip(RaNodes, Replies)),

    LogFailure =
        fun({Node, {ok, undefined}}) ->
                ?LOG_WARNING("Unexpected undefined uuid from node ~p "
                             "for quorum ~ts during repair_amqqueue_nodes",
                             [Node, rabbit_misc:rs(QName)]);
           ({Node, CaughtCallException}) ->
                ?LOG_WARNING("Call exception while retrieving uuid from node ~p "
                             "for quorum ~ts during repair_amqqueue_nodes: ~p",
                             [Node, rabbit_misc:rs(QName), CaughtCallException])
        end,
    lists:foreach(LogFailure, Failures),

    {maps:from_list(UidPairs), Failures}.
820+
764821
reductions(Name) ->
765822
try
766823
{reductions, R} = process_info(whereis(Name), reductions),
@@ -821,7 +878,7 @@ recover(_Vhost, Queues) ->
821878
RaUId = ra_directory:uid_of(?RA_SYSTEM, Name),
822879
case RaUId of
823880
undefined ->
824-
?LOG_WARNING("Unexpected undefined uuid for current node for quorum queue ~ts during recover",
881+
?LOG_WARNING("Unexpected undefined uuid for current node for quorum ~ts during recover",
825882
[rabbit_misc:rs(QName)]);
826883
_ ->
827884
ok
@@ -838,7 +895,7 @@ recover(_Vhost, Queues) ->
838895
#{node() := _NewRaUId} ->
839896
%% Queue is aware but it does not match the one returned by
840897
%% ra_directory
841-
rabbit_log:info("Quorum queue ~ts: detected node uuid change, "
898+
rabbit_log:info("Quorum ~ts: detected node uuid change, "
842899
"deleting old data directory", [rabbit_misc:rs(QName)]),
843900
maybe_delete_data_dir(RaUId)
844901
end,

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,10 @@ groups() ->
104104
force_checkpoint_on_queue,
105105
force_checkpoint,
106106
policy_repair,
107+
repair_metadata_nodes_list_to_map,
108+
repair_metadata_nodes_added_member,
109+
repair_metadata_nodes_removed_member,
110+
repair_metadata_nodes_added_removed_member,
107111
gh_12635,
108112
replica_states,
109113
restart_after_queue_reincarnation,
@@ -1522,6 +1526,66 @@ force_checkpoint(Config) ->
15221526
% Result should only have quorum queue
15231527
?assertEqual(ExpectedRes, ForceCheckpointRes).
15241528

1529+
repair_metadata_nodes_list_to_map(Config) ->
    %% Once feature flag `track_qq_members_uids` is enabled, quorum queues
    %% convert the `nodes` entry of their type state in the metadata store
    %% from a plain node list to a node=>uid map. This case rewrites the
    %% stored map back to the list of its keys before running the repair.
    MapToList =
        fun(Q) ->
                TypeState = amqqueue:get_type_state(Q),
                #{nodes := UidsByNode} = TypeState,
                amqqueue:set_type_state(
                  Q, TypeState#{nodes => maps:keys(UidsByNode)})
        end,
    repair_metadata_nodes(Config, MapToList).
1539+
1540+
repair_metadata_nodes_added_member(Config) ->
    %% Drop node 0 from the stored node=>uid map so that the repair has to
    %% add it back.
    Node0 = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    DropNode0 =
        fun(Q) ->
                TypeState = amqqueue:get_type_state(Q),
                #{nodes := UidsByNode} = TypeState,
                amqqueue:set_type_state(
                  Q, TypeState#{nodes => maps:remove(Node0, UidsByNode)})
        end,
    repair_metadata_nodes(Config, DropNode0).
1548+
1549+
repair_metadata_nodes_removed_member(Config) ->
    %% Add an extra 'rabbit@foo' entry to the stored node=>uid map so that
    %% the repair has to remove it.
    AddExtraNode =
        fun(Q) ->
                TypeState = amqqueue:get_type_state(Q),
                #{nodes := UidsByNode} = TypeState,
                WithExtra = UidsByNode#{'rabbit@foo' => <<"dummy_uid">>},
                amqqueue:set_type_state(Q, TypeState#{nodes => WithExtra})
        end,
    repair_metadata_nodes(Config, AddExtraNode).
1556+
1557+
repair_metadata_nodes_added_removed_member(Config) ->
    %% Combine both changes: add an extra 'rabbit@foo' entry and drop
    %% node 0, so the repair has to remove one entry and add another.
    Node0 = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    SwapMember =
        fun(Q) ->
                TypeState = amqqueue:get_type_state(Q),
                #{nodes := UidsByNode} = TypeState,
                WithExtra = maps:put('rabbit@foo', <<"dummy_uid">>, UidsByNode),
                Swapped = maps:remove(Node0, WithExtra),
                amqqueue:set_type_state(Q, TypeState#{nodes => Swapped})
        end,
    repair_metadata_nodes(Config, SwapMember).
1566+
1567+
%% Shared body of the repair_metadata_nodes_* test cases.
%% Declares a quorum queue, applies `UpdateFun' to its metadata record on
%% node 0, then checks that rabbit_quorum_queue:repair_amqqueue_nodes/1
%% returns `repaired' and that the record read back afterwards equals the
%% one read before the update.
repair_metadata_nodes(Config, UpdateFun) ->
    Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),

    Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
    QQ = ?config(queue_name, Config),
    QQName = rabbit_misc:r(<<"/">>, queue, QQ),

    declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),

    %% Match on `{ok, _}' so that a failed lookup fails the test here
    %% rather than through a misleading comparison further down.
    {ok, QueueRecBefore} =
        rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup,
                                     [QQName]),

    rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, update,
                                 [QQName, UpdateFun]),

    %% Same RPC helper as the other calls in this test (node 0 is `Server').
    ?assertEqual(repaired,
                 rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
                                              repair_amqqueue_nodes,
                                              [QQName])),

    QueueRecAfter =
        rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup,
                                     [QQName]),

    ?assertEqual({ok, QueueRecBefore}, QueueRecAfter),
    ok.
1588+
15251589
% Tests that, if the process of a QQ is dead in the moment of declaring a policy
15261590
% that affects such queue, when the process is made available again, the policy
15271591
% will eventually get applied. (https://github.com/rabbitmq/rabbitmq-server/issues/7863)

0 commit comments

Comments
 (0)