Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deps/amqp_client/src/amqp_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -927,7 +927,7 @@ is_connection_method(Method) ->
?PROTOCOL:lookup_class_name(ClassId) == connection.

server_misbehaved(#amqp_error{} = AmqpError, State = #state{number = Number}) ->
case rabbit_binary_generator:map_exception(Number, AmqpError, ?PROTOCOL) of
case rabbit_binary_generator:map_exception(Number, AmqpError) of
{0, _} ->
handle_shutdown({server_misbehaved, AmqpError}, State);
{_, Close} ->
Expand Down
8 changes: 6 additions & 2 deletions deps/amqp_client/src/amqp_channel_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ start_link(Type, Connection, ConnName, InfraArgs, ChNumber,

start_writer(_Sup, direct, [ConnPid, Node, User, VHost, Collector, AmqpParams],
ConnName, ChNumber, ChPid) ->
%% @todo We must use rabbit_direct:start_channel/10 for compatibility.
%% But the protocol argument is ignored on new nodes.
%% At some point it can be ignored as all nodes 4.3+ have
%% rabbit_direct:start_channel/9 as well.
case rpc:call(Node, rabbit_direct, start_channel,
[ChNumber, ChPid, ConnPid, ConnName, ?PROTOCOL, User,
VHost, ?CLIENT_CAPABILITIES, Collector, AmqpParams], ?DIRECT_OPERATION_TIMEOUT) of
Expand All @@ -64,7 +68,7 @@ start_writer(_Sup, direct, [ConnPid, Node, User, VHost, Collector, AmqpParams],
start_writer(Sup, network, [Sock, FrameMax], ConnName, ChNumber, ChPid) ->
GCThreshold = application:get_env(amqp_client, writer_gc_threshold, ?DEFAULT_GC_THRESHOLD),
StartMFA = {rabbit_writer, start_link,
[Sock, ChNumber, FrameMax, ?PROTOCOL, ChPid,
[Sock, ChNumber, FrameMax, ChPid,
{ConnName, ChNumber}, false, GCThreshold]},
ChildSpec = #{id => writer,
start => StartMFA,
Expand All @@ -75,7 +79,7 @@ start_writer(Sup, network, [Sock, FrameMax], ConnName, ChNumber, ChPid) ->
supervisor:start_child(Sup, ChildSpec).

init_command_assembler(direct) -> {ok, none};
init_command_assembler(network) -> rabbit_command_assembler:init(?PROTOCOL).
init_command_assembler(network) -> rabbit_command_assembler:init().

%%---------------------------------------------------------------------------
%% supervisor callbacks
Expand Down
4 changes: 2 additions & 2 deletions deps/amqp_client/src/amqp_connection_type_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ start_channels_manager(Sup, Conn, ConnName, Type) ->
start_infrastructure_fun(Sup, Conn, network) ->
fun (Sock, ConnName) ->
{ok, ChMgr} = start_channels_manager(Sup, Conn, ConnName, network),
{ok, AState} = rabbit_command_assembler:init(?PROTOCOL),
{ok, AState} = rabbit_command_assembler:init(),
{ok, GCThreshold} = application:get_env(amqp_client, writer_gc_threshold),

WriterStartMFA = {rabbit_writer, start_link,
[Sock, 0, ?FRAME_MIN_SIZE, ?PROTOCOL, Conn,
[Sock, 0, ?FRAME_MIN_SIZE, Conn,
ConnName, false, GCThreshold]},
WriterChildSpec = #{id => writer,
start => WriterStartMFA,
Expand Down
4 changes: 4 additions & 0 deletions deps/amqp_client/src/amqp_direct_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ connect(Params = #amqp_params_direct{username = Username,
connected_at =
os:system_time(milli_seconds)},
DecryptedPassword = credentials_obfuscation:decrypt(Password),
%% @todo We must use rabbit_direct:connect/5 for compatibility.
%% But the protocol argument is ignored on new nodes.
%% At some point it can be ignored as all nodes 4.3+ have
%% rabbit_direct:connect/4 as well.
case rpc:call(Node, rabbit_direct, connect,
[{Username, DecryptedPassword}, VHost, ?PROTOCOL, self(),
connection_info(State1)], ?DIRECT_OPERATION_TIMEOUT) of
Expand Down
2 changes: 1 addition & 1 deletion deps/amqp_client/src/amqp_gen_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ server_initiated_close(Close, State) ->
server_misbehaved_close(AmqpError, State) ->
?LOG_WARNING("Connection (~tp) closing: server misbehaved: ~tp",
[self(), AmqpError]),
{0, Close} = rabbit_binary_generator:map_exception(0, AmqpError, ?PROTOCOL),
{0, Close} = rabbit_binary_generator:map_exception(0, AmqpError),
set_closing_state(abrupt, #closing{reason = server_misbehaved,
close = Close}, State).

Expand Down
2 changes: 1 addition & 1 deletion deps/amqp_client/src/amqp_main_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ process_frame(Type, ChNumber, Payload,
State = #state{connection = Connection,
channels_manager = ChMgr,
astate = AState}) ->
case rabbit_command_assembler:analyze_frame(Type, Payload, ?PROTOCOL) of
case rabbit_command_assembler:analyze_frame(Type, Payload) of
heartbeat when ChNumber /= 0 ->
amqp_gen_connection:server_misbehaved(
Connection,
Expand Down
3 changes: 1 addition & 2 deletions deps/rabbit/src/mc_amqpl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
-define(AMQP10_APP_PROPERTIES_HEADER, <<"x-amqp-1.0-app-properties">>).
-define(AMQP10_MESSAGE_ANNOTATIONS_HEADER, <<"x-amqp-1.0-message-annotations">>).
-define(AMQP10_FOOTER, <<"x-amqp-1.0-footer">>).
-define(PROTOMOD, rabbit_framing_amqp_0_9_1).
-define(CLASS_ID, 60).

-opaque state() :: #content{}.
Expand Down Expand Up @@ -310,7 +309,7 @@ prepare(read, Content) ->
rabbit_binary_parser:ensure_content_decoded(Content);
prepare(store, Content) ->
rabbit_binary_parser:clear_decoded_content(
rabbit_binary_generator:ensure_content_encoded(Content, ?PROTOMOD)).
rabbit_binary_generator:ensure_content_encoded(Content)).

convert_to(?MODULE, Content, _Env) ->
Content;
Expand Down
30 changes: 12 additions & 18 deletions deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@

-behaviour(gen_server2).

-export([start_link/11, start_link/12, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
-export([start_link/10, start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
-export([send_command/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1,
emit_info_all/4, info_local/1]).
Expand Down Expand Up @@ -78,9 +78,6 @@
-record(conf, {
%% starting | running | flow | closing
state,
%% same as reader's protocol. Used when instantiating
%% (protocol) exceptions.
protocol,
%% channel number
channel,
%% reader process
Expand Down Expand Up @@ -243,26 +240,26 @@
}}).

-spec start_link
(channel_number(), pid(), pid(), pid(), string(), rabbit_types:protocol(),
(channel_number(), pid(), pid(), pid(), string(),
rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
pid(), pid()) ->
rabbit_types:ok_pid_or_error().

start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User,
start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, User,
VHost, Capabilities, CollectorPid, Limiter) ->
start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User,
start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, User,
VHost, Capabilities, CollectorPid, Limiter, undefined).

-spec start_link
(channel_number(), pid(), pid(), pid(), string(), rabbit_types:protocol(),
(channel_number(), pid(), pid(), pid(), string(),
rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
pid(), pid(), any()) ->
rabbit_types:ok_pid_or_error().

start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User,
start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, User,
VHost, Capabilities, CollectorPid, Limiter, AmqpParams) ->
gen_server2:start_link(
?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol,
?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, ConnName,
User, VHost, Capabilities, CollectorPid, Limiter, AmqpParams], []).

-spec do(pid(), rabbit_framing:amqp_method_record()) -> 'ok'.
Expand Down Expand Up @@ -436,7 +433,7 @@ update_user_state(Pid, UserState) when is_pid(Pid) ->

%%---------------------------------------------------------------------------

init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, User, VHost,
Capabilities, CollectorPid, LimiterPid, AmqpParams]) ->
process_flag(trap_exit, true),
rabbit_process_flag:adjust_for_message_handling_proc(),
Expand Down Expand Up @@ -471,7 +468,6 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
username => User#user.username,
connection_name => ConnName},
State = #ch{cfg = #conf{state = starting,
protocol = Protocol,
channel = Channel,
reader_pid = ReaderPid,
writer_pid = WriterPid,
Expand Down Expand Up @@ -858,8 +854,7 @@ send(Command, #ch{cfg = #conf{writer_pid = WriterPid}}) ->
format_soft_error(#amqp_error{name = N, explanation = E, method = M}) ->
io_lib:format("operation ~ts caused a channel exception ~ts: ~ts", [M, N, E]).

handle_exception(Reason, State = #ch{cfg = #conf{protocol = Protocol,
channel = Channel,
handle_exception(Reason, State = #ch{cfg = #conf{channel = Channel,
writer_pid = WriterPid,
reader_pid = ReaderPid,
conn_pid = ConnPid,
Expand All @@ -869,7 +864,7 @@ handle_exception(Reason, State = #ch{cfg = #conf{protocol = Protocol,
}}) ->
%% something bad's happened: notify_queues may not be 'ok'
{_Result, State1} = notify_queues(State),
case rabbit_binary_generator:map_exception(Channel, Reason, Protocol) of
case rabbit_binary_generator:map_exception(Channel, Reason) of
{Channel, CloseMethod} ->
?LOG_ERROR(
"Channel error on connection ~tp (~ts, vhost: '~ts',"
Expand Down Expand Up @@ -1838,10 +1833,9 @@ binding_action(Action, Binding, Username, ConnPid) ->
end.

basic_return(Content, RoutingKey, XNameBin,
#ch{cfg = #conf{protocol = Protocol,
writer_pid = WriterPid}},
#ch{cfg = #conf{writer_pid = WriterPid}},
Reason) ->
{_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason),
{_Close, ReplyCode, ReplyText} = rabbit_framing_amqp_0_9_1:lookup_amqp_exception(Reason),
ok = rabbit_writer:send_command(WriterPid,
#'basic.return'{reply_code = ReplyCode,
reply_text = ReplyText,
Expand Down
20 changes: 10 additions & 10 deletions deps/rabbit/src/rabbit_channel_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@

-type start_link_args() ::
{'tcp', rabbit_net:socket(), rabbit_channel:channel_number(),
non_neg_integer(), pid(), string(), rabbit_types:protocol(),
non_neg_integer(), pid(), string(),
rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
pid()} |
{'direct', rabbit_channel:channel_number(), pid(), string(),
rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(),
rabbit_types:user(), rabbit_types:vhost(),
rabbit_framing:amqp_table(), pid()}.

-define(FAIR_WAIT, 70000).
Expand All @@ -44,16 +44,16 @@

-spec start_link(start_link_args()) -> {'ok', pid(), {pid(), any()}}.

start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User,
start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, User,
VHost, Capabilities, Collector}) ->
{ok, SupPid} = supervisor:start_link(
?MODULE, {tcp, Sock, Channel, FrameMax,
ReaderPid, Protocol, {ConnName, Channel}}),
ReaderPid, {ConnName, Channel}}),
[LimiterPid] = rabbit_misc:find_child(SupPid, limiter),
[WriterPid] = rabbit_misc:find_child(SupPid, writer),
StartMFA = {rabbit_channel, start_link,
[Channel, ReaderPid, WriterPid, ReaderPid, ConnName,
Protocol, User, VHost, Capabilities, Collector,
User, VHost, Capabilities, Collector,
LimiterPid]},
ChildSpec = #{id => channel,
start => StartMFA,
Expand All @@ -63,16 +63,16 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User,
type => worker,
modules => [rabbit_channel]},
{ok, ChannelPid} = supervisor:start_child(SupPid, ChildSpec),
{ok, AState} = rabbit_command_assembler:init(Protocol),
{ok, AState} = rabbit_command_assembler:init(),
{ok, SupPid, {ChannelPid, AState}};
start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol,
start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName,
User, VHost, Capabilities, Collector, AmqpParams}) ->
{ok, SupPid} = supervisor:start_link(
?MODULE, {direct, {ConnName, Channel}}),
[LimiterPid] = rabbit_misc:find_child(SupPid, limiter),
StartMFA = {rabbit_channel, start_link,
[Channel, ClientChannelPid, ClientChannelPid, ConnPid,
ConnName, Protocol, User, VHost, Capabilities, Collector,
ConnName, User, VHost, Capabilities, Collector,
LimiterPid, AmqpParams]},
ChildSpec = #{id => channel,
start => StartMFA,
Expand All @@ -94,9 +94,9 @@ init(Type) ->
auto_shutdown => any_significant},
{ok, {SupFlags, child_specs(Type)}}.

