Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions deps/rabbitmq_mqtt/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ define PROJECT_ENV
[
{ssl_cert_login,false},
{allow_anonymous, true},
{disconnect_on_unauthorized, true},
{vhost, <<"/">>},
{exchange, <<"amq.topic">>},
{max_session_expiry_interval_seconds, 86400}, %% 1 day
Expand Down
7 changes: 7 additions & 0 deletions deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@
{mapping, "mqtt.allow_anonymous", "rabbitmq_mqtt.allow_anonymous",
[{datatype, {enum, [true, false]}}]}.

%% Whether disconnect when publish or subscribe non-authorized topic.
%%
%% {disconnect_on_unauthorized, true},

{mapping, "mqtt.disconnect_on_unauthorized", "rabbitmq_mqtt.disconnect_on_unauthorized",
[{datatype, {enum, [true, false]}}]}.

%% If you have multiple chosts, specify the one to which the
%% adapter connects.
%%
Expand Down
53 changes: 50 additions & 3 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -451,10 +451,12 @@ process_request(?SUBSCRIBE,
State0 = #state{cfg = #cfg{proto_ver = ProtoVer,
binding_args_v2 = BindingArgsV2}}) ->
?LOG_DEBUG("Received a SUBSCRIBE with subscription(s) ~p", [Subscriptions]),
DisconnectOnUnauthorized = application:get_env(rabbitmq_mqtt, disconnect_on_unauthorized, true),
{ResultRev, RetainedRev, State1} =
lists:foldl(
fun(_Subscription, {[{error, _} = E | _] = L, R, S}) ->
%% Once a subscription failed, mark all following subscriptions
fun(_Subscription, {[{error, _} = E | _] = L, R, S}) when DisconnectOnUnauthorized =:= true ->
%% If disconnect_on_unauthorized is true,
%% once a subscription failed, mark all following subscriptions
%% as failed instead of creating bindings because we are going
%% to close the client connection anyway.
{[E | L], R, S};
Expand Down Expand Up @@ -509,6 +511,17 @@ process_request(?SUBSCRIBE,
reason_codes = lists:reverse(ReasonCodesRev)}},
_ = send(Reply, State1),
case hd(ResultRev) of
{error, access_refused} ->
%% If disconnect_on_unauthorized is false, do not disconnect the client,
%% send retained messages for the topics to which the client could successfully subscribe.
%% Otherwise, disconnect the client, treat the subscription failure.
case application:get_env(rabbitmq_mqtt, disconnect_on_unauthorized, true) of
false ->
State = send_retained_messages(lists:reverse(RetainedRev), State1),
{ok, State};
true ->
{error, subscribe_error, State1}
end;
{error, _} ->
{error, subscribe_error, State1};
_ ->
Expand Down Expand Up @@ -2266,7 +2279,41 @@ publish_to_queues_with_checks(
Error
end;
{error, access_refused} ->
{error, access_refused, State}
%% If disconnect_on_unauthorized is false,
%% MQTT v5 and QoS1 reply with PUBACK including an error reason code and keep connection,
%% MQTT v3 and QoS1 reply with PUBACK no error reason code and keep connection,
%% QoS0 drop silently and keep connection.
%% Otherwise, disconnect.
case application:get_env(rabbitmq_mqtt, disconnect_on_unauthorized, true) of
false ->
case Msg#mqtt_msg.qos of
?QOS_1 ->
case State#state.cfg#cfg.proto_ver of
?MQTT_PROTO_V5 ->
Reply = #mqtt_packet{
fixed = #mqtt_packet_fixed{type = ?PUBACK},
variable = #mqtt_packet_puback{
packet_id = Msg#mqtt_msg.packet_id,
reason_code = ?RC_NOT_AUTHORIZED
}
},
_ = send(Reply, State);
_ ->
Reply = #mqtt_packet{
fixed = #mqtt_packet_fixed{type = ?PUBACK},
variable = #mqtt_packet_puback{
packet_id = Msg#mqtt_msg.packet_id
}
},
_ = send(Reply, State)
end;
_ ->
ok
end,
{ok, State};
true ->
{error, access_refused, State}
end
end.

