Skip to content

Commit fc20eac

Browse files
Merge pull request #15166 from rabbitmq/rabbitmq-server-13873
#13873 by @Ayanda-D, polished and rebased
2 parents 85b5fab + 066ae35 commit fc20eac

21 files changed

+567
-50
lines changed

.github/workflows/authorization-server-make.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,15 @@ on:
1111
paths:
1212
- .github/workflows/authorization-server-make.yaml
1313
- selenium/authorization-server
14-
14+
1515
env:
1616
REGISTRY_IMAGE: pivotalrabbitmq/spring-authorization-server
1717
IMAGE_TAG: 0.0.11
1818
jobs:
1919
docker:
2020
runs-on: ubuntu-latest
2121
steps:
22-
22+
2323
- name: CHECKOUT REPOSITORY
2424
uses: actions/checkout@v6
2525

@@ -28,10 +28,10 @@ jobs:
2828
with:
2929
username: ${{ secrets.DOCKERHUB_USERNAME }}
3030
password: ${{ secrets.DOCKERHUB_PASSWORD }}
31-
31+
3232
- name: Build and push
3333
uses: docker/build-push-action@v6
3434
with:
35-
context: selenium/authorization-server
35+
context: selenium/authorization-server
3636
push: true
3737
tags: ${{ env.REGISTRY_IMAGE }}:${{ env.IMAGE_TAG }}