child_specs({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, Identity}) ->
child_specs({tcp, Sock, Channel, FrameMax, ReaderPid, Identity}) ->
StartMFA = {rabbit_writer, start_link,
[Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, true]},
[Sock, Channel, FrameMax, ReaderPid, Identity, true]},
[
#{
id => writer,
Expand Down
21 changes: 0 additions & 21 deletions deps/rabbit/src/rabbit_connection_tracking.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
delete_tracked_connection_vhost_entry/1,
list/0, list/1, list_on_node/1, list_on_node/2, list_of_user/1,
tracked_connection_from_connection_created/1,
tracked_connection_from_connection_state/1,
lookup/1, lookup/2, count/0]).

-export([count_local_tracked_items_in_vhost/1,
Expand Down Expand Up @@ -382,26 +381,6 @@ tracked_connection_from_connection_created(EventDetails) ->
peer_host = pget(peer_host, EventDetails),
peer_port = pget(peer_port, EventDetails)}.

tracked_connection_from_connection_state(#connection{
vhost = VHost,
connected_at = Ts,
peer_host = PeerHost,
peer_port = PeerPort,
user = Username,
name = Name
}) ->
tracked_connection_from_connection_created(
[{name, Name},
{node, node()},
{vhost, VHost},
{user, Username},
{user_who_performed_action, Username},
{connected_at, Ts},
{pid, self()},
{type, network},
{peer_port, PeerPort},
{peer_host, PeerHost}]).

