Skip to content

Commit bdf7c5b

Browse files
Refactor repair_amqqueue_nodes
- Only query node uids via RPC for missing members - Update nodes in state to map if FF is enabled - Properly return whether queue repaired or not Co-authored-by: Péter Gömöri <[email protected]>
1 parent fc2e7ea commit bdf7c5b

File tree

2 files changed

+151
-30
lines changed

2 files changed

+151
-30
lines changed

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 87 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -725,43 +725,100 @@ repair_amqqueue_nodes(Q0) ->
725725
{Name, _} = amqqueue:get_pid(Q0),
726726
Members = ra_leaderboard:lookup_members(Name),
727727
RaNodes = [N || {_, N} <- Members],
728-
Nodes = get_nodes(Q0),
729-
case lists:sort(RaNodes) =:= lists:sort(Nodes) of
728+
case rabbit_feature_flags:is_enabled(track_qq_members_uids) of
729+
false ->
730+
Nodes = get_nodes(Q0),
731+
case lists:sort(RaNodes) =:= lists:sort(Nodes) of
732+
true ->
733+
%% up to date
734+
ok;
735+
false ->
736+
%% update amqqueue record
737+
Fun = fun (Q) ->
738+
TS0 = amqqueue:get_type_state(Q),
739+
TS = TS0#{nodes => RaNodes},
740+
amqqueue:set_type_state(Q, TS)
741+
end,
742+
_ = rabbit_amqqueue:update(QName, Fun),
743+
repaired
744+
end;
745+
true ->
746+
{ok, Q0} = rabbit_amqqueue:lookup(QName),
747+
OldTypeState = amqqueue:get_type_state(Q0),
748+
case OldTypeState of
749+
#{nodes := List} when is_list(List) ->
750+
repair_with_list_nodes(QName, Name, RaNodes, OldTypeState);
751+
#{nodes := Map} when is_map(Map) ->
752+
repair_with_map_nodes(QName, Name, RaNodes, Map)
753+
end
754+
end.
755+
756+
%% @doc Repair logic when OldTypeState has a list as nodes value.
757+
%% Only updates the queue state if ALL nodes return valid UIDs.
758+
repair_with_list_nodes(QName, Name, RaNodes, _OldTypeState) ->
759+
case gather_node_uids(QName, Name, RaNodes) of
760+
{NewNodesUids, _ErrorList = []} ->
761+
%% All nodes returned valid UIDs, proceed with update
762+
Fun = fun (Q) ->
763+
Ts0 = amqqueue:get_type_state(Q),
764+
Ts = Ts0#{nodes => NewNodesUids},
765+
amqqueue:set_type_state(Q, Ts)
766+
end,
767+
_ = rabbit_amqqueue:update(QName, Fun),
768+
repaired;
769+
_ ->
770+
%% Fetching UID for at least some nodes failed
771+
%% Do not update the queue state
772+
ok
773+
end.
774+
775+
%% @doc Repair logic when OldTypeState has a map as nodes value.
776+
%% Only adds new nodes that return valid UIDs.
777+
repair_with_map_nodes(QName, Name, RaNodes, PreviousUidsMap) ->
778+
PrevNodes = maps:keys(PreviousUidsMap),
779+
case lists:sort(PrevNodes) == lists:sort(RaNodes) of
730780
true ->
731-
%% up to date
732781
ok;
733782
false ->
734-
%% update amqqueue record
783+
NodesToAdd = RaNodes -- PrevNodes,
784+
{AddedNodesUids, _ErrorList} = gather_node_uids(QName, Name, NodesToAdd),
785+
RemainingNodesUids = maps:with(RaNodes, PreviousUidsMap),
786+
NewNodes = maps:merge(RemainingNodesUids, AddedNodesUids),
735787
Fun = fun (Q) ->
736-
TS0 = amqqueue:get_type_state(Q),
737-
TS = case rabbit_feature_flags:is_enabled(track_qq_members_uids) of
738-
false ->
739-
TS0#{nodes => RaNodes};
740-
true ->
741-
RaUidsList = [begin
742-
Uid = erpc:call(N, ra_directory, uid_of,
743-
[?RA_SYSTEM, Name],
744-
?RPC_TIMEOUT),
745-
case Uid of
746-
undefined ->
747-
?LOG_WARNING("Unexpected undefined uuid from node ~p for quorum queue ~ts during repair_amqqueue_nodes",
748-
[N, rabbit_misc:rs(QName)]);
749-
_ ->
750-
ok
751-
end,
752-
{N, Uid}
753-
end
754-
|| N <- RaNodes],
755-
756-
RaUids = maps:from_list(RaUidsList),
757-
TS0#{nodes => RaUids}
758-
end,
759-
amqqueue:set_type_state(Q, TS)
788+
Ts0 = amqqueue:get_type_state(Q),
789+
Ts = Ts0#{nodes => NewNodes},
790+
amqqueue:set_type_state(Q, Ts)
760791
end,
761792
_ = rabbit_amqqueue:update(QName, Fun),
762793
repaired
763794
end.
764795