.github/workflows/test-management-ui-for-pr.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,4 +78,4 @@ jobs:
7878
with:
7979
name: test-artifacts-${{ matrix.browser }}-${{ matrix.erlang_version }}
8080
path: ${{ env.SELENIUM_ARTIFACTS }}/*
81-
81+

deps/rabbit/priv/schema/rabbit.schema

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2859,6 +2859,11 @@ end}.
28592859
{mapping, "stream.read_ahead", "rabbit.stream_read_ahead",
28602860
[{datatype, [{enum, [true, false]}, integer, string]}]}.
28612861

2862+
{mapping, "stream.read_ahead_limit", "rabbit.stream_read_ahead_limit", [
2863+
{datatype, [integer, string]},
2864+
{validators, ["is_supported_information_unit"]}
2865+
]}.
2866+
28622867
{mapping, "cluster_tags.$tag", "rabbit.cluster_tags", [
28632868
{datatype, [binary]}
28642869
]}.

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2613,6 +2613,7 @@ rejected(QNameBin, down) ->
26132613
[{{symbol, <<"queue">>}, {utf8, QNameBin}},
26142614
{{symbol, <<"reason">>}, {symbol, <<"unavailable">>}}]}}}.
26152615

2616+
26162617
maybe_grant_link_credit(Credit, MaxLinkCredit, DeliveryCount, NumUnconfirmed, Handle) ->
26172618
case grant_link_credit(Credit, MaxLinkCredit, NumUnconfirmed) of
26182619
true ->

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1638,27 +1638,27 @@ drop_head(#?STATE{ra_indexes = Indexes0} = State0, Effects) ->
16381638
#?STATE{cfg = #cfg{dead_letter_handler = DLH},
16391639
dlx = DlxState} = State = State3,
16401640
{_, DlxEffects} = rabbit_fifo_dlx:discard([Msg], maxlen, DLH, DlxState),
1641-
{State, combine_effects(DlxEffects, Effects)};
1641+
{State, add_drop_head_effects(DlxEffects, Effects)};
16421642
empty ->
16431643
{State0, Effects}
16441644
end.
16451645

1646-
%% combine global counter update effects to avoid bulding a huge list of
1647-
%% effects if many messages are dropped at the same time as could happen
1648-
%% when the `max_length' is changed via a configuration update.
1649-
combine_effects([{mod_call,
1650-
rabbit_global_counters,
1651-
messages_dead_lettered,
1652-
[Reason, rabbit_quorum_queue, Type, NewLen]}],
1653-
[{mod_call,
1654-
rabbit_global_counters,
1655-
messages_dead_lettered,
1656-
[Reason, rabbit_quorum_queue, Type, PrevLen]} | Rem]) ->
1646+
add_drop_head_effects([{mod_call,
1647+
rabbit_global_counters,
1648+
messages_dead_lettered,
1649+
[Reason, rabbit_quorum_queue, Type, NewLen]}],
1650+
[{mod_call,
1651+
rabbit_global_counters,
1652+
messages_dead_lettered,
1653+
[Reason, rabbit_quorum_queue, Type, PrevLen]} | Rem]) ->
1654+
%% combine global counter update effects to avoid bulding a huge list of
1655+
%% effects if many messages are dropped at the same time as could happen
1656+
%% when the `max_length' is changed via a configuration update.
16571657
[{mod_call,
16581658
rabbit_global_counters,
16591659
messages_dead_lettered,
16601660
[Reason, rabbit_quorum_queue, Type, PrevLen + NewLen]} | Rem];
1661-
combine_effects(New, Old) ->
1661+
add_drop_head_effects(New, Old) ->
16621662
New ++ Old.
16631663

16641664
maybe_set_msg_ttl(Msg, RaCmdTs, Header,

deps/rabbit/src/rabbit_plugins.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ list() ->
175175
PluginsPath = plugins_dir(),
176176
list(PluginsPath).
177177

178+
178179
%% @doc Get the list of plugins which are ready to be enabled.
179180

180181
-spec list(string()) -> [#plugin{}].

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 91 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1515,29 +1515,20 @@ shrink_all(Node) ->
15151515
amqqueue:get_type(Q) == ?MODULE,
15161516
lists:member(Node, get_nodes(Q))].
15171517

1518-
1518+
-spec grow(node() | integer(), binary(), binary(), all | even) ->
1519+
[{rabbit_amqqueue:name(),
1520+
{ok, pos_integer()} | {error, pos_integer(), term()}}].
15191521
grow(Node, VhostSpec, QueueSpec, Strategy) ->
15201522
grow(Node, VhostSpec, QueueSpec, Strategy, promotable).
15211523

1522-
-spec grow(node(), binary(), binary(), all | even, membership()) ->
1524+
-spec grow(node() | integer(), binary(), binary(), all | even, membership()) ->
15231525
[{rabbit_amqqueue:name(),
15241526
{ok, pos_integer()} | {error, pos_integer(), term()}}].
1525-
grow(Node, VhostSpec, QueueSpec, Strategy, Membership) ->
1527+
grow(Node, VhostSpec, QueueSpec, Strategy, Membership) when is_atom(Node) ->
15261528
Running = rabbit_nodes:list_running(),
15271529
[begin
15281530
Size = length(get_nodes(Q)),
1529-
QName = amqqueue:get_name(Q),
1530-
?LOG_INFO("~ts: adding a new member (replica) on node ~w",
1531-
[rabbit_misc:rs(QName), Node]),
1532-
case add_member(Q, Node, Membership) of
1533-
ok ->
1534-
{QName, {ok, Size + 1}};
1535-
{error, Err} ->
1536-
?LOG_WARNING(
1537-
"~ts: failed to add member (replica) on node ~w, error: ~w",
1538-
[rabbit_misc:rs(QName), Node, Err]),
1539-
{QName, {error, Size, Err}}
1540-
end
1531+
maybe_grow(Q, Node, Membership, Size)
15411532
end
15421533
|| Q <- rabbit_amqqueue:list(),
15431534
amqqueue:get_type(Q) == ?MODULE,
@@ -1547,7 +1538,91 @@ grow(Node, VhostSpec, QueueSpec, Strategy, Membership) ->
15471538
lists:member(Node, Running),
15481539
matches_strategy(Strategy, get_nodes(Q)),
15491540
is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
1550-
is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ].
1541+
is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ];
1542+
1543+
grow(QuorumClusterSize, VhostSpec, QueueSpec, Strategy, Membership)
1544+
when is_integer(QuorumClusterSize), QuorumClusterSize > 0 ->
1545+
Running = rabbit_nodes:list_running(),
1546+
TotalRunning = length(Running),
1547+
1548+
TargetQuorumClusterSize =
1549+
if QuorumClusterSize > TotalRunning ->
1550+
%% we can't grow beyond total running nodes
1551+
TotalRunning;
1552+
true ->
1553+
QuorumClusterSize
1554+
end,
1555+
1556+
lists:flatten(
1557+
[begin
1558+
QNodes = get_nodes(Q),
1559+
case length(QNodes) of
1560+
Size when Size < TargetQuorumClusterSize ->
1561+
TargetAvailableNodes = Running -- QNodes,
1562+
N = length(TargetAvailableNodes),
1563+
Node = lists:nth(rand:uniform(N), TargetAvailableNodes),
1564+
maybe_grow(Q, Node, Membership, Size);
1565+
_ ->
1566+
[]
1567+
end
1568+
end
1569+
|| _ <- lists:seq(1, TargetQuorumClusterSize),
1570+
Q <- rabbit_amqqueue:list(),
1571+
amqqueue:get_type(Q) == ?MODULE,
1572+
matches_strategy(Strategy, get_nodes(Q)),
1573+
is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
1574+
is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)]);
1575+
1576+
grow(QuorumClusterSize, _VhostSpec, _QueueSpec, _Strategy, _Membership)
1577+
when is_integer(QuorumClusterSize) ->
1578+
rabbit_log:warning(
1579+
"cannot grow queues to a quorum cluster size less than zero (~tp)",
1580+
[QuorumClusterSize]),
1581+
{error, bad_quorum_cluster_size}.
1582+
1583+
maybe_grow(Q, Node, Membership, Size) ->
1584+
QNodes = get_nodes(Q),
1585+
maybe_grow(Q, Node, Membership, Size, QNodes).
1586+
1587+
maybe_grow(Q, Node, Membership, Size, QNodes) ->
1588+
QName = amqqueue:get_name(Q),
1589+
{ok, RaName} = qname_to_internal_name(QName),
1590+
case check_all_memberships(RaName, QNodes, voter) of
1591+
true ->
1592+
?LOG_INFO("~ts: adding a new member (replica) on node ~w",
1593+
[rabbit_misc:rs(QName), Node]),
1594+
case add_member(Q, Node, Membership) of
1595+
ok ->
1596+
{QName, {ok, Size + 1}};
1597+
{error, Err} ->
1598+
?LOG_WARNING(
1599+
"~ts: failed to add member (replica) on node ~w, error: ~w",
1600+
[rabbit_misc:rs(QName), Node, Err]),
1601+
{QName, {error, Size, Err}}
1602+
end;
1603+
false ->
1604+
Err = {error, non_voters_found},
1605+
?LOG_WARNING(
1606+
"~ts: failed to add member (replica) on node ~w, error: ~w",
1607+
[rabbit_misc:rs(QName), Node, Err]),
1608+
{QName, {error, Size, Err}}
1609+
end.
1610+
1611+
%% Compare local membership states of all nodes in parallel.
1612+
%%
1613+
%% Note a few things:
1614+
%% 1. This function intentionally queries local member state and not the leader
1615+
%% 2. ra:key_metrics/1 is sequential and not parallel
1616+
%% 3. ra:key_metrics/1 is not multicall-friendly because it relies on erlang:node/0
1617+
check_all_memberships(RaName, QNodes, CompareMembership) ->
1618+
case rpc:multicall(QNodes, ets, lookup, [ra_state, RaName]) of
1619+
{Result, []} ->
1620+
lists:all(
1621+
fun(M) -> M == CompareMembership end,
1622+
[Membership || [{_RaName, _RaState, Membership}] <- Result]);
1623+
_ ->
1624+
false
1625+
end.
15511626

15521627
-spec transfer_leadership(amqqueue:amqqueue(), node()) ->
15531628
{migrated, node()} | {not_migrated, atom()}.

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,7 @@ credit(QName, CTag, DeliveryCountRcv, LinkCreditRcv, Drain,
535535

536536
supports_stateful_delivery() -> true.
537537

538+
538539
deliver(QSs, Msg, Options) ->
539540
lists:foldl(
540541
fun({Q, stateless}, {Qs, Actions}) ->

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 112 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,8 @@ groups() ->
116116
node_removal_is_not_quorum_critical,
117117
select_nodes_with_least_replicas,
118118
select_nodes_with_least_replicas_node_down,
119-
subscribe_from_each
119+
subscribe_from_each,
120+
grow_queue
120121

121122

122123
]},
@@ -1768,6 +1769,116 @@ dont_leak_file_handles(Config) ->
17681769
rabbit_ct_client_helpers:close_channel(C),
17691770
ok.
17701771

1772+
grow_queue(Config) ->
1773+
[Server0, Server1, Server2, _Server3, _Server4] =
1774+
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
1775+
1776+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
1777+
QQ = ?config(queue_name, Config),
1778+
AQ = ?config(alt_queue_name, Config),
1779+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
1780+
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
1781+
{<<"x-quorum-initial-group-size">>, long, 5}])),
1782+
?assertEqual({'queue.declare_ok', AQ, 0, 0},
1783+
declare(Ch, AQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
1784+
{<<"x-quorum-initial-group-size">>, long, 5}])),
1785+
1786+
QQs = [QQ, AQ],
1787+
MsgCount = 3,
1788+
1789+
[begin
1790+
RaName = ra_name(Q),
1791+
rabbit_ct_client_helpers:publish(Ch, Q, MsgCount),
1792+
wait_for_messages_ready([Server0], RaName, MsgCount),
1793+
{ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, <<"/">>]),
1794+
#{nodes := Nodes0} = amqqueue:get_type_state(Q0),
1795+
?assertEqual(5, length(Nodes0))
1796+
end || Q <- QQs],
1797+
1798+
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
1799+
force_all_queues_shrink_member_to_current_member, []),
1800+
1801+
TargetClusterSize_1 = 1,
1802+
assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount),
1803+
1804+
%% grow queues to node 'Server1'
1805+
TargetClusterSize_2 = 2,
1806+
Result1 = rpc:call(Server0, rabbit_quorum_queue, grow, [Server1, <<"/">>, <<".*">>, all]),
1807+
%% [{{resource,<<"/">>,queue,<<"grow_queue">>},{ok,2}},
1808+
%% {{resource,<<"/">>,queue,<<"grow_queue_alt">>},{ok,2}},...]
1809+
?assert(lists:all(fun({_, {R, _}}) -> R =:= ok end, Result1)),
1810+
assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount),
1811+
1812+
%% grow queues to quorum cluster size '2' has no effect
1813+
Result2 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_2, <<"/">>, <<".*">>, all]),
1814+
?assertEqual([], Result2),
1815+
assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount),
1816+
1817+
%% grow queues to quorum cluster size '3'
1818+
TargetClusterSize_3 = 3,
1819+
Result3 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_3, <<"/">>, <<".*">>, all, voter]),
1820+
?assert(lists:all(fun({_, {R, _}}) -> R =:= ok end, Result3)),
1821+
assert_grown_queues(QQs, Server0, TargetClusterSize_3, MsgCount),
1822+
1823+
%% grow queues to quorum cluster size '5'
1824+
TargetClusterSize_5 = 5,
1825+
Result4 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_5, <<"/">>, <<".*">>, all, voter]),
1826+
?assert(lists:all(fun({_, {R, _}}) -> R =:= ok end, Result4)),
1827+
assert_grown_queues(QQs, Server0, TargetClusterSize_5, MsgCount),
1828+
1829+
%% shrink all queues again down to 1 member
1830+
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
1831+
force_all_queues_shrink_member_to_current_member, []),
1832+
assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount),
1833+
1834+
%% grow queues to quorum cluster size > '5' (limit = 5).
1835+
TargetClusterSize_10 = 10,
1836+
Result5 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_10, <<"/">>, <<".*">>, all]),
1837+
?assert(lists:all(fun({_, {R, _}}) -> R =:= ok end, Result5)),
1838+
assert_grown_queues(QQs, Server0, TargetClusterSize_5, MsgCount),
1839+
1840+
%% shrink all queues again down to 1 member
1841+
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
1842+
force_all_queues_shrink_member_to_current_member, []),
1843+
assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount),
1844+
1845+
%% attempt to grow queues to quorum cluster size < '0'.
1846+
BadTargetClusterSize = -5,
1847+
?assertEqual({error, bad_quorum_cluster_size},
1848+
rpc:call(Server0, rabbit_quorum_queue, grow, [BadTargetClusterSize, <<"/">>, <<".*">>, all])),
1849+
1850+
%% shrink all queues again down to 1 member
1851+
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
1852+
force_all_queues_shrink_member_to_current_member, []),
1853+
assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount),
1854+
1855+
%% grow queues to node 'Server1': non_voter
1856+
rpc:call(Server0, rabbit_quorum_queue, grow, [Server1, <<"/">>, <<".*">>, all, non_voter]),
1857+
assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount),
1858+
1859+
%% grow queues to node 'Server2': fail, non_voters found
1860+
Result6 = rpc:call(Server0, rabbit_quorum_queue, grow, [Server2, <<"/">>, <<".*">>, all, voter]),
1861+
%% [{{resource,<<"/">>,queue,<<"grow_queue">>},{error, 2, {error, non_voters_found}},
1862+
%% {{resource,<<"/">>,queue,<<"grow_queue_alt">>},{error, 2, {error, non_voters_found}},...]
1863+
?assert(lists:all(
1864+
fun({_, Err}) -> Err =:= {error, TargetClusterSize_2, {error, non_voters_found}} end, Result6)),
1865+
assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount),
1866+
1867+
%% grow queues to target quorum cluster size '5': fail, non_voters found
1868+
Result7 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_5, <<"/">>, <<".*">>, all]),
1869+
?assert(lists:all(
1870+
fun({_, Err}) -> Err =:= {error, TargetClusterSize_2, {error, non_voters_found}} end, Result7)),
1871+
assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount).
1872+
1873+
assert_grown_queues(Qs, Node, TargetClusterSize, MsgCount) ->
1874+
[begin
1875+
RaName = ra_name(Q),
1876+
wait_for_messages_ready([Node], RaName, MsgCount),
1877+
{ok, Q0} = rpc:call(Node, rabbit_amqqueue, lookup, [Q, <<"/">>]),
1878+
#{nodes := Nodes0} = amqqueue:get_type_state(Q0),
1879+
?assertEqual(TargetClusterSize, length(Nodes0))
1880+
end || Q <- Qs].
1881+
17711882
gh_12635(Config) ->
17721883
% https://github.com/rabbitmq/rabbitmq-server/issues/12635
17731884
[Server0, _Server1, Server2] =

0 commit comments

Comments
 (0)