close_connections(Tracked, Message) ->
close_connections(Tracked, Message, 0).

Expand Down
42 changes: 34 additions & 8 deletions deps/rabbit/src/rabbit_direct.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

-module(rabbit_direct).

-export([boot/0, force_event_refresh/1, list/0, connect/5,
start_channel/10, disconnect/2]).
-export([boot/0, force_event_refresh/1, list/0, connect/4, connect/5,
start_channel/9, start_channel/10, disconnect/2]).

-deprecated([{force_event_refresh, 1, eventually}]).

Expand Down Expand Up @@ -71,6 +71,20 @@ auth_fun({Username, Password}, VHost, ExtraAuthProps) ->
'broker_not_found_on_node' |
{'auth_failure', string()} | 'access_refused').

%% Kept for compatibility with older amqp_client versions (rpc:call).
connect(Creds, VHost, _Protocol, Pid, Infos) ->
connect(Creds, VHost, Pid, Infos).

-spec connect
(({'none', 'none'} | {rabbit_types:username(), 'none'} |
{rabbit_types:username(), rabbit_types:password()}),
rabbit_types:vhost(), pid(),
rabbit_event:event_props()) ->
rabbit_types:ok_or_error2(
{rabbit_types:user(), rabbit_framing:amqp_table()},
'broker_not_found_on_node' |
{'auth_failure', string()} | 'access_refused').