-spec check_publish_permitted(rabbit_exchange:name(), topic(), state()) ->
Expand Down
141 changes: 141 additions & 0 deletions deps/rabbitmq_mqtt/test/disconnect_on_unauthorized_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.


-module(disconnect_on_unauthorized_SUITE).

-compile([export_all,
nowarn_export_all]).

-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include("rabbit_mqtt.hrl").

-define(RC_Failure, 16#80).
-define(RC_NOT_AUTHORIZED, 16#87).
-define(RC_GRANTED_QOS_0, 16#0).

all() ->
[
{group, v4},
{group, v5}
].

groups() ->
[
{v4, [], test_cases()},
{v5, [], test_cases()}
].

test_cases() ->
[
publish_unauthorized_no_disconnect,
subscribe_unauthorized_no_disconnect
].

init_per_suite(Config0) ->
rabbit_ct_helpers:log_environment(),

User = <<"mqtt-user">>,
Password = <<"mqtt-password">>,

Env = [{rabbitmq_mqtt,
[{disconnect_on_unauthorized, false}]}
],
Config = rabbit_ct_helpers:merge_app_env(Config0, Env),

Config1 = rabbit_ct_helpers:run_setup_steps(
Config,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()),
util:enable_plugin(Config1, rabbitmq_mqtt),

rabbit_ct_broker_helpers:add_user(Config1, User, Password),

Config2 = rabbit_ct_helpers:set_config(Config1, [{mqtt_user, User}, {mqtt_pass, Password}]),

rabbit_ct_broker_helpers:set_permissions(Config2, User, <<"/">>, <<".*">>, <<"">>, <<".*">>),
ok = rabbit_ct_broker_helpers:rpc(Config2, 0,
rabbit_auth_backend_internal, set_topic_permissions,
[?config(mqtt_user, Config2), <<"/">>,
<<"amq.topic">>, <<"^topic1$">>, <<"^topic1$">>, <<"acting-user">>]),

Config2.

end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(
Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).

init_per_group(Group, Config) ->
Config1 = rabbit_ct_helpers:set_config(Config, {mqtt_version, Group}),
Config1.

end_per_group(_Group, Config) ->
Config.

%%====================================================================
%% Test cases
%%====================================================================

publish_unauthorized_no_disconnect(Config) ->
C = util:connect(
<<"pub_client">>,
Config,
[{username, ?config(mqtt_user, Config)},
{password, ?config(mqtt_pass, Config)}]),

case ?config(mqtt_version, Config) of
v5 ->
{ok, #{reason_code := ?RC_NOT_AUTHORIZED}} =
emqtt:publish(
C,
<<"topic2">>,
<<"payload">>,
[{qos, 1}]
);
v4 ->
{ok, _} =
emqtt:publish(
C,
<<"topic2">>,
<<"payload">>,
[{qos, 1}]
)
end,

timer:sleep(300),
%% Client still connected
?assert(is_process_alive(C)),

ok = emqtt:disconnect(C).

subscribe_unauthorized_no_disconnect(Config) ->
C = util:connect(
<<"sub_client">>,
Config,
[{username, ?config(mqtt_user, Config)},
{password, ?config(mqtt_pass, Config)}]),

{ok, _, [ReasonCode]} =
emqtt:subscribe(
C,
{<<"topic2">>, qos0}
),

case ?config(mqtt_version, Config) of
v5 ->
?assertEqual(?RC_NOT_AUTHORIZED, ReasonCode);
v4 ->
?assertEqual(?RC_Failure, ReasonCode)
end,

timer:sleep(300),
%% Client still connected
?assert(is_process_alive(C)),

ok = emqtt:disconnect(C).
Loading