diff --git a/deps/amqp_client/src/amqp_channel.erl b/deps/amqp_client/src/amqp_channel.erl index 25203c73c7ca..431d145c4944 100644 --- a/deps/amqp_client/src/amqp_channel.erl +++ b/deps/amqp_client/src/amqp_channel.erl @@ -927,7 +927,7 @@ is_connection_method(Method) -> ?PROTOCOL:lookup_class_name(ClassId) == connection. server_misbehaved(#amqp_error{} = AmqpError, State = #state{number = Number}) -> - case rabbit_binary_generator:map_exception(Number, AmqpError, ?PROTOCOL) of + case rabbit_binary_generator:map_exception(Number, AmqpError) of {0, _} -> handle_shutdown({server_misbehaved, AmqpError}, State); {_, Close} -> diff --git a/deps/amqp_client/src/amqp_channel_sup.erl b/deps/amqp_client/src/amqp_channel_sup.erl index cd9d78ba3da0..697f0060b909 100644 --- a/deps/amqp_client/src/amqp_channel_sup.erl +++ b/deps/amqp_client/src/amqp_channel_sup.erl @@ -51,6 +51,10 @@ start_link(Type, Connection, ConnName, InfraArgs, ChNumber, start_writer(_Sup, direct, [ConnPid, Node, User, VHost, Collector, AmqpParams], ConnName, ChNumber, ChPid) -> + %% @todo We must use rabbit_direct:start_channel/10 for compatibility. + %% But the protocol argument is ignored on new nodes. + %% At some point it can be ignored as all nodes 4.3+ have + %% rabbit_direct:start_channel/9 as well. case rpc:call(Node, rabbit_direct, start_channel, [ChNumber, ChPid, ConnPid, ConnName, ?PROTOCOL, User, VHost, ?CLIENT_CAPABILITIES, Collector, AmqpParams], ?DIRECT_OPERATION_TIMEOUT) of @@ -64,7 +68,7 @@ start_writer(_Sup, direct, [ConnPid, Node, User, VHost, Collector, AmqpParams], start_writer(Sup, network, [Sock, FrameMax], ConnName, ChNumber, ChPid) -> GCThreshold = application:get_env(amqp_client, writer_gc_threshold, ?DEFAULT_GC_THRESHOLD), StartMFA = {rabbit_writer, start_link, - [Sock, ChNumber, FrameMax, ?PROTOCOL, ChPid, + [Sock, ChNumber, FrameMax, ChPid, {ConnName, ChNumber}, false, GCThreshold]}, ChildSpec = #{id => writer, start => StartMFA, @@ -75,7 +79,7 @@ start_writer(Sup, network, [Sock, FrameMax], ConnName, ChNumber, ChPid) -> supervisor:start_child(Sup, ChildSpec). init_command_assembler(direct) -> {ok, none}; -init_command_assembler(network) -> rabbit_command_assembler:init(?PROTOCOL). +init_command_assembler(network) -> rabbit_command_assembler:init(). %%--------------------------------------------------------------------------- %% supervisor callbacks diff --git a/deps/amqp_client/src/amqp_connection_type_sup.erl b/deps/amqp_client/src/amqp_connection_type_sup.erl index fa46a77cd2bb..2120ee181b2a 100644 --- a/deps/amqp_client/src/amqp_connection_type_sup.erl +++ b/deps/amqp_client/src/amqp_connection_type_sup.erl @@ -51,11 +51,11 @@ start_channels_manager(Sup, Conn, ConnName, Type) -> start_infrastructure_fun(Sup, Conn, network) -> fun (Sock, ConnName) -> {ok, ChMgr} = start_channels_manager(Sup, Conn, ConnName, network), - {ok, AState} = rabbit_command_assembler:init(?PROTOCOL), + {ok, AState} = rabbit_command_assembler:init(), {ok, GCThreshold} = application:get_env(amqp_client, writer_gc_threshold), WriterStartMFA = {rabbit_writer, start_link, - [Sock, 0, ?FRAME_MIN_SIZE, ?PROTOCOL, Conn, + [Sock, 0, ?FRAME_MIN_SIZE, Conn, ConnName, false, GCThreshold]}, WriterChildSpec = #{id => writer, start => WriterStartMFA, diff --git a/deps/amqp_client/src/amqp_direct_connection.erl b/deps/amqp_client/src/amqp_direct_connection.erl index 4143599b230e..59b44984d2ea 100644 --- a/deps/amqp_client/src/amqp_direct_connection.erl +++ b/deps/amqp_client/src/amqp_direct_connection.erl @@ -144,6 +144,10 @@ connect(Params = #amqp_params_direct{username = Username, connected_at = os:system_time(milli_seconds)}, DecryptedPassword = credentials_obfuscation:decrypt(Password), + %% @todo We must use rabbit_direct:connect/5 for compatibility. + %% But the protocol argument is ignored on new nodes. + %% At some point it can be ignored as all nodes 4.3+ have + %% rabbit_direct:connect/4 as well. case rpc:call(Node, rabbit_direct, connect, [{Username, DecryptedPassword}, VHost, ?PROTOCOL, self(), connection_info(State1)], ?DIRECT_OPERATION_TIMEOUT) of diff --git a/deps/amqp_client/src/amqp_gen_connection.erl b/deps/amqp_client/src/amqp_gen_connection.erl index 886a06d45f05..833d4311efb6 100644 --- a/deps/amqp_client/src/amqp_gen_connection.erl +++ b/deps/amqp_client/src/amqp_gen_connection.erl @@ -353,7 +353,7 @@ server_initiated_close(Close, State) -> server_misbehaved_close(AmqpError, State) -> ?LOG_WARNING("Connection (~tp) closing: server misbehaved: ~tp", [self(), AmqpError]), - {0, Close} = rabbit_binary_generator:map_exception(0, AmqpError, ?PROTOCOL), + {0, Close} = rabbit_binary_generator:map_exception(0, AmqpError), set_closing_state(abrupt, #closing{reason = server_misbehaved, close = Close}, State). diff --git a/deps/amqp_client/src/amqp_main_reader.erl b/deps/amqp_client/src/amqp_main_reader.erl index 8aea83c82a12..84899ed153d0 100644 --- a/deps/amqp_client/src/amqp_main_reader.erl +++ b/deps/amqp_client/src/amqp_main_reader.erl @@ -147,7 +147,7 @@ process_frame(Type, ChNumber, Payload, State = #state{connection = Connection, channels_manager = ChMgr, astate = AState}) -> - case rabbit_command_assembler:analyze_frame(Type, Payload, ?PROTOCOL) of + case rabbit_command_assembler:analyze_frame(Type, Payload) of heartbeat when ChNumber /= 0 -> amqp_gen_connection:server_misbehaved( Connection, diff --git a/deps/rabbit/src/mc_amqpl.erl b/deps/rabbit/src/mc_amqpl.erl index 2106b89b8c95..50e817ba8e60 100644 --- a/deps/rabbit/src/mc_amqpl.erl +++ b/deps/rabbit/src/mc_amqpl.erl @@ -41,7 +41,6 @@ -define(AMQP10_APP_PROPERTIES_HEADER, <<"x-amqp-1.0-app-properties">>). -define(AMQP10_MESSAGE_ANNOTATIONS_HEADER, <<"x-amqp-1.0-message-annotations">>). -define(AMQP10_FOOTER, <<"x-amqp-1.0-footer">>). --define(PROTOMOD, rabbit_framing_amqp_0_9_1). -define(CLASS_ID, 60). -opaque state() :: #content{}. @@ -310,7 +309,7 @@ prepare(read, Content) -> rabbit_binary_parser:ensure_content_decoded(Content); prepare(store, Content) -> rabbit_binary_parser:clear_decoded_content( - rabbit_binary_generator:ensure_content_encoded(Content, ?PROTOMOD)). + rabbit_binary_generator:ensure_content_encoded(Content)). convert_to(?MODULE, Content, _Env) -> Content; diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 19175cbd65b3..8c1757926e60 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -48,7 +48,7 @@ -behaviour(gen_server2). --export([start_link/11, start_link/12, do/2, do/3, do_flow/3, flush/1, shutdown/1]). +-export([start_link/10, start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]). -export([send_command/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1, emit_info_all/4, info_local/1]). @@ -78,9 +78,6 @@ -record(conf, { %% starting | running | flow | closing state, - %% same as reader's protocol. Used when instantiating - %% (protocol) exceptions. - protocol, %% channel number channel, %% reader process @@ -243,26 +240,26 @@ }}). -spec start_link - (channel_number(), pid(), pid(), pid(), string(), rabbit_types:protocol(), + (channel_number(), pid(), pid(), pid(), string(), rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), pid(), pid()) -> rabbit_types:ok_pid_or_error(). -start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, +start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, User, VHost, Capabilities, CollectorPid, Limiter) -> - start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, + start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, User, VHost, Capabilities, CollectorPid, Limiter, undefined). -spec start_link - (channel_number(), pid(), pid(), pid(), string(), rabbit_types:protocol(), + (channel_number(), pid(), pid(), pid(), string(), rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), pid(), pid(), any()) -> rabbit_types:ok_pid_or_error(). -start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, +start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, User, VHost, Capabilities, CollectorPid, Limiter, AmqpParams) -> gen_server2:start_link( - ?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, + ?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, ConnName, User, VHost, Capabilities, CollectorPid, Limiter, AmqpParams], []). -spec do(pid(), rabbit_framing:amqp_method_record()) -> 'ok'. @@ -436,7 +433,7 @@ update_user_state(Pid, UserState) when is_pid(Pid) -> %%--------------------------------------------------------------------------- -init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, +init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, User, VHost, Capabilities, CollectorPid, LimiterPid, AmqpParams]) -> process_flag(trap_exit, true), rabbit_process_flag:adjust_for_message_handling_proc(), @@ -471,7 +468,6 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, username => User#user.username, connection_name => ConnName}, State = #ch{cfg = #conf{state = starting, - protocol = Protocol, channel = Channel, reader_pid = ReaderPid, writer_pid = WriterPid, @@ -858,8 +854,7 @@ send(Command, #ch{cfg = #conf{writer_pid = WriterPid}}) -> format_soft_error(#amqp_error{name = N, explanation = E, method = M}) -> io_lib:format("operation ~ts caused a channel exception ~ts: ~ts", [M, N, E]). -handle_exception(Reason, State = #ch{cfg = #conf{protocol = Protocol, - channel = Channel, +handle_exception(Reason, State = #ch{cfg = #conf{channel = Channel, writer_pid = WriterPid, reader_pid = ReaderPid, conn_pid = ConnPid, @@ -869,7 +864,7 @@ handle_exception(Reason, State = #ch{cfg = #conf{protocol = Protocol, }}) -> %% something bad's happened: notify_queues may not be 'ok' {_Result, State1} = notify_queues(State), - case rabbit_binary_generator:map_exception(Channel, Reason, Protocol) of + case rabbit_binary_generator:map_exception(Channel, Reason) of {Channel, CloseMethod} -> ?LOG_ERROR( "Channel error on connection ~tp (~ts, vhost: '~ts'," @@ -1838,10 +1833,9 @@ binding_action(Action, Binding, Username, ConnPid) -> end. basic_return(Content, RoutingKey, XNameBin, - #ch{cfg = #conf{protocol = Protocol, - writer_pid = WriterPid}}, + #ch{cfg = #conf{writer_pid = WriterPid}}, Reason) -> - {_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason), + {_Close, ReplyCode, ReplyText} = rabbit_framing_amqp_0_9_1:lookup_amqp_exception(Reason), ok = rabbit_writer:send_command(WriterPid, #'basic.return'{reply_code = ReplyCode, reply_text = ReplyText, diff --git a/deps/rabbit/src/rabbit_channel_sup.erl b/deps/rabbit/src/rabbit_channel_sup.erl index 11a80732adae..bf4b4eeaafcf 100644 --- a/deps/rabbit/src/rabbit_channel_sup.erl +++ b/deps/rabbit/src/rabbit_channel_sup.erl @@ -31,11 +31,11 @@ -type start_link_args() :: {'tcp', rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer(), pid(), string(), rabbit_types:protocol(), + non_neg_integer(), pid(), string(), rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), pid()} | {'direct', rabbit_channel:channel_number(), pid(), string(), - rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(), + rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), pid()}. -define(FAIR_WAIT, 70000). @@ -44,16 +44,16 @@ -spec start_link(start_link_args()) -> {'ok', pid(), {pid(), any()}}. -start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User, +start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, User, VHost, Capabilities, Collector}) -> {ok, SupPid} = supervisor:start_link( ?MODULE, {tcp, Sock, Channel, FrameMax, - ReaderPid, Protocol, {ConnName, Channel}}), + ReaderPid, {ConnName, Channel}}), [LimiterPid] = rabbit_misc:find_child(SupPid, limiter), [WriterPid] = rabbit_misc:find_child(SupPid, writer), StartMFA = {rabbit_channel, start_link, [Channel, ReaderPid, WriterPid, ReaderPid, ConnName, - Protocol, User, VHost, Capabilities, Collector, + User, VHost, Capabilities, Collector, LimiterPid]}, ChildSpec = #{id => channel, start => StartMFA, @@ -63,16 +63,16 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User, type => worker, modules => [rabbit_channel]}, {ok, ChannelPid} = supervisor:start_child(SupPid, ChildSpec), - {ok, AState} = rabbit_command_assembler:init(Protocol), + {ok, AState} = rabbit_command_assembler:init(), {ok, SupPid, {ChannelPid, AState}}; -start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol, +start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, User, VHost, Capabilities, Collector, AmqpParams}) -> {ok, SupPid} = supervisor:start_link( ?MODULE, {direct, {ConnName, Channel}}), [LimiterPid] = rabbit_misc:find_child(SupPid, limiter), StartMFA = {rabbit_channel, start_link, [Channel, ClientChannelPid, ClientChannelPid, ConnPid, - ConnName, Protocol, User, VHost, Capabilities, Collector, + ConnName, User, VHost, Capabilities, Collector, LimiterPid, AmqpParams]}, ChildSpec = #{id => channel, start => StartMFA, @@ -94,9 +94,9 @@ init(Type) -> auto_shutdown => any_significant}, {ok, {SupFlags, child_specs(Type)}}. -child_specs({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, Identity}) -> +child_specs({tcp, Sock, Channel, FrameMax, ReaderPid, Identity}) -> StartMFA = {rabbit_writer, start_link, - [Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, true]}, + [Sock, Channel, FrameMax, ReaderPid, Identity, true]}, [ #{ id => writer, diff --git a/deps/rabbit/src/rabbit_connection_tracking.erl b/deps/rabbit/src/rabbit_connection_tracking.erl index 74ae63ece812..88419a657c87 100644 --- a/deps/rabbit/src/rabbit_connection_tracking.erl +++ b/deps/rabbit/src/rabbit_connection_tracking.erl @@ -34,7 +34,6 @@ delete_tracked_connection_vhost_entry/1, list/0, list/1, list_on_node/1, list_on_node/2, list_of_user/1, tracked_connection_from_connection_created/1, - tracked_connection_from_connection_state/1, lookup/1, lookup/2, count/0]). -export([count_local_tracked_items_in_vhost/1, @@ -382,26 +381,6 @@ tracked_connection_from_connection_created(EventDetails) -> peer_host = pget(peer_host, EventDetails), peer_port = pget(peer_port, EventDetails)}. -tracked_connection_from_connection_state(#connection{ - vhost = VHost, - connected_at = Ts, - peer_host = PeerHost, - peer_port = PeerPort, - user = Username, - name = Name - }) -> - tracked_connection_from_connection_created( - [{name, Name}, - {node, node()}, - {vhost, VHost}, - {user, Username}, - {user_who_performed_action, Username}, - {connected_at, Ts}, - {pid, self()}, - {type, network}, - {peer_port, PeerPort}, - {peer_host, PeerHost}]). - close_connections(Tracked, Message) -> close_connections(Tracked, Message, 0). diff --git a/deps/rabbit/src/rabbit_direct.erl b/deps/rabbit/src/rabbit_direct.erl index 698641b5c437..557a5d76ca5a 100644 --- a/deps/rabbit/src/rabbit_direct.erl +++ b/deps/rabbit/src/rabbit_direct.erl @@ -7,8 +7,8 @@ -module(rabbit_direct). --export([boot/0, force_event_refresh/1, list/0, connect/5, - start_channel/10, disconnect/2]). +-export([boot/0, force_event_refresh/1, list/0, connect/4, connect/5, + start_channel/9, start_channel/10, disconnect/2]). -deprecated([{force_event_refresh, 1, eventually}]). @@ -71,6 +71,20 @@ auth_fun({Username, Password}, VHost, ExtraAuthProps) -> 'broker_not_found_on_node' | {'auth_failure', string()} | 'access_refused'). +%% Kept for compatibility with older amqp_client versions (rpc:call). +connect(Creds, VHost, _Protocol, Pid, Infos) -> + connect(Creds, VHost, Pid, Infos). + +-spec connect + (({'none', 'none'} | {rabbit_types:username(), 'none'} | + {rabbit_types:username(), rabbit_types:password()}), + rabbit_types:vhost(), pid(), + rabbit_event:event_props()) -> + rabbit_types:ok_or_error2( + {rabbit_types:user(), rabbit_framing:amqp_table()}, + 'broker_not_found_on_node' | + {'auth_failure', string()} | 'access_refused'). + %% Infos is a PropList which contains the content of the Proplist #amqp_adapter_info.additional_info %% among other credentials such as protocol, ssl information, etc. %% #amqp_adapter_info.additional_info may carry a credential called `authz_bakends` which has the @@ -81,7 +95,7 @@ auth_fun({Username, Password}, VHost, ExtraAuthProps) -> %% on a different context. In other words, we do not have anymore the initial credentials presented %% during the first authentication. However, we do have the outcome from such successful authentication. -connect(Creds, VHost, Protocol, Pid, Infos) -> +connect(Creds, VHost, Pid, Infos) -> ExtraAuthProps = get_authz_backends(Infos), AuthFun = auth_fun(Creds, VHost, ExtraAuthProps), @@ -103,7 +117,7 @@ connect(Creds, VHost, Protocol, Pid, Infos) -> {ok, User = #user{username = Username}} -> notify_auth_result(Username, user_authentication_success, []), - connect1(User, VHost, Protocol, Pid, Infos); + connect1(User, VHost, Pid, Infos); {refused, Username, Msg, Args} -> notify_auth_result(Username, user_authentication_failure, @@ -163,7 +177,7 @@ notify_auth_result(Username, AuthResult, ExtraProps) -> ExtraProps, rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']). -connect1(User = #user{username = Username}, VHost, Protocol, Pid, Infos) -> +connect1(User = #user{username = Username}, VHost, Pid, Infos) -> case rabbit_auth_backend_internal:is_over_connection_limit(Username) of false -> % Note: peer_host can be either a tuple or @@ -177,7 +191,7 @@ connect1(User = #user{username = Username}, VHost, Protocol, Pid, Infos) -> rabbit_event:notify(connection_created, Infos), _ = rabbit_alarm:register( Pid, {?MODULE, conserve_resources, []}), - {ok, {User, rabbit_reader:server_properties(Protocol)}} + {ok, {User, rabbit_reader:server_properties(rabbit_framing_amqp_0_9_1)}} catch exit:#amqp_error{name = Reason = not_allowed} -> {error, Reason} @@ -197,7 +211,19 @@ connect1(User = #user{username = Username}, VHost, Protocol, Pid, Infos) -> rabbit_framing:amqp_table(), pid(), any()) -> {'ok', pid()}. -start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, +%% Kept for compatibility with older amqp_client versions (rpc:call). +start_channel(Number, ClientChannelPid, ConnPid, ConnName, _Protocol, + User, VHost, Capabilities, Collector, AmqpParams) -> + start_channel(Number, ClientChannelPid, ConnPid, ConnName, + User, VHost, Capabilities, Collector, AmqpParams). + +-spec start_channel + (rabbit_channel:channel_number(), pid(), pid(), string(), + rabbit_types:user(), rabbit_types:vhost(), + rabbit_framing:amqp_table(), pid(), any()) -> + {'ok', pid()}. + +start_channel(Number, ClientChannelPid, ConnPid, ConnName, User = #user{username = Username}, VHost, Capabilities, Collector, AmqpParams) -> case rabbit_auth_backend_internal:is_over_channel_limit(Username) of @@ -205,7 +231,7 @@ start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, {ok, _, {ChannelPid, _}} = supervisor:start_child( rabbit_direct_client_sup, - [{direct, Number, ClientChannelPid, ConnPid, ConnName, Protocol, + [{direct, Number, ClientChannelPid, ConnPid, ConnName, User, VHost, Capabilities, Collector, AmqpParams}]), {ok, ChannelPid}; {true, Limit} -> diff --git a/deps/rabbit/src/rabbit_reader.erl b/deps/rabbit/src/rabbit_reader.erl index fe3e74c7b92c..ba413bd78a4e 100644 --- a/deps/rabbit/src/rabbit_reader.erl +++ b/deps/rabbit/src/rabbit_reader.erl @@ -313,7 +313,6 @@ start_connection(Parent, HelperSups, RanchRef, Deb, Sock) -> peer_host = PeerHost, port = Port, peer_port = PeerPort, - protocol = none, user = none, timeout_sec = (HandshakeTimeout / 1000), frame_max = InitialFrameMax, @@ -700,24 +699,21 @@ terminate(Explanation, State) when ?IS_RUNNING(State) -> terminate(_Explanation, State) -> {force, State}. -send_blocked(#v1{connection = #connection{protocol = Protocol, - capabilities = Capabilities}, +send_blocked(#v1{connection = #connection{capabilities = Capabilities}, sock = Sock}, Reason) -> case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of {bool, true} -> - ok = send_on_channel0(Sock, #'connection.blocked'{reason = Reason}, - Protocol); + ok = send_on_channel0(Sock, #'connection.blocked'{reason = Reason}); _ -> ok end. -send_unblocked(#v1{connection = #connection{protocol = Protocol, - capabilities = Capabilities}, +send_unblocked(#v1{connection = #connection{capabilities = Capabilities}, sock = Sock}) -> case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of {bool, true} -> - ok = send_on_channel0(Sock, #'connection.unblocked'{}, Protocol); + ok = send_on_channel0(Sock, #'connection.unblocked'{}); _ -> ok end. @@ -813,10 +809,9 @@ wait_for_channel_termination(N, TimerRef, maybe_close(State = #v1{connection_state = closing, channel_count = 0, - connection = #connection{protocol = Protocol}, sock = Sock}) -> NewState = close_connection(State), - ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol), + ok = send_on_channel0(Sock, #'connection.close_ok'{}), NewState; maybe_close(State) -> State. @@ -845,14 +840,12 @@ log_hard_error(#v1{connection_state = CS, handle_exception(State = #v1{connection_state = closed}, Channel, Reason) -> log_hard_error(State, Channel, Reason), State; -handle_exception(State = #v1{connection = #connection{protocol = Protocol}, - connection_state = CS}, +handle_exception(State = #v1{connection_state = CS}, Channel, Reason) when ?IS_RUNNING(State) orelse CS =:= closing -> - respond_and_close(State, Channel, Protocol, Reason, Reason); + respond_and_close(State, Channel, Reason, Reason); %% authentication failure -handle_exception(State = #v1{connection = #connection{protocol = Protocol, - log_name = ConnName, +handle_exception(State = #v1{connection = #connection{log_name = ConnName, capabilities = Capabilities}, connection_state = starting}, Channel, Reason = #amqp_error{name = access_refused, @@ -864,14 +857,13 @@ handle_exception(State = #v1{connection = #connection{protocol = Protocol, case rabbit_misc:table_lookup(Capabilities, <<"authentication_failure_close">>) of {bool, true} -> - send_error_on_channel0_and_close(Channel, Protocol, Reason, State); + send_error_on_channel0_and_close(Channel, Reason, State); _ -> close_connection(terminate_channels(State)) end; %% when loopback-only user tries to connect from a non-local host %% when user tries to access a vhost it has no permissions for -handle_exception(State = #v1{connection = #connection{protocol = Protocol, - log_name = ConnName, +handle_exception(State = #v1{connection = #connection{log_name = ConnName, user = User}, connection_state = opening}, Channel, Reason = #amqp_error{name = not_allowed, @@ -879,16 +871,14 @@ handle_exception(State = #v1{connection = #connection{protocol = Protocol, ?LOG_ERROR( "Error on AMQP connection ~tp (~ts, user: '~ts', state: ~tp):~n~ts", [self(), ConnName, User#user.username, opening, ErrMsg]), - send_error_on_channel0_and_close(Channel, Protocol, Reason, State); -handle_exception(State = #v1{connection = #connection{protocol = Protocol}, - connection_state = CS = opening}, + send_error_on_channel0_and_close(Channel, Reason, State); +handle_exception(State = #v1{connection_state = CS = opening}, Channel, Reason = #amqp_error{}) -> - respond_and_close(State, Channel, Protocol, Reason, + respond_and_close(State, Channel, Reason, {handshake_error, CS, Reason}); %% when negotiation fails, e.g. due to channel_max being higher than the %% maximum allowed limit -handle_exception(State = #v1{connection = #connection{protocol = Protocol, - log_name = ConnName, +handle_exception(State = #v1{connection = #connection{log_name = ConnName, user = User}, connection_state = tuning}, Channel, Reason = #amqp_error{name = not_allowed, @@ -897,7 +887,7 @@ handle_exception(State = #v1{connection = #connection{protocol = Protocol, "Error on AMQP connection ~tp (~ts," " user: '~ts', state: ~tp):~n~ts", [self(), ConnName, User#user.username, tuning, ErrMsg]), - send_error_on_channel0_and_close(Channel, Protocol, Reason, State); + send_error_on_channel0_and_close(Channel, Reason, State); handle_exception(State, _Channel, Reason) -> %% We don't trust the client at this point - force them to wait %% for a bit so they can't DOS us with repeated failed logins etc. @@ -949,7 +939,6 @@ create_channel(Channel, channel_count = ChannelCount, connection = #connection{name = Name, - protocol = Protocol, frame_max = FrameMax, vhost = VHost, capabilities = Capabilities, @@ -960,7 +949,7 @@ create_channel(Channel, {ok, _ChSupPid, {ChPid, AState}} = rabbit_channel_sup_sup:start_channel( ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name, - Protocol, User, VHost, Capabilities, + User, VHost, Capabilities, Collector}), MRef = erlang:monitor(process, ChPid), put({ch_pid, ChPid}, {Channel, MRef}), @@ -1028,16 +1017,16 @@ clean_up_all_channels(State) -> %%-------------------------------------------------------------------------- handle_frame(Type, 0, Payload, - State = #v1{connection = #connection{protocol = Protocol}}) + State = #v1{connection = #connection{}}) when ?IS_STOPPING(State) -> - case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of + case rabbit_command_assembler:analyze_frame(Type, Payload) of {method, MethodName, FieldsBin} -> handle_method0(MethodName, FieldsBin, State); _Other -> State end; handle_frame(Type, 0, Payload, - State = #v1{connection = #connection{protocol = Protocol}}) -> - case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of + State = #v1{connection = #connection{}}) -> + case rabbit_command_assembler:analyze_frame(Type, Payload) of error -> frame_error(unknown_frame, Type, 0, Payload, State); heartbeat -> State; {method, MethodName, FieldsBin} -> @@ -1045,9 +1034,9 @@ handle_frame(Type, 0, Payload, _Other -> unexpected_frame(Type, 0, Payload, State) end; handle_frame(Type, Channel, Payload, - State = #v1{connection = #connection{protocol = Protocol}}) + State = #v1{connection = #connection{}}) when ?IS_RUNNING(State) -> - case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of + case rabbit_command_assembler:analyze_frame(Type, Payload) of error -> frame_error(unknown_frame, Type, Channel, Payload, State); heartbeat -> unexpected_frame(Type, Channel, Payload, State); Frame -> process_frame(Frame, Channel, State) @@ -1150,10 +1139,10 @@ version_negotiation({ProtocolId, 1, 0, 0}, #v1{sock = Sock}) -> %% AMQP 1.0 figure 2.13: We require SASL security layer. refuse_connection(Sock, {sasl_required, ProtocolId}); version_negotiation({0, 0, 9, 1}, State) -> - start_091_connection({0, 9, 1}, rabbit_framing_amqp_0_9_1, State); + start_091_connection({0, 9, 1}, State); version_negotiation({1, 1, 0, 9}, State) -> %% This is the protocol header for 0-9, which we can safely treat as though it were 0-9-1. - start_091_connection({0, 9, 0}, rabbit_framing_amqp_0_9_1, State); + start_091_connection({0, 9, 0}, State); version_negotiation(Vsn = {0, 0, Minor, _}, #v1{sock = Sock}) when Minor >= 9 -> refuse_connection(Sock, {bad_version, Vsn}, {0, 0, 9, 1}); @@ -1164,7 +1153,6 @@ version_negotiation(Vsn, #v1{sock = Sock}) -> %% includes a major and minor version number, Luckily 0-9 and 0-9-1 %% are similar enough that clients will be happy with either. start_091_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision}, - Protocol, #v1{parent = Parent, sock = Sock, helper_sup = {HelperSup091, _HelperSup10}, @@ -1174,13 +1162,11 @@ start_091_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision}, Start = #'connection.start'{ version_major = ProtocolMajor, version_minor = ProtocolMinor, - server_properties = server_properties(Protocol), + server_properties = server_properties(rabbit_framing_amqp_0_9_1), mechanisms = auth_mechanisms_binary(Sock), locales = <<"en_US">> }, - ok = send_on_channel0(Sock, Start, Protocol), - State = State0#v1{connection = Connection#connection{ - timeout_sec = ?NORMAL_TIMEOUT, - protocol = Protocol}, + ok = send_on_channel0(Sock, Start), + State = State0#v1{connection = Connection#connection{timeout_sec = ?NORMAL_TIMEOUT}, connection_state = starting, helper_sup = HelperSup091}, switch_callback(State, frame_header, 7). @@ -1202,9 +1188,9 @@ ensure_stats_timer(State) -> %%-------------------------------------------------------------------------- handle_method0(MethodName, FieldsBin, - State = #v1{connection = #connection{protocol = Protocol}}) -> + State = #v1{connection = #connection{}}) -> try - handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin), + handle_method0(rabbit_framing_amqp_0_9_1:decode_method_fields(MethodName, FieldsBin), State) catch throw:{inet_error, E} when E =:= closed; E =:= enotconn -> maybe_emit_stats(State), @@ -1296,8 +1282,7 @@ handle_method0(#'connection.open'{virtual_host = VHost}, connection_state = opening, connection = Connection = #connection{ log_name = ConnName, - user = User = #user{username = Username}, - protocol = Protocol}, + user = User = #user{username = Username}}, helper_sup = SupPid, sock = Sock, throttle = Throttle}) -> @@ -1307,7 +1292,7 @@ handle_method0(#'connection.open'{virtual_host = VHost}, ok = rabbit_access_control:check_vhost_access(User, VHost, {socket, Sock}, #{}), ok = is_vhost_alive(VHost, User), NewConnection = Connection#connection{vhost = VHost}, - ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), + ok = send_on_channel0(Sock, #'connection.open_ok'{}), Alarms = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), BlockedBy = sets:from_list([{resource, Alarm} || Alarm <- Alarms], [{version, 2}]), @@ -1336,12 +1321,11 @@ handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) -> lists:foreach(fun rabbit_channel:shutdown/1, all_channels()), maybe_close(State#v1{connection_state = closing}); handle_method0(#'connection.close'{}, - State = #v1{connection = #connection{protocol = Protocol}, - sock = Sock}) + State = #v1{sock = Sock}) when ?IS_STOPPING(State) -> %% We're already closed or closing, so we don't need to cleanup %% anything. - ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol), + ok = send_on_channel0(Sock, #'connection.close_ok'{}), State; handle_method0(#'connection.close_ok'{}, State = #v1{connection_state = closed}) -> @@ -1349,8 +1333,7 @@ handle_method0(#'connection.close_ok'{}, State; handle_method0(#'connection.update_secret'{new_secret = NewSecret, reason = Reason}, State = #v1{connection = - #connection{protocol = Protocol, - user = User = #user{username = Username}, + #connection{user = User = #user{username = Username}, log_name = ConnName} = Conn, sock = Sock}) when ?IS_RUNNING(State) -> ?LOG_DEBUG( @@ -1369,7 +1352,7 @@ handle_method0(#'connection.update_secret'{new_secret = NewSecret, reason = Reas ?LOG_DEBUG("Updating user/auth backend state for channel ~tp", [Ch]), _ = rabbit_channel:update_user_state(Ch, User1) end, all_channels()), - ok = send_on_channel0(Sock, #'connection.update_secret_ok'{}, Protocol), + ok = send_on_channel0(Sock, #'connection.update_secret_ok'{}), ?LOG_INFO( "connection ~ts: user '~ts' updated secret, reason: ~ts", [dynamic_connection_name(ConnName), Username, Reason]), @@ -1471,8 +1454,8 @@ get_env(Key) -> {ok, Value} = application:get_env(rabbit, Key), Value. -send_on_channel0(Sock, Method, Protocol) -> - ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol). +send_on_channel0(Sock, Method) -> + ok = rabbit_writer:internal_send_command(Sock, 0, Method). auth_mechanism_to_module(TypeBin, Sock) -> case rabbit_registry:binary_to_type(TypeBin) of @@ -1503,8 +1486,7 @@ auth_mechanisms_binary(Sock) -> auth_phase(Response, State = #v1{connection = Connection = - #connection{protocol = Protocol, - auth_mechanism = {Name, AuthMechanism}, + #connection{auth_mechanism = {Name, AuthMechanism}, auth_state = AuthState, host = RemoteAddress}, sock = Sock}) -> @@ -1522,7 +1504,7 @@ auth_phase(Response, {challenge, Challenge, AuthState1} -> rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, <<>>, amqp091), Secure = #'connection.secure'{challenge = Challenge}, - ok = send_on_channel0(Sock, Secure, Protocol), + ok = send_on_channel0(Sock, Secure), State#v1{connection = Connection#connection{ auth_state = AuthState1}}; {ok, User = #user{username = Username}} -> @@ -1539,7 +1521,7 @@ auth_phase(Response, Tune = #'connection.tune'{frame_max = get_env(frame_max), channel_max = get_env(channel_max), heartbeat = get_env(heartbeat)}, - ok = send_on_channel0(Sock, Tune, Protocol), + ok = send_on_channel0(Sock, Tune), State#v1{connection_state = tuning, connection = Connection#connection{user = User, auth_state = none}} @@ -1550,8 +1532,7 @@ auth_phase(Response, no_return(). auth_fail(Username, Msg, Args, AuthName, - State = #v1{connection = #connection{protocol = Protocol, - capabilities = Capabilities}}) -> + State = #v1{connection = #connection{capabilities = Capabilities}}) -> notify_auth_result(Username, user_authentication_failure, [{error, rabbit_misc:format(Msg, Args)}], State), AmqpError = rabbit_misc:amqp_error( @@ -1566,8 +1547,8 @@ auth_fail(Username, Msg, Args, AuthName, "logfile.", [AuthName]), AmqpError1 = AmqpError#amqp_error{explanation = SafeMsg}, {0, CloseMethod} = rabbit_binary_generator:map_exception( - 0, AmqpError1, Protocol), - ok = send_on_channel0(State#v1.sock, CloseMethod, Protocol); + 0, AmqpError1), + ok = send_on_channel0(State#v1.sock, CloseMethod); _ -> ok end, rabbit_misc:protocol_error(AmqpError). @@ -1640,6 +1621,11 @@ i(garbage_collection, _State) -> i(reductions = Item, _State) -> {Item, Reductions} = erlang:process_info(self(), Item), Reductions; +%% There can be only one protocol: AMQP 0.9.1. AMQP 1.0 is +%% handled by a different module. Before the handshake completes +%% we don't have an active protocol in use so we return 'none'. +i(protocol, #v1{callback = handshake}) -> none; +i(protocol, _State) -> rabbit_framing_amqp_0_9_1:version(); i(Item, #v1{connection = Conn}) -> ic(Item, Conn). ic(name, #connection{name = Name}) -> Name; @@ -1647,8 +1633,6 @@ ic(host, #connection{host = Host}) -> Host; ic(peer_host, #connection{peer_host = PeerHost}) -> PeerHost; ic(port, #connection{port = Port}) -> Port; ic(peer_port, #connection{peer_port = PeerPort}) -> PeerPort; -ic(protocol, #connection{protocol = none}) -> none; -ic(protocol, #connection{protocol = P}) -> P:version(); ic(user, #connection{user = none}) -> ''; ic(user, #connection{user = U}) -> U#user.username; ic(user_who_performed_action, C) -> ic(user, C); @@ -1709,15 +1693,15 @@ pack_for_1_0(Buf, BufLen, #v1{sock = Sock, {Sock, PendingRecv, HelperSup10, Buf, BufLen, ProxySocket, Name, Host, PeerHost, Port, PeerPort, ConnectedAt, StatsTimer}. -respond_and_close(State, Channel, Protocol, Reason, LogErr) -> +respond_and_close(State, Channel, Reason, LogErr) -> log_hard_error(State, Channel, LogErr), - send_error_on_channel0_and_close(Channel, Protocol, Reason, State). + send_error_on_channel0_and_close(Channel, Reason, State). -send_error_on_channel0_and_close(Channel, Protocol, Reason, State) -> +send_error_on_channel0_and_close(Channel, Reason, State) -> {0, CloseMethod} = - rabbit_binary_generator:map_exception(Channel, Reason, Protocol), + rabbit_binary_generator:map_exception(Channel, Reason), State1 = close_connection(terminate_channels(State)), - ok = send_on_channel0(State#v1.sock, CloseMethod, Protocol), + ok = send_on_channel0(State#v1.sock, CloseMethod), State1. %% diff --git a/deps/rabbit/test/mc_unit_SUITE.erl b/deps/rabbit/test/mc_unit_SUITE.erl index 8a3ecba55e53..2a78d5ec0fb7 100644 --- a/deps/rabbit/test/mc_unit_SUITE.erl +++ b/deps/rabbit/test/mc_unit_SUITE.erl @@ -200,6 +200,7 @@ amqpl_table_x_header_array_of_tbls(_Config) -> amqpl_death_records(_Config) -> Content = #content{class_id = 60, properties = #'P_basic'{headers = []}, + properties_bin = none, payload_fragments_rev = [<<"data">>]}, Msg0 = mc:prepare(store, mc:init(mc_amqpl, Content, annotations())), @@ -235,6 +236,7 @@ amqpl_death_records(_Config) -> is_death_cycle(_Config) -> Content = #content{class_id = 60, properties = #'P_basic'{headers = []}, + properties_bin = none, payload_fragments_rev = [<<"data">>]}, Msg0 = mc:prepare(store, mc:init(mc_amqpl, Content, annotations())), @@ -639,8 +641,7 @@ amqp_amqpl(_Config) -> %% validate content is serialisable _ = rabbit_binary_generator:build_simple_content_frames(1, Content, - 1000000, - rabbit_framing_amqp_0_9_1), + 1000000), ok. diff --git a/deps/rabbit/test/rabbit_fifo_SUITE.erl b/deps/rabbit/test/rabbit_fifo_SUITE.erl index 075065ea86ab..a0fe75b7ad43 100644 --- a/deps/rabbit/test/rabbit_fifo_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_SUITE.erl @@ -403,7 +403,6 @@ enq_expire_enq_deq_test(Config) -> {S1, ok, _} = apply(meta(Config, Idx1, 100, {notify, 1, self()}), Enq1, S0), Msg2 = #basic_message{content = #content{properties = #'P_basic'{}, % class_id = 60, - % protocol = ?PROTOMOD, payload_fragments_rev = [<<"msg2">>]}}, Enq2 = rabbit_fifo:make_enqueue(self(), 2, Msg2), Idx2 = ?LINE, diff --git a/deps/rabbit/test/rabbit_fifo_prop_SUITE.erl b/deps/rabbit/test/rabbit_fifo_prop_SUITE.erl index e269a599ce23..c8b816f42a6e 100644 --- a/deps/rabbit/test/rabbit_fifo_prop_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_prop_SUITE.erl @@ -1952,11 +1952,13 @@ msg_gen() -> routing_keys = [<<>>], content = #content{payload_fragments_rev = [Bin], - properties = #'P_basic'{}}}))). + properties = #'P_basic'{}, + properties_bin = none}}))). msg(Bin) when is_binary(Bin) -> #basic_message{content = #content{payload_fragments_rev = [Bin], - properties = #'P_basic'{}}}. + properties = #'P_basic'{}, + properties_bin = none}}. checkout_cancel_gen(Pid) -> {checkout, Pid, cancel}. diff --git a/deps/rabbit/test/unit_amqp091_content_framing_SUITE.erl b/deps/rabbit/test/unit_amqp091_content_framing_SUITE.erl index 5fe249fe0a4b..e3399a0f3a7a 100644 --- a/deps/rabbit/test/unit_amqp091_content_framing_SUITE.erl +++ b/deps/rabbit/test/unit_amqp091_content_framing_SUITE.erl @@ -117,10 +117,8 @@ test_content_framing(FrameMax, BodyBin) -> rabbit_binary_generator:build_simple_content_frames( 1, rabbit_binary_generator:ensure_content_encoded( - rabbit_basic:build_content(#'P_basic'{}, BodyBin), - rabbit_framing_amqp_0_9_1), - FrameMax, - rabbit_framing_amqp_0_9_1), + rabbit_basic:build_content(#'P_basic'{}, BodyBin)), + FrameMax), %% header is formatted correctly and the size is the total of the %% fragments <<_FrameHeader:7/binary, _ClassAndWeight:4/binary, @@ -148,13 +146,11 @@ content_transcoding(_Config) -> C1 end, EnsureEncoded = - fun (Protocol) -> - fun (C0) -> - C1 = rabbit_binary_generator:ensure_content_encoded( - C0, Protocol), - true = C1#content.properties_bin =/= none, - C1 - end + fun (C0) -> + C1 = rabbit_binary_generator:ensure_content_encoded( + C0), + true = C1#content.properties_bin =/= none, + C1 end, %% Beyond the assertions in Ensure*, the only testable guarantee %% is that the operations should never fail. @@ -171,14 +167,13 @@ content_transcoding(_Config) -> sequence_with_content([ClearEncoded, Op]), sequence_with_content([ClearDecoded, Op]) end || Op <- [ClearDecoded, ClearEncoded, EnsureDecoded, - EnsureEncoded(rabbit_framing_amqp_0_9_1)]], + EnsureEncoded]], passed. sequence_with_content(Sequence) -> lists:foldl(fun (F, V) -> F(F(V)) end, rabbit_binary_generator:ensure_content_encoded( - rabbit_basic:build_content(#'P_basic'{}, <<>>), - rabbit_framing_amqp_0_9_1), + rabbit_basic:build_content(#'P_basic'{}, <<>>)), Sequence). table_codec(_Config) -> diff --git a/deps/rabbit_common/include/rabbit.hrl b/deps/rabbit_common/include/rabbit.hrl index 8c55bb600159..995fa907ad56 100644 --- a/deps/rabbit_common/include/rabbit.hrl +++ b/deps/rabbit_common/include/rabbit.hrl @@ -39,9 +39,6 @@ port, %% client port peer_port, - %% protocol implementation module, - %% e.g. rabbit_framing_amqp_0_9_1 - protocol, user, %% heartbeat timeout value used, 0 means %% heartbeats are disabled @@ -74,6 +71,10 @@ properties_bin, %% either 'none', or an encoded properties binary %% Note: at most one of properties and properties_bin can be %% 'none' at once. + %% @todo The protocol field can be safely removed entirely in the + %% RabbitMQ version that follows the LTS that is after + %% RabbitMQ 4.3 (so if LTS is 4.5, remove in 4.6). + %% Only compat code for reading from disk will be necessary. protocol, %% The protocol under which properties_bin was encoded payload_fragments_rev %% list of binaries, in reverse order (!) }). diff --git a/deps/rabbit_common/src/rabbit_binary_generator.erl b/deps/rabbit_common/src/rabbit_binary_generator.erl index f5cf8a03a040..f8c056817dda 100644 --- a/deps/rabbit_common/src/rabbit_binary_generator.erl +++ b/deps/rabbit_common/src/rabbit_binary_generator.erl @@ -10,54 +10,52 @@ -include("rabbit.hrl"). -include_lib("kernel/include/logger.hrl"). --export([build_simple_method_frame/3, - build_simple_content_frames/4, +-export([build_simple_method_frame/2, + build_simple_content_frames/3, build_heartbeat_frame/0]). -export([generate_table/1]). -export([check_empty_frame_size/0]). --export([ensure_content_encoded/2, clear_encoded_content/1]). --export([map_exception/3]). +-export([ensure_content_encoded/1, clear_encoded_content/1]). +-export([map_exception/2]). %%---------------------------------------------------------------------------- -type frame() :: [binary()]. -spec build_simple_method_frame - (rabbit_types:channel_number(), rabbit_framing:amqp_method_record(), - rabbit_types:protocol()) -> + (rabbit_types:channel_number(), rabbit_framing:amqp_method_record()) -> frame(). -spec build_simple_content_frames (rabbit_types:channel_number(), rabbit_types:content(), - non_neg_integer(), rabbit_types:protocol()) -> + non_neg_integer()) -> [frame()]. -spec build_heartbeat_frame() -> frame(). -spec generate_table(rabbit_framing:amqp_table()) -> binary(). -spec check_empty_frame_size() -> 'ok'. -spec ensure_content_encoded - (rabbit_types:content(), rabbit_types:protocol()) -> + (rabbit_types:content()) -> rabbit_types:encoded_content(). -spec clear_encoded_content (rabbit_types:content()) -> rabbit_types:unencoded_content(). -spec map_exception - (rabbit_types:channel_number(), rabbit_types:amqp_error() | any(), - rabbit_types:protocol()) -> + (rabbit_types:channel_number(), rabbit_types:amqp_error() | any()) -> {rabbit_types:channel_number(), rabbit_framing:amqp_method_record()}. %%---------------------------------------------------------------------------- -build_simple_method_frame(ChannelInt, MethodRecord, Protocol) -> - MethodFields = Protocol:encode_method_fields(MethodRecord), +build_simple_method_frame(ChannelInt, MethodRecord) -> + MethodFields = rabbit_framing_amqp_0_9_1:encode_method_fields(MethodRecord), MethodName = rabbit_misc:method_record_type(MethodRecord), - {ClassId, MethodId} = Protocol:method_id(MethodName), + {ClassId, MethodId} = rabbit_framing_amqp_0_9_1:method_id(MethodName), create_frame(1, ChannelInt, [<>, MethodFields]). -build_simple_content_frames(ChannelInt, Content, FrameMax, Protocol) -> +build_simple_content_frames(ChannelInt, Content, FrameMax) -> #content{class_id = ClassId, properties_bin = ContentPropertiesBin, payload_fragments_rev = PayloadFragmentsRev} = - ensure_content_encoded(Content, Protocol), + ensure_content_encoded(Content), {BodySize, ContentFrames} = build_content_frames(PayloadFragmentsRev, FrameMax, ChannelInt), HeaderFrame = create_frame(2, ChannelInt, @@ -168,25 +166,14 @@ check_empty_frame_size() -> ComputedSize, ?EMPTY_FRAME_SIZE}) end. -ensure_content_encoded(Content = #content{properties_bin = PropBin, - protocol = Protocol}, Protocol) +ensure_content_encoded(Content = #content{properties_bin = PropBin}) when PropBin =/= none -> Content; -ensure_content_encoded(Content = #content{properties = none, - properties_bin = PropBin, - protocol = Protocol}, Protocol1) - when PropBin =/= none -> - Props = Protocol:decode_properties(Content#content.class_id, PropBin), - Content#content{properties = Props, - properties_bin = Protocol1:encode_properties(Props), - protocol = Protocol1}; -ensure_content_encoded(Content = #content{properties = Props}, Protocol) +ensure_content_encoded(Content = #content{properties = Props}) when Props =/= none -> - Content#content{properties_bin = Protocol:encode_properties(Props), - protocol = Protocol}. + Content#content{properties_bin = rabbit_framing_amqp_0_9_1:encode_properties(Props)}. -clear_encoded_content(Content = #content{properties_bin = none, - protocol = none}) -> +clear_encoded_content(Content = #content{properties_bin = none}) -> Content; clear_encoded_content(Content = #content{properties = none}) -> %% Only clear when we can rebuild the properties_bin later in @@ -194,16 +181,16 @@ clear_encoded_content(Content = #content{properties = none}) -> %% one of properties and properties_bin can be 'none' Content; clear_encoded_content(Content = #content{}) -> - Content#content{properties_bin = none, protocol = none}. + Content#content{properties_bin = none}. %% NB: this function is also used by the Erlang client -map_exception(Channel, Reason, Protocol) -> +map_exception(Channel, Reason) -> {SuggestedClose, ReplyCode, ReplyText, FailedMethod} = - lookup_amqp_exception(Reason, Protocol), + lookup_amqp_exception(Reason), {ClassId, MethodId} = case FailedMethod of {_, _} -> FailedMethod; none -> {0, 0}; - _ -> Protocol:method_id(FailedMethod) + _ -> rabbit_framing_amqp_0_9_1:method_id(FailedMethod) end, case SuggestedClose orelse (Channel == 0) of true -> {0, #'connection.close'{reply_code = ReplyCode, @@ -218,14 +205,13 @@ map_exception(Channel, Reason, Protocol) -> lookup_amqp_exception(#amqp_error{name = Name, explanation = Expl, - method = Method}, - Protocol) -> - {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(Name), + method = Method}) -> + {ShouldClose, Code, Text} = rabbit_framing_amqp_0_9_1:lookup_amqp_exception(Name), ExplBin = amqp_exception_explanation(Text, Expl), {ShouldClose, Code, ExplBin, Method}; -lookup_amqp_exception(Other, Protocol) -> +lookup_amqp_exception(Other) -> ?LOG_WARNING("Non-AMQP exit reason '~tp'", [Other]), - {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(internal_error), + {ShouldClose, Code, Text} = rabbit_framing_amqp_0_9_1:lookup_amqp_exception(internal_error), {ShouldClose, Code, Text, none}. amqp_exception_explanation(Text, Expl) -> diff --git a/deps/rabbit_common/src/rabbit_binary_parser.erl b/deps/rabbit_common/src/rabbit_binary_parser.erl index a6df3700f436..1228b9721204 100644 --- a/deps/rabbit_common/src/rabbit_binary_parser.erl +++ b/deps/rabbit_common/src/rabbit_binary_parser.erl @@ -140,10 +140,9 @@ parse_array(<<$V, Rest/binary>>) -> ensure_content_decoded(Content = #content{properties = Props}) when Props =/= none -> Content; -ensure_content_decoded(Content = #content{properties_bin = PropBin, - protocol = Protocol}) +ensure_content_decoded(Content = #content{properties_bin = PropBin}) when PropBin =/= none -> - Content#content{properties = Protocol:decode_properties( + Content#content{properties = rabbit_framing_amqp_0_9_1:decode_properties( Content#content.class_id, PropBin)}. clear_decoded_content(Content = #content{properties = none}) -> diff --git a/deps/rabbit_common/src/rabbit_command_assembler.erl b/deps/rabbit_common/src/rabbit_command_assembler.erl index 9a4fc317f173..850851af86e9 100644 --- a/deps/rabbit_common/src/rabbit_command_assembler.erl +++ b/deps/rabbit_common/src/rabbit_command_assembler.erl @@ -9,7 +9,7 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([analyze_frame/3, init/1, process/2]). +-export([analyze_frame/2, init/0, process/2]). %%---------------------------------------------------------------------------- @@ -19,7 +19,6 @@ -type frame_type() :: ?FRAME_METHOD | ?FRAME_HEADER | ?FRAME_BODY | ?FRAME_HEARTBEAT. --type protocol() :: rabbit_framing:protocol(). -type method() :: rabbit_framing:amqp_method_record(). -type class_id() :: rabbit_framing:amqp_class_id(). -type weight() :: non_neg_integer(). @@ -32,14 +31,14 @@ {'content_body', binary()}. -type state() :: - {'method', protocol()} | - {'content_header', method(), class_id(), protocol()} | - {'content_body', method(), body_size(), class_id(), protocol()}. + 'method' | + {'content_header', method(), class_id()} | + {'content_body', method(), body_size(), class_id()}. --spec analyze_frame(frame_type(), binary(), protocol()) -> +-spec analyze_frame(frame_type(), binary()) -> frame() | 'heartbeat' | 'error'. --spec init(protocol()) -> {ok, state()}. +-spec init() -> {ok, state()}. -spec process(frame(), state()) -> {ok, state()} | {ok, method(), state()} | @@ -49,72 +48,70 @@ %%-------------------------------------------------------------------- analyze_frame(?FRAME_METHOD, - <>, - Protocol) -> - MethodName = Protocol:lookup_method_name({ClassId, MethodId}), + <>) -> + MethodName = rabbit_framing_amqp_0_9_1:lookup_method_name({ClassId, MethodId}), {method, MethodName, MethodFields}; analyze_frame(?FRAME_HEADER, - <>, - _Protocol) -> + <>) -> {content_header, ClassId, Weight, BodySize, Properties}; -analyze_frame(?FRAME_BODY, Body, _Protocol) -> +analyze_frame(?FRAME_BODY, Body) -> {content_body, Body}; -analyze_frame(?FRAME_HEARTBEAT, <<>>, _Protocol) -> +analyze_frame(?FRAME_HEARTBEAT, <<>>) -> heartbeat; -analyze_frame(_Type, _Body, _Protocol) -> +analyze_frame(_Type, _Body) -> error. -init(Protocol) -> {ok, {method, Protocol}}. +init() -> {ok, method}. -process({method, MethodName, FieldsBin}, {method, Protocol}) -> +process({method, MethodName, FieldsBin}, method) -> try - Method = Protocol:decode_method_fields(MethodName, FieldsBin), - case Protocol:method_has_content(MethodName) of - true -> {ClassId, _MethodId} = Protocol:method_id(MethodName), - {ok, {content_header, Method, ClassId, Protocol}}; - false -> {ok, Method, {method, Protocol}} + Method = rabbit_framing_amqp_0_9_1:decode_method_fields(MethodName, FieldsBin), + case rabbit_framing_amqp_0_9_1:method_has_content(MethodName) of + true -> {ClassId, _MethodId} = rabbit_framing_amqp_0_9_1:method_id(MethodName), + {ok, {content_header, Method, ClassId}}; + false -> {ok, Method, method} end catch exit:#amqp_error{} = Reason -> {error, Reason} end; -process(_Frame, {method, _Protocol}) -> +process(_Frame, method) -> unexpected_frame("expected method frame, " "got non method frame instead", [], none); process({content_header, ClassId, 0, 0, PropertiesBin}, - {content_header, Method, ClassId, Protocol}) -> - Content = empty_content(ClassId, PropertiesBin, Protocol), - {ok, Method, Content, {method, Protocol}}; + {content_header, Method, ClassId}) -> + Content = empty_content(ClassId, PropertiesBin), + {ok, Method, Content, method}; process({content_header, ClassId, 0, BodySize, PropertiesBin}, - {content_header, Method, ClassId, Protocol}) -> - Content = empty_content(ClassId, PropertiesBin, Protocol), - {ok, {content_body, Method, BodySize, Content, Protocol}}; + {content_header, Method, ClassId}) -> + Content = empty_content(ClassId, PropertiesBin), + {ok, {content_body, Method, BodySize, Content}}; process({content_header, HeaderClassId, 0, _BodySize, _PropertiesBin}, - {content_header, Method, ClassId, _Protocol}) -> + {content_header, Method, ClassId}) -> unexpected_frame("expected content header for class ~w, " "got one for class ~w instead", [ClassId, HeaderClassId], Method); -process(_Frame, {content_header, Method, ClassId, _Protocol}) -> +process(_Frame, {content_header, Method, ClassId}) -> unexpected_frame("expected content header for class ~w, " "got non content header frame instead", [ClassId], Method); process({content_body, FragmentBin}, {content_body, Method, RemainingSize, - Content = #content{payload_fragments_rev = Fragments}, Protocol}) -> + Content = #content{payload_fragments_rev = Fragments}}) -> NewContent = Content#content{ payload_fragments_rev = [FragmentBin | Fragments]}, case RemainingSize - size(FragmentBin) of - 0 -> {ok, Method, NewContent, {method, Protocol}}; - Sz -> {ok, {content_body, Method, Sz, NewContent, Protocol}} + 0 -> {ok, Method, NewContent, method}; + Sz -> {ok, {content_body, Method, Sz, NewContent}} end; -process(_Frame, {content_body, Method, _RemainingSize, _Content, _Protocol}) -> +process(_Frame, {content_body, Method, _RemainingSize, _Content}) -> unexpected_frame("expected content body, " "got non content body frame instead", [], Method). %%-------------------------------------------------------------------- -empty_content(ClassId, PropertiesBin, Protocol) -> +empty_content(ClassId, PropertiesBin) -> #content{class_id = ClassId, properties = none, properties_bin = PropertiesBin, - protocol = Protocol, + protocol = rabbit_framing_amqp_0_9_1, payload_fragments_rev = []}. unexpected_frame(Format, Params, Method) when is_atom(Method) -> diff --git a/deps/rabbit_common/src/rabbit_types.erl b/deps/rabbit_common/src/rabbit_types.erl index 86115a849bc6..fba63bc02d25 100644 --- a/deps/rabbit_common/src/rabbit_types.erl +++ b/deps/rabbit_common/src/rabbit_types.erl @@ -172,7 +172,10 @@ username :: username(), connection :: connection()}). -%% old AMQP 0-9-1-centric type, avoid when possible +%% Old AMQP 0-9-1-centric type, DO NOT USE. +%% @todo This can be removed in a future release when +%% we no longer need to worry about amqp_client +%% direct connection compatibility. -type(protocol() :: rabbit_framing:protocol()). -type(protocol_name() :: 'amqp0_9_1' | 'amqp1_0' | 'mqtt' | 'stomp' | any()). diff --git a/deps/rabbit_common/src/rabbit_writer.erl b/deps/rabbit_common/src/rabbit_writer.erl index 4cbb58caab32..c25af04aecd0 100644 --- a/deps/rabbit_common/src/rabbit_writer.erl +++ b/deps/rabbit_common/src/rabbit_writer.erl @@ -28,7 +28,7 @@ -include("rabbit.hrl"). -include_lib("kernel/include/logger.hrl"). --export([start/6, start_link/6, start/7, start_link/7, start/8, start_link/8]). +-export([start_link/6, start_link/7]). -export([init/1, handle_call/3, @@ -42,7 +42,7 @@ send_command_and_notify/4, send_command_and_notify/5, send_command_flow/2, send_command_flow/3, flush/1]). --export([internal_send_command/4, internal_send_command/6]). +-export([internal_send_command/3]). -export([msg_size/1, maybe_gc_large_msg/1, maybe_gc_large_msg/2]). -record(wstate, { @@ -52,8 +52,6 @@ channel, %% connection-negotiated frame_max setting frame_max, - %% see #connection.protocol in rabbit_reader - protocol, %% connection (rabbit_reader) process reader, %% statistics emission timer @@ -71,34 +69,14 @@ %%--------------------------------------------------------------------------- --spec start - (rabbit_net:socket(), rabbit_types:channel_number(), - non_neg_integer(), rabbit_types:protocol(), pid(), - rabbit_types:proc_name()) -> - rabbit_types:ok(pid()). -spec start_link (rabbit_net:socket(), rabbit_types:channel_number(), - non_neg_integer(), rabbit_types:protocol(), pid(), - rabbit_types:proc_name()) -> - rabbit_types:ok(pid()). --spec start - (rabbit_net:socket(), rabbit_types:channel_number(), - non_neg_integer(), rabbit_types:protocol(), pid(), + non_neg_integer(), pid(), rabbit_types:proc_name(), boolean()) -> rabbit_types:ok(pid()). -spec start_link (rabbit_net:socket(), rabbit_types:channel_number(), - non_neg_integer(), rabbit_types:protocol(), pid(), - rabbit_types:proc_name(), boolean()) -> - rabbit_types:ok(pid()). --spec start - (rabbit_net:socket(), rabbit_types:channel_number(), - non_neg_integer(), rabbit_types:protocol(), pid(), - rabbit_types:proc_name(), boolean(), undefined|non_neg_integer()) -> - rabbit_types:ok(pid()). --spec start_link - (rabbit_net:socket(), rabbit_types:channel_number(), - non_neg_integer(), rabbit_types:protocol(), pid(), + non_neg_integer(), pid(), rabbit_types:proc_name(), boolean(), undefined|non_neg_integer()) -> rabbit_types:ok(pid()). @@ -123,12 +101,7 @@ -spec flush(pid()) -> 'ok'. -spec internal_send_command (rabbit_net:socket(), rabbit_types:channel_number(), - rabbit_framing:amqp_method_record(), rabbit_types:protocol()) -> - 'ok'. --spec internal_send_command - (rabbit_net:socket(), rabbit_types:channel_number(), - rabbit_framing:amqp_method_record(), rabbit_types:content(), - non_neg_integer(), rabbit_types:protocol()) -> + rabbit_framing:amqp_method_record()) -> 'ok'. -spec msg_size @@ -142,44 +115,25 @@ %%--------------------------------------------------------------------------- -start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity) -> - start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, false). - -start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity) -> - start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, false). - -start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, - ReaderWantsStats) -> - start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, - ReaderWantsStats, ?DEFAULT_GC_THRESHOLD). - -start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, +start_link(Sock, Channel, FrameMax, ReaderPid, Identity, ReaderWantsStats) -> - start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, + start_link(Sock, Channel, FrameMax, ReaderPid, Identity, ReaderWantsStats, ?DEFAULT_GC_THRESHOLD). -start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, - ReaderWantsStats, GCThreshold) -> - State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, - ReaderWantsStats, GCThreshold), - Options = [{hibernate_after, ?HIBERNATE_AFTER}], - gen_server:start(?MODULE, [Identity, State], Options). - -start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, +start_link(Sock, Channel, FrameMax, ReaderPid, Identity, ReaderWantsStats, GCThreshold) -> - State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, + State = initial_state(Sock, Channel, FrameMax, ReaderPid, ReaderWantsStats, GCThreshold), Options = [{hibernate_after, ?HIBERNATE_AFTER}], gen_server:start_link(?MODULE, [Identity, State], Options). -initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats, GCThreshold) -> +initial_state(Sock, Channel, FrameMax, ReaderPid, ReaderWantsStats, GCThreshold) -> (case ReaderWantsStats of true -> fun rabbit_event:init_stats_timer/2; false -> fun rabbit_event:init_disabled_stats_timer/2 end)(#wstate{sock = Sock, channel = Channel, frame_max = FrameMax, - protocol = Protocol, reader = ReaderPid, pending = [], writer_gc_threshold = GCThreshold}, @@ -324,48 +278,38 @@ call(Pid, Msg) -> %%--------------------------------------------------------------------------- -assemble_frame(Channel, MethodRecord, Protocol) -> +assemble_frame(Channel, MethodRecord) -> rabbit_binary_generator:build_simple_method_frame( - Channel, MethodRecord, Protocol). + Channel, MethodRecord). -assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) -> +assemble_frames(Channel, MethodRecord, Content, FrameMax) -> MethodName = rabbit_misc:method_record_type(MethodRecord), - true = Protocol:method_has_content(MethodName), % assertion + true = rabbit_framing_amqp_0_9_1:method_has_content(MethodName), % assertion MethodFrame = rabbit_binary_generator:build_simple_method_frame( - Channel, MethodRecord, Protocol), + Channel, MethodRecord), ContentFrames = rabbit_binary_generator:build_simple_content_frames( - Channel, Content, FrameMax, Protocol), + Channel, Content, FrameMax), [MethodFrame | ContentFrames]. tcp_send(Sock, Data) -> rabbit_misc:throw_on_error(inet_error, fun () -> rabbit_net:send(Sock, Data) end). -internal_send_command(Sock, Channel, MethodRecord, Protocol) -> - ok = tcp_send(Sock, assemble_frame(Channel, MethodRecord, Protocol)). - -internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax, - Protocol) -> - ok = lists:foldl(fun (Frame, ok) -> tcp_send(Sock, Frame); - (_Frame, Other) -> Other - end, ok, assemble_frames(Channel, MethodRecord, - Content, FrameMax, Protocol)). +internal_send_command(Sock, Channel, MethodRecord) -> + ok = tcp_send(Sock, assemble_frame(Channel, MethodRecord)). internal_send_command_async(MethodRecord, State = #wstate{channel = Channel, - protocol = Protocol, pending = Pending}) -> - Frame = assemble_frame(Channel, MethodRecord, Protocol), + Frame = assemble_frame(Channel, MethodRecord), maybe_flush(State#wstate{pending = [Frame | Pending]}). internal_send_command_async(MethodRecord, Content, State = #wstate{channel = Channel, frame_max = FrameMax, - protocol = Protocol, pending = Pending, writer_gc_threshold = GCThreshold}) -> - Frames = assemble_frames(Channel, MethodRecord, Content, FrameMax, - Protocol), + Frames = assemble_frames(Channel, MethodRecord, Content, FrameMax), _ = maybe_gc_large_msg(Content, GCThreshold), maybe_flush(State#wstate{pending = [Frames | Pending]}). diff --git a/deps/rabbit_common/test/unit_SUITE.erl b/deps/rabbit_common/test/unit_SUITE.erl index e68f76282c22..b42f6c6d7554 100644 --- a/deps/rabbit_common/test/unit_SUITE.erl +++ b/deps/rabbit_common/test/unit_SUITE.erl @@ -482,10 +482,10 @@ encrypt_decrypt_term(_Config) -> frame_encoding_does_not_fail_with_empty_binary_payload(_Config) -> [begin Content = #content{ - class_id = 60, properties = none, properties_bin = <<0,0>>, protocol = rabbit_framing_amqp_0_9_1, + class_id = 60, properties = none, properties_bin = <<0,0>>, payload_fragments_rev = P }, - ExpectedFrames = rabbit_binary_generator:build_simple_content_frames(1, Content, 0, rabbit_framing_amqp_0_9_1) + ExpectedFrames = rabbit_binary_generator:build_simple_content_frames(1, Content, 0) end || {P, ExpectedFrames} <- [ {[], [[<<2,0,1,0,0,0,14>>,[<<0,60,0,0,0,0,0,0,0,0,0,0>>,<<0,0>>],206]]}, {[<<>>], [[<<2,0,1,0,0,0,14>>,[<<0,60,0,0,0,0,0,0,0,0,0,0>>,<<0,0>>],206]]}, @@ -497,15 +497,13 @@ frame_encoding_does_not_fail_with_empty_binary_payload(_Config) -> map_exception_does_not_fail_with_unicode_explaination_case1(_Config) -> NonAsciiExplaination = "no queue 'non_ascii_name_😍_δ½ ε₯½' in vhost '/'", rabbit_binary_generator:map_exception(0, - #amqp_error{name = not_found, explanation = NonAsciiExplaination, method = 'queue.declare'}, - rabbit_framing_amqp_0_9_1), + #amqp_error{name = not_found, explanation = NonAsciiExplaination, method = 'queue.declare'}), ok. map_exception_does_not_fail_with_unicode_explaination_case2(_Config) -> NonAsciiExplaination = "no queue 'ΠΊΡ€ΠΎΠ»ΠΈΠΊ 🐰' in vhost '/'", rabbit_binary_generator:map_exception(0, - #amqp_error{name = not_found, explanation = NonAsciiExplaination, method = 'queue.declare'}, - rabbit_framing_amqp_0_9_1), + #amqp_error{name = not_found, explanation = NonAsciiExplaination, method = 'queue.declare'}), ok. amqp_table_conversion(_Config) -> diff --git a/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl b/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl index 3e7454330970..9abd83c0c3fb 100644 --- a/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl +++ b/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl @@ -2456,9 +2456,17 @@ test_channel() -> Me = self(), Writer = spawn(fun () -> test_writer(Me) end), {ok, Limiter} = rabbit_limiter:start_link(no_id), - {ok, Ch} = rabbit_channel:start_link( - 1, Me, Writer, Me, "", rabbit_framing_amqp_0_9_1, - user(<<"guest">>), <<"/">>, [], Me, Limiter), + {ok, Ch} = case erlang:function_exported(rabbit_channel, start_link, 12) of + %% @todo Remove release after LTS after 4.3. + true -> + rabbit_channel:start_link( + 1, Me, Writer, Me, "", rabbit_framing_amqp_0_9_1, + user(<<"guest">>), <<"/">>, [], Me, Limiter); + false -> + rabbit_channel:start_link( + 1, Me, Writer, Me, "", + user(<<"guest">>), <<"/">>, [], Me, Limiter) + end, {Writer, Limiter, Ch}. test_writer(Pid) ->