%% Infos is a PropList which contains the content of the Proplist #amqp_adapter_info.additional_info
%% among other credentials such as protocol, ssl information, etc.
%% #amqp_adapter_info.additional_info may carry a credential called `authz_bakends` which has the
Expand All @@ -81,7 +95,7 @@ auth_fun({Username, Password}, VHost, ExtraAuthProps) ->
%% on a different context. In other words, we do not have anymore the initial credentials presented
%% during the first authentication. However, we do have the outcome from such successful authentication.

connect(Creds, VHost, Protocol, Pid, Infos) ->
connect(Creds, VHost, Pid, Infos) ->
ExtraAuthProps = get_authz_backends(Infos),

AuthFun = auth_fun(Creds, VHost, ExtraAuthProps),
Expand All @@ -103,7 +117,7 @@ connect(Creds, VHost, Protocol, Pid, Infos) ->
{ok, User = #user{username = Username}} ->
notify_auth_result(Username,
user_authentication_success, []),
connect1(User, VHost, Protocol, Pid, Infos);
connect1(User, VHost, Pid, Infos);
{refused, Username, Msg, Args} ->
notify_auth_result(Username,
user_authentication_failure,
Expand Down Expand Up @@ -163,7 +177,7 @@ notify_auth_result(Username, AuthResult, ExtraProps) ->
ExtraProps,
rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']).

connect1(User = #user{username = Username}, VHost, Protocol, Pid, Infos) ->
connect1(User = #user{username = Username}, VHost, Pid, Infos) ->
case rabbit_auth_backend_internal:is_over_connection_limit(Username) of
false ->
% Note: peer_host can be either a tuple or
Expand All @@ -177,7 +191,7 @@ connect1(User = #user{username = Username}, VHost, Protocol, Pid, Infos) ->
rabbit_event:notify(connection_created, Infos),
_ = rabbit_alarm:register(
Pid, {?MODULE, conserve_resources, []}),
{ok, {User, rabbit_reader:server_properties(Protocol)}}
{ok, {User, rabbit_reader:server_properties(rabbit_framing_amqp_0_9_1)}}
catch
exit:#amqp_error{name = Reason = not_allowed} ->
{error, Reason}
Expand All @@ -197,15 +211,27 @@ connect1(User = #user{username = Username}, VHost, Protocol, Pid, Infos) ->
rabbit_framing:amqp_table(), pid(), any()) ->
{'ok', pid()}.

start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol,
%% Kept for compatibility with older amqp_client versions (rpc:call).
start_channel(Number, ClientChannelPid, ConnPid, ConnName, _Protocol,
User, VHost, Capabilities, Collector, AmqpParams) ->
start_channel(Number, ClientChannelPid, ConnPid, ConnName,
User, VHost, Capabilities, Collector, AmqpParams).

-spec start_channel
(rabbit_channel:channel_number(), pid(), pid(), string(),
rabbit_types:user(), rabbit_types:vhost(),
rabbit_framing:amqp_table(), pid(), any()) ->
{'ok', pid()}.

start_channel(Number, ClientChannelPid, ConnPid, ConnName,
User = #user{username = Username}, VHost, Capabilities,
Collector, AmqpParams) ->
case rabbit_auth_backend_internal:is_over_channel_limit(Username) of
false ->
{ok, _, {ChannelPid, _}} =
supervisor:start_child(
rabbit_direct_client_sup,
[{direct, Number, ClientChannelPid, ConnPid, ConnName, Protocol,
[{direct, Number, ClientChannelPid, ConnPid, ConnName,
User, VHost, Capabilities, Collector, AmqpParams}]),
{ok, ChannelPid};
{true, Limit} ->
Expand Down
Loading
Loading