diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl index bf7694b5a00b..c4b1b43e7246 100644 --- a/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl +++ b/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl @@ -340,46 +340,49 @@ process_connect(Implicit, Frame, {?HEADER_HEART_BEAT, io_lib:format("~B,~B", [SendTimeout, ReceiveTimeout])}, {?HEADER_VERSION, Version}], - ok('CONNECTED', - case application:get_env(rabbitmq_stomp, hide_server_info, false) of - true -> Headers; - false -> [{?HEADER_SERVER, server_header()} | Headers] - end, - "", - StateN1#state{cfg = #cfg{ + + Res = ok("CONNECTED", + case application:get_env(rabbitmq_stomp, hide_server_info, false) of + true -> Headers; + false -> [{?HEADER_SERVER, server_header()} | Headers] + end, + "", + StateN1#state{cfg = #cfg{ session_id = SessionId, - version = Version - }, - user = User, - authz_ctx = AuthzCtx}) + version = Version + }, + user = User, + authz_ctx = AuthzCtx}), + self() ! connection_created, + Res else {error, no_common_version} -> error("Version mismatch", "Supported versions are ~ts~n", [string:join(?SUPPORTED_VERSIONS, ",")], StateN); - {error, not_allowed} -> + {error, not_allowed, EUsername, EVHost} -> rabbit_log:warning("STOMP login failed for user '~ts': " - "virtual host access not allowed", [Username]), + "virtual host access not allowed", [EUsername]), error("Bad CONNECT", "Virtual host '" ++ - binary_to_list(VHost) ++ + binary_to_list(EVHost) ++ "' access denied", State); {refused, Username1, _Msg, _Args} -> rabbit_log:warning("STOMP login failed for user '~ts': authentication failed", [Username1]), error("Bad CONNECT", "Access refused for user '" ++ binary_to_list(Username1) ++ "'", [], State); - {error, not_loopback} -> + {error, not_loopback, EUsername} -> rabbit_log:warning("STOMP login failed for user '~ts': " - "this user's access is restricted to localhost", [Username]), + "this user's access is restricted to localhost", [EUsername]), error("Bad CONNECT", "non-loopback access denied", State) - end, + end case {Res, Implicit} of {{ok, _, StateN2}, implicit} -> self() ! connection_created, ok(StateN2); _ -> self() ! connection_created, Res - end - end, + + end, State). creds(_, _, #cfg{default_login = DefLogin, @@ -903,16 +906,6 @@ do_send(Destination, _DestHdr, io:format("Message: ~p~n", [Message]), - %% {ok, BasicMessage} = rabbit_basic:message(ExchangeName, RoutingKey, Content), - - %% Delivery = #delivery{ - %% mandatory = false, - %% confirm = DoConfirm, - %% sender = self(), - %% message = BasicMessage, - %% msg_seq_no = MsgSeqNo, - %% flow = Flow - %% }, QNames = rabbit_exchange:route(Exchange, Message, #{return_binding_keys => true}), io:format("QNames ~p~n", [QNames]), @@ -1309,8 +1302,11 @@ ensure_reply_queue(TempQueueId, State = #state{reply_queues = RQS, #resource{name = QNameBin} = QName = amqqueue:get_name(Queue), ConsumerTag = rabbit_stomp_util:consumer_tag_reply_to(TempQueueId), + + + {ok, {_Global, DefaultPrefetch}} = application:get_env(rabbit, default_consumer_prefetch), Spec = #{no_ack => true, - prefetch_count => application:get_env(rabbit, default_consumer_prefetch), + prefetch_count => DefaultPrefetch, consumer_tag => ConsumerTag, exclusive_consume => false, args => []}, @@ -1709,7 +1705,6 @@ check_resource_access(User, Resource, Perm, Context) -> handle_down({{'DOWN', QName}, _MRef, process, QPid, Reason}, State0 = #state{queue_states = QStates0} = State) -> - credit_flow:peer_down(QPid), case rabbit_queue_type:handle_down(QPid, QName, Reason, QStates0) of {ok, QStates1, Actions} -> State1 = State0#state{queue_states = QStates1}, @@ -1773,13 +1768,6 @@ handle_queue_actions(Actions, #state{} = State0) -> record_rejects(Rej, S); ({queue_down, QRef}, S0) -> handle_consuming_queue_down_or_eol(QRef, S0); - %% TODO: I have no idea about the scope of credit_flow - ({block, QName}, S0) -> - credit_flow:block(QName), - S0; - ({unblock, QName}, S0) -> - credit_flow:unblock(QName), - S0; %% TODO: in rabbit_channel there code for handling %% send_drained and send_credit_reply %% I'm doing catch all here to not crash? diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl index 31a85214d202..f5da29e3e94c 100644 --- a/deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl +++ b/deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl @@ -18,7 +18,6 @@ -include("rabbit_stomp.hrl"). -include("rabbit_stomp_frame.hrl"). --include_lib("amqp_client/include/amqp_client.hrl"). -record(reader_state, { socket, @@ -404,6 +403,20 @@ processor_args(Configuration, Sock) -> ssl_login_name(RealSocket, Configuration), PeerAddr}. adapter_info(Sock) -> +case rabbit_net:socket_ends(Socket, inbound) of + {ok, {PeerIp, PeerPort, Ip, Port}} -> +#amqp_adapter_info{protocol = {'STOMP', 0}, + name = Name, + host = Host, + port = Port, + peer_host = PeerHost, + peer_port = PeerPort, + additional_info = maybe_ssl_info(Sock)} + process_connect(ConnectPacket, Socket, ConnName, SendFun, SocketEnds); + {error, Reason} -> + {error, {socket_ends, Reason}} + end. + amqp_connection:socket_adapter_info(Sock, {'STOMP', 0}). ssl_login_name(_Sock, #stomp_configuration{ssl_cert_login = false}) ->