796+
gather_node_uids(QName, Name, RaNodes) ->
797+
RPCRes = erpc:multicall(RaNodes, ra_directory, uid_of, [?RA_SYSTEM, Name], ?RPC_TIMEOUT),
798+
NewNodesUidsList0 = lists:zip(RaNodes, RPCRes),
799+
800+
%% Check if all nodes returned valid UIDs
801+
{ValidList, ErrorList} =
802+
lists:partition(
803+
fun({_Node, {ok, UId}}) when UId =/= undefined ->
804+
true;
805+
(_) ->
806+
false
807+
end, NewNodesUidsList0),
808+
NewNodesUidsList = [{Node, UId} || {Node, {ok, UId}} <- ValidList],
809+
810+
lists:foreach(fun({Node, {ok, undefined}}) ->
811+
?LOG_WARNING("Unexpected undefined uuid from node ~p "
812+
"for quorum ~ts during repair_amqqueue_nodes",
813+
[Node, rabbit_misc:rs(QName)]);
814+
({Node, CaughtCallException}) ->
815+
?LOG_WARNING("Call exception while retrieving uuid from node ~p "
816+
"for quorum ~ts during repair_amqqueue_nodes: ~p",
817+
[Node, rabbit_misc:rs(QName), CaughtCallException])
818+
end, ErrorList),
819+
820+
{maps:from_list(NewNodesUidsList), ErrorList}.
821+
765822
reductions(Name) ->
766823
try
767824
{reductions, R} = process_info(whereis(Name), reductions),
@@ -822,7 +879,7 @@ recover(_Vhost, Queues) ->
822879
RaUId = ra_directory:uid_of(?RA_SYSTEM, Name),
823880
case RaUId of
824881
undefined ->
825-
?LOG_WARNING("Unexpected undefined uuid for current node for quorum queue ~ts during recover",
882+
?LOG_WARNING("Unexpected undefined uuid for current node for quorum ~ts during recover",
826883
[rabbit_misc:rs(QName)]);
827884
_ ->
828885
ok
@@ -839,7 +896,7 @@ recover(_Vhost, Queues) ->
839896
#{node() := _NewRaUId} ->
840897
%% Queue is aware but it does not match the one returned by
841898
%% ra_directory
842-
rabbit_log:info("Quorum queue ~ts: detected node uuid change, "
899+
rabbit_log:info("Quorum ~ts: detected node uuid change, "
843900
"deleting old data directory", [rabbit_misc:rs(QName)]),
844901
maybe_delete_data_dir(RaUId)
845902
end,

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,10 @@ groups() ->
105105
force_checkpoint_on_queue,
106106
force_checkpoint,
107107
policy_repair,
108+
repair_metadata_nodes_list_to_map,
109+
repair_metadata_nodes_added_member,
110+
repair_metadata_nodes_removed_member,
111+
repair_metadata_nodes_added_removed_member,
108112
gh_12635,
109113
replica_states,
110114
restart_after_queue_reincarnation,
@@ -1527,6 +1531,66 @@ force_checkpoint(Config) ->
15271531
% Result should only have quorum queue
15281532
?assertEqual(ExpectedRes, ForceCheckpointRes).
15291533

1534+
repair_metadata_nodes_list_to_map(Config) ->
1535+
%% After feature flag `track_qq_members_uids` is enabled, quorum
1536+
%% queues will convert their type state in metadata store
1537+
%% from nodes list to node=>uid mappings
1538+
UpdateFun =
1539+
fun(QueueRec) ->
1540+
#{nodes := NodesMap} = TypeState = amqqueue:get_type_state(QueueRec),
1541+
amqqueue:set_type_state(QueueRec, TypeState#{nodes => maps:keys(NodesMap)})
1542+
end,
1543+
repair_metadata_nodes(Config, UpdateFun).
1544+
1545+
repair_metadata_nodes_added_member(Config) ->
1546+
Server1 = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
1547+
UpdateFun =
1548+
fun(QueueRec) ->
1549+
#{nodes := NodesMap} = TypeState = amqqueue:get_type_state(QueueRec),
1550+
amqqueue:set_type_state(QueueRec, TypeState#{nodes => maps:remove(Server1, NodesMap)})
1551+
end,
1552+
repair_metadata_nodes(Config, UpdateFun).
1553+
1554+
repair_metadata_nodes_removed_member(Config) ->
1555+
UpdateFun =
1556+
fun(QueueRec) ->
1557+
#{nodes := NodesMap} = TypeState = amqqueue:get_type_state(QueueRec),
1558+
amqqueue:set_type_state(QueueRec, TypeState#{nodes => NodesMap#{'rabbit@foo' => <<"dummy_uid">>}})
1559+
end,
1560+
repair_metadata_nodes(Config, UpdateFun).
1561+
1562+
repair_metadata_nodes_added_removed_member(Config) ->
1563+
Server1 = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
1564+
UpdateFun =
1565+
fun(QueueRec) ->
1566+
#{nodes := NodesMap} = TypeState = amqqueue:get_type_state(QueueRec),
1567+
NewNodeMap = maps:remove(Server1, NodesMap#{'rabbit@foo' => <<"dummy_uid">>}),
1568+
amqqueue:set_type_state(QueueRec, TypeState#{nodes => NewNodeMap})
1569+
end,
1570+
repair_metadata_nodes(Config, UpdateFun).
1571+
1572+
repair_metadata_nodes(Config, UpdateFun) ->
1573+
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
1574+
1575+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
1576+
QQ = ?config(queue_name, Config),
1577+
QQName = rabbit_misc:r(<<"/">>, queue, QQ),
1578+
1579+
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
1580+
1581+
1582+
QueueRecBefore = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup, [QQName]),
1583+
1584+
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, update, [QQName, UpdateFun]),
1585+
1586+
?assertEqual(repaired, rpc:call(Server, rabbit_quorum_queue, repair_amqqueue_nodes,
1587+
[QQName])),
1588+
1589+
QueueRecAfter = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup, [QQName]),
1590+
1591+
?assertEqual(QueueRecBefore, QueueRecAfter),
1592+
ok.
1593+
15301594
% Tests that, if the process of a QQ is dead in the moment of declaring a policy
15311595
% that affects such queue, when the process is made available again, the policy
15321596
% will eventually get applied. (https://github.com/rabbitmq/rabbitmq-server/issues/7863)

0 commit comments

Comments
 (0)