diff --git a/deps/rabbitmq_mqtt/Makefile b/deps/rabbitmq_mqtt/Makefile index 2f42f7d4c453..e08f61eb61f6 100644 --- a/deps/rabbitmq_mqtt/Makefile +++ b/deps/rabbitmq_mqtt/Makefile @@ -6,6 +6,7 @@ define PROJECT_ENV [ {ssl_cert_login,false}, {allow_anonymous, true}, + {disconnect_on_unauthorized, true}, {vhost, <<"/">>}, {exchange, <<"amq.topic">>}, {max_session_expiry_interval_seconds, 86400}, %% 1 day diff --git a/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema b/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema index 1be98c757edf..5de087fc7279 100644 --- a/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema +++ b/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema @@ -15,6 +15,13 @@ {mapping, "mqtt.allow_anonymous", "rabbitmq_mqtt.allow_anonymous", [{datatype, {enum, [true, false]}}]}. +%% Whether disconnect when publish or subscribe non-authorized topic. +%% +%% {disconnect_on_unauthorized, true}, + +{mapping, "mqtt.disconnect_on_unauthorized", "rabbitmq_mqtt.disconnect_on_unauthorized", + [{datatype, {enum, [true, false]}}]}. + %% If you have multiple chosts, specify the one to which the %% adapter connects. %% diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index 3481366c90a7..842dabf78da8 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -451,10 +451,12 @@ process_request(?SUBSCRIBE, State0 = #state{cfg = #cfg{proto_ver = ProtoVer, binding_args_v2 = BindingArgsV2}}) -> ?LOG_DEBUG("Received a SUBSCRIBE with subscription(s) ~p", [Subscriptions]), + DisconnectOnUnauthorized = application:get_env(rabbitmq_mqtt, disconnect_on_unauthorized, true), {ResultRev, RetainedRev, State1} = lists:foldl( - fun(_Subscription, {[{error, _} = E | _] = L, R, S}) -> - %% Once a subscription failed, mark all following subscriptions + fun(_Subscription, {[{error, _} = E | _] = L, R, S}) when DisconnectOnUnauthorized =:= true -> + %% If disconnect_on_unauthorized is true, + %% once a subscription failed, mark all following subscriptions %% as failed instead of creating bindings because we are going %% to close the client connection anyway. {[E | L], R, S}; @@ -509,6 +511,17 @@ process_request(?SUBSCRIBE, reason_codes = lists:reverse(ReasonCodesRev)}}, _ = send(Reply, State1), case hd(ResultRev) of + {error, access_refused} -> + %% If disconnect_on_unauthorized is false, do not disconnect the client, + %% send retained messages for the topics to which the client could successfully subscribe. + %% Otherwise, disconnect the client, treat the subscription failure. + case application:get_env(rabbitmq_mqtt, disconnect_on_unauthorized, true) of + false -> + State = send_retained_messages(lists:reverse(RetainedRev), State1), + {ok, State}; + true -> + {error, subscribe_error, State1} + end; {error, _} -> {error, subscribe_error, State1}; _ -> @@ -2266,7 +2279,41 @@ publish_to_queues_with_checks( Error end; {error, access_refused} -> - {error, access_refused, State} + %% If disconnect_on_unauthorized is false, + %% MQTT v5 and QoS1 reply with PUBACK including an error reason code and keep connection, + %% MQTT v3 and QoS1 reply with PUBACK no error reason code and keep connection, + %% QoS0 drop silently and keep connection. + %% Otherwise, disconnect. + case application:get_env(rabbitmq_mqtt, disconnect_on_unauthorized, true) of + false -> + case Msg#mqtt_msg.qos of + ?QOS_1 -> + case State#state.cfg#cfg.proto_ver of + ?MQTT_PROTO_V5 -> + Reply = #mqtt_packet{ + fixed = #mqtt_packet_fixed{type = ?PUBACK}, + variable = #mqtt_packet_puback{ + packet_id = Msg#mqtt_msg.packet_id, + reason_code = ?RC_NOT_AUTHORIZED + } + }, + _ = send(Reply, State); + _ -> + Reply = #mqtt_packet{ + fixed = #mqtt_packet_fixed{type = ?PUBACK}, + variable = #mqtt_packet_puback{ + packet_id = Msg#mqtt_msg.packet_id + } + }, + _ = send(Reply, State) + end; + _ -> + ok + end, + {ok, State}; + true -> + {error, access_refused, State} + end end. -spec check_publish_permitted(rabbit_exchange:name(), topic(), state()) -> diff --git a/deps/rabbitmq_mqtt/test/disconnect_on_unauthorized_SUITE.erl b/deps/rabbitmq_mqtt/test/disconnect_on_unauthorized_SUITE.erl new file mode 100644 index 000000000000..dc7cd0ba3528 --- /dev/null +++ b/deps/rabbitmq_mqtt/test/disconnect_on_unauthorized_SUITE.erl @@ -0,0 +1,141 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + + +-module(disconnect_on_unauthorized_SUITE). + +-compile([export_all, + nowarn_export_all]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include("rabbit_mqtt.hrl"). + +-define(RC_Failure, 16#80). +-define(RC_NOT_AUTHORIZED, 16#87). +-define(RC_GRANTED_QOS_0, 16#0). + +all() -> + [ + {group, v4}, + {group, v5} + ]. + +groups() -> + [ + {v4, [], test_cases()}, + {v5, [], test_cases()} + ]. + +test_cases() -> + [ + publish_unauthorized_no_disconnect, + subscribe_unauthorized_no_disconnect + ]. + +init_per_suite(Config0) -> + rabbit_ct_helpers:log_environment(), + + User = <<"mqtt-user">>, + Password = <<"mqtt-password">>, + + Env = [{rabbitmq_mqtt, + [{disconnect_on_unauthorized, false}]} + ], + Config = rabbit_ct_helpers:merge_app_env(Config0, Env), + + Config1 = rabbit_ct_helpers:run_setup_steps( + Config, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()), + util:enable_plugin(Config1, rabbitmq_mqtt), + + rabbit_ct_broker_helpers:add_user(Config1, User, Password), + + Config2 = rabbit_ct_helpers:set_config(Config1, [{mqtt_user, User}, {mqtt_pass, Password}]), + + rabbit_ct_broker_helpers:set_permissions(Config2, User, <<"/">>, <<".*">>, <<"">>, <<".*">>), + ok = rabbit_ct_broker_helpers:rpc(Config2, 0, + rabbit_auth_backend_internal, set_topic_permissions, + [?config(mqtt_user, Config2), <<"/">>, + <<"amq.topic">>, <<"^topic1$">>, <<"^topic1$">>, <<"acting-user">>]), + + Config2. + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps( + Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_group(Group, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, {mqtt_version, Group}), + Config1. + +end_per_group(_Group, Config) -> + Config. + +%%==================================================================== +%% Test cases +%%==================================================================== + +publish_unauthorized_no_disconnect(Config) -> + C = util:connect( + <<"pub_client">>, + Config, + [{username, ?config(mqtt_user, Config)}, + {password, ?config(mqtt_pass, Config)}]), + + case ?config(mqtt_version, Config) of + v5 -> + {ok, #{reason_code := ?RC_NOT_AUTHORIZED}} = + emqtt:publish( + C, + <<"topic2">>, + <<"payload">>, + [{qos, 1}] + ); + v4 -> + {ok, _} = + emqtt:publish( + C, + <<"topic2">>, + <<"payload">>, + [{qos, 1}] + ) + end, + + timer:sleep(300), + %% Client still connected + ?assert(is_process_alive(C)), + + ok = emqtt:disconnect(C). + +subscribe_unauthorized_no_disconnect(Config) -> + C = util:connect( + <<"sub_client">>, + Config, + [{username, ?config(mqtt_user, Config)}, + {password, ?config(mqtt_pass, Config)}]), + + {ok, _, [ReasonCode]} = + emqtt:subscribe( + C, + {<<"topic2">>, qos0} + ), + + case ?config(mqtt_version, Config) of + v5 -> + ?assertEqual(?RC_NOT_AUTHORIZED, ReasonCode); + v4 -> + ?assertEqual(?RC_Failure, ReasonCode) + end, + + timer:sleep(300), + %% Client still connected + ?assert(is_process_alive(C)), + + ok = emqtt:disconnect(C).