Skip to content

Commit e43e0c5

Browse files
committed
move auth to handler
1 parent 16fcfe8 commit e43e0c5

File tree

3 files changed

+149
-140
lines changed

3 files changed

+149
-140
lines changed

src/rabbit_web_ocpp_app.erl

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
-spec start(_, _) -> {ok, pid()}.
3636
start(_Type, _StartArgs) ->
37+
init_global_counters(),
3738
ocpp_init(),
3839
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
3940

@@ -95,6 +96,21 @@ emit_connection_info(Items, Ref, AggregatorPid, Pids) ->
9596
%% Implementation
9697
%%
9798

99+
init_global_counters() ->
100+
lists:foreach(fun init_global_counters/1, [?OCPP_PROTO_V12,
101+
?OCPP_PROTO_V15,
102+
?OCPP_PROTO_V16,
103+
?OCPP_PROTO_V20,
104+
?OCPP_PROTO_V201,
105+
?OCPP_PROTO_V21]).
106+
107+
init_global_counters(ProtoVer) ->
108+
Proto = {protocol, ProtoVer},
109+
rabbit_global_counters:init([Proto]),
110+
rabbit_global_counters:init([Proto, {queue_type, rabbit_classic_queue}]),
111+
rabbit_global_counters:init([Proto, {queue_type, rabbit_quorum_queue}]),
112+
rabbit_global_counters:init([Proto, {queue_type, rabbit_stream_queue}]).
113+
98114
ocpp_init() ->
99115
CowboyOpts0 = maps:from_list(get_env(cowboy_opts, [])),
100116
CowboyWsOpts = maps:from_list(get_env(cowboy_ws_opts, [])),

src/rabbit_web_ocpp_handler.erl

Lines changed: 125 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@
2929
upgrade/5,
3030
takeover/7]).
3131

32-
-define(APP, rabbitmq_web_ocpp).
33-
3432
-ifdef(TEST).
3533
-define(SILENT_CLOSE_DELAY, 10).
3634
-else.
@@ -74,70 +72,47 @@ init(Req, Opts) ->
7472
%% Retrieve the vhost and client_id from URL path first
7573
Vhost = cowboy_req:binding(vhost, Req),
7674
ClientId = cowboy_req:binding(client_id, Req),
75+
{PeerIp, _PeerPort} = cowboy_req:peer(Req),
7776

7877
case {Vhost, ClientId} of
7978
{<<>>, _} ->
8079
{ok, cowboy_req:reply(404, #{}, <<"Vhost not specified">>, Req), #state{}};
8180
{_, <<>>} ->
8281
{ok, cowboy_req:reply(404, #{}, <<"Client ID not specified">>, Req), #state{}};
83-
{V1, CId} when V1 =:= <<>> orelse CId =:= <<>> ->
84-
{ok, cowboy_req:reply(404, #{}, <<"Invalid Vhost or Client ID">>, Req), #state{}};
85-
{V2, CId} ->
86-
%% ClientId is valid, now check subprotocol
87-
case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of
88-
undefined ->
89-
no_supported_sub_protocol(undefined, ClientId, Req);
90-
ProtocolList ->
91-
%% Map protocols to their atom representations, filtering out unsupported ones
92-
SupportedProtos = [{Proto, ?OCPP_PROTO_TO_ATOM(Proto)} ||
93-
Proto <- ProtocolList,
94-
?OCPP_PROTO_TO_ATOM(Proto) =/= undefined],
95-
96-
case SupportedProtos of
97-
[] ->
98-
no_supported_sub_protocol(ProtocolList, ClientId, Req);
99-
[{MatchingProtocol, ProtocolVer}|_] ->
100-
%% First supported protocol is selected (preserving client preference order)
101-
Req1 = cowboy_req:set_resp_header(<<"sec-websocket-protocol">>,
102-
MatchingProtocol, Req),
103-
AuthHd = cowboy_req:header(<<"authorization">>, Req, <<>>),
104-
case AuthHd of
105-
<<>> ->
106-
%% No Authorization header, request credentials
107-
Headers = #{<<"www-authenticate">> => <<"Basic realm=\"RabbitMQ\"">>},
108-
{ok, cowboy_req:reply(401, Headers, <<"Unauthorized">>, Req), #state{}};
109-
_ ->
110-
case cow_http_hd:parse_authorization(AuthHd) of
111-
{basic, Username, Password} ->
112-
%% Perform authentication check here
113-
case rabbit_access_control:check_user_login(
114-
Username, [{password, Password}]) of
115-
{ok, User} ->
116-
State0 = #state{socket = maps:get(proxy_header, Req, undefined),
117-
proto_ver = ProtocolVer,
118-
vhost = V2,
119-
user = User,
120-
client_id = CId},
121-
WsOpts0 = proplists:get_value(ws_opts, Opts, #{}),
122-
IdleTimeoutMs = maps:get(idle_timeout, WsOpts0, ?DEFAULT_IDLE_TIMEOUT_MS),
123-
WsOpts = maps:merge(#{compress => true, idle_timeout => IdleTimeoutMs}, WsOpts0),
124-
IdleTimeoutS = case IdleTimeoutMs of
125-
infinity -> 0;
126-
Ms -> Ms div 1000
127-
end,
128-
State = State0#state{idle_timeout = IdleTimeoutS},
129-
{?MODULE, Req1, State, WsOpts};
130-
{error, _Reason} ->
131-
Headers = #{<<"www-authenticate">> => <<"Basic realm=\"RabbitMQ\"">>},
132-
{ok, cowboy_req:reply(401, Headers, <<"Unauthorized">>, Req), #state{}}
133-
end;
134-
_ ->
135-
%% Invalid Authorization header format
136-
Headers = #{<<"www-authenticate">> => <<"Basic realm=\"RabbitMQ\"">>},
137-
{ok, cowboy_req:reply(401, Headers, <<"Unauthorized">>, Req), #state{}}
138-
end
139-
end
140-
end
82+
_ ->
83+
{Username0, Password0} = basic_auth_creds(Req),
84+
SslLoginName = none,
85+
Result = maybe
86+
ok ?= check_vhost_exists(Vhost, ClientId, PeerIp),
87+
ok ?= check_vhost_alive(Vhost),
88+
{ProtoVer, Req1} ?= pick_protocol(Req, ClientId),
89+
{ok, Username1, Password1} ?= check_credentials(Username0, Password0, SslLoginName, PeerIp),
90+
{ok, User0} ?= check_user_login(Vhost, Username1, Password1, ClientId, PeerIp),
91+
AuthzCtx = #{<<"client_id">> => ClientId, <<"protocol">> => <<"ocpp">>},
92+
ok ?= check_vhost_access(Vhost, User0, ClientId, PeerIp, AuthzCtx),
93+
{ok, Req1, Vhost, ClientId, User0, ProtoVer}
94+
end,
95+
case Result of
96+
{ok, Req2, V2, CId, User, ProtocolVer} ->
97+
ProxyInfo = maps:get(proxy_header, Req2, undefined),
98+
WsOpts0 = proplists:get_value(ws_opts, Opts, #{}),
99+
IdleMs = maps:get(idle_timeout, WsOpts0, ?DEFAULT_IDLE_TIMEOUT_MS),
100+
WsOpts = maps:merge(#{compress => true, idle_timeout => IdleMs}, WsOpts0),
101+
IdleSec = case IdleMs of infinity -> 0; Ms -> Ms div 1000 end,
102+
State = #state{socket = ProxyInfo, proto_ver = ProtocolVer, vhost = V2,
103+
user = User, client_id = CId, idle_timeout = IdleSec},
104+
{?MODULE, Req2, State, WsOpts};
105+
{error, bad_vhost} ->
106+
{ok, cowboy_req:reply(404, #{}, <<"Invalid Vhost">>, Req), #state{}};
107+
{error, vhost_down} ->
108+
{ok, cowboy_req:reply(503, #{}, <<"Vhost is down">>, Req), #state{}};
109+
{error, invalid_subprotocol} ->
110+
{ok, cowboy_req:reply(400, #{<<"connection">> => <<"close">>},
111+
<<"Unsupported or missing OCPP subprotocol">>, Req), #state{}};
112+
{error, _} ->
113+
{ok, cowboy_req:reply(401,
114+
#{<<"www-authenticate">> => <<"Basic realm=\"OCPP\"">>},
115+
<<"Unauthorized">>, Req), #state{}}
141116
end
142117
end.
143118

@@ -156,8 +131,8 @@ info(Pid, Items) ->
156131
Res.
157132
-spec websocket_init(state()) ->
158133
{cowboy_websocket:commands(), state()} |
159-
{cowboy_websocket:commands(), state(), hibernate}.
160-
websocket_init(State0 = #state{socket = Socket, vhost = Vhost, client_id = ClientId, proto_ver = ProtoVer}) ->
134+
{cowboy_websocket:commands(), state, hibernate}.
135+
websocket_init(State0 = #state{socket = Socket, vhost = Vhost, client_id = ClientId, user = User, proto_ver = ProtoVer}) ->
161136
logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_CONN ++ [web_ocpp]}),
162137
case rabbit_net:connection_string(Socket, inbound) of
163138
{ok, ConnStr} ->
@@ -166,8 +141,9 @@ websocket_init(State0 = #state{socket = Socket, vhost = Vhost, client_id = Clien
166141
State1 = State0#state{conn_name = ConnName},
167142
State2 = rabbit_event:init_stats_timer(State1, #state.stats_timer),
168143
% Inside `init` of the processor "connection_created" is called for management UI to show the connection
169-
case rabbit_web_ocpp_processor:init(Vhost, ClientId, ProtoVer, rabbit_net:unwrap_socket(Socket),
170-
ConnName, fun send_reply/1) of
144+
case rabbit_web_ocpp_processor:init(Vhost, ClientId, ProtoVer,
145+
rabbit_net:unwrap_socket(Socket),
146+
ConnName, User, fun send_reply/1) of
171147
{ok, ProcState} ->
172148
?LOG_INFO("Accepted Web OCPP connection ~ts for client ID ~ts",
173149
[ConnName, ClientId]),
@@ -321,7 +297,7 @@ websocket_info(Msg, State) ->
321297
{[], State, hibernate}.
322298

323299
terminate(Reason, _Request, #state{conn_name = ConnName,
324-
proc_state = PState, % Can be undefined if init failed
300+
proc_state = PState, % Can be undefined if init crashed
325301
client_id = ClientId} = State) ->
326302
?LOG_INFO("Web OCPP closing connection ~ts for client ID ~p", [ConnName, ClientId]),
327303
maybe_emit_stats(State),
@@ -331,7 +307,12 @@ terminate(Reason, _Request, #state{conn_name = ConnName,
331307
_ ->
332308
Infos = infos(?EVENT_KEYS, State),
333309
rabbit_web_ocpp_processor:terminate(Reason, Infos, PState)
334-
end.
310+
end;
311+
312+
terminate(Reason, _Request, Opts) ->
313+
%% Fallback clause when init crashed before state record was established
314+
?LOG_INFO("Web OCPP closing connection. Reason: ~p Opts: ~p", [Reason, Opts]),
315+
ok.
335316

336317
%% Internal.
337318

@@ -348,6 +329,35 @@ ssl_login_name(Sock) ->
348329
nossl -> none
349330
end.
350331

332+
pick_protocol(Req, ClientId) ->
333+
%% The client MUST include a valid ocpp version in the list of
334+
%% WebSocket Sub Protocols it offers [OCPP 1.6 JSON spec §3.1.2].
335+
case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of
336+
undefined ->
337+
?LOG_ERROR("Web OCPP: missing subprotocol list for client ~p", [ClientId]),
338+
{error, invalid_subprotocol};
339+
ProtoList ->
340+
case [ {P, ?OCPP_PROTO_TO_ATOM(P)} || P <- ProtoList,
341+
?OCPP_PROTO_TO_ATOM(P) =/= undefined ] of
342+
[] ->
343+
?LOG_ERROR("Web OCPP: no supported ocppX.X subprotocol in ~p for client ~p",
344+
[ProtoList, ClientId]),
345+
{error, invalid_subprotocol};
346+
[{Matched, Ver}|_] ->
347+
{Ver, cowboy_req:set_resp_header(<<"sec-websocket-protocol">>, Matched, Req)}
348+
end
349+
end.
350+
351+
basic_auth_creds(Req) ->
352+
case cowboy_req:header(<<"authorization">>, Req, <<>>) of
353+
<<>> -> {undefined, undefined};
354+
H ->
355+
case cow_http_hd:parse_authorization(H) of
356+
{basic, U, P} -> {U, P};
357+
_ -> {undefined, undefined}
358+
end
359+
end.
360+
351361
check_credentials(Username, Password, SslLoginName, PeerIp) ->
352362
case creds(Username, Password, SslLoginName) of
353363
{ok, _, _} = Ok ->
@@ -369,7 +379,7 @@ check_credentials(Username, Password, SslLoginName, PeerIp) ->
369379
creds(User, Pass, SSLLoginName) ->
370380
CredentialsProvided = User =/= undefined orelse Pass =/= undefined,
371381
ValidCredentials = is_binary(User) andalso is_binary(Pass) andalso Pass =/= <<>>,
372-
{ok, TLSAuth} = application:get_env(?APP_NAME, ssl_cert_login),
382+
TLSAuth = application:get_env(?APP_NAME, ssl_cert_login, false),
373383
SSLLoginProvided = TLSAuth =:= true andalso SSLLoginName =/= none,
374384

375385
case {CredentialsProvided, ValidCredentials, SSLLoginProvided} of
@@ -380,11 +390,11 @@ creds(User, Pass, SSLLoginName) ->
380390
%% Either username or password is provided
381391
{invalid_creds, {User, Pass}};
382392
{false, false, true} ->
383-
%% rabbitmq_mqtt.ssl_cert_login is true. SSL user name provided.
393+
%% rabbitmq_web_ocpp.ssl_cert_login is true. SSL user name provided.
384394
%% Authenticating using username only.
385395
{ok, SSLLoginName, none};
386396
{false, false, false} ->
387-
{ok, AllowAnon} = application:get_env(?APP_NAME, allow_anonymous),
397+
AllowAnon = application:get_env(?APP_NAME, allow_anonymous, false),
388398
case AllowAnon of
389399
true ->
390400
case rabbit_auth_mechanism_anonymous:credentials() of
@@ -400,19 +410,57 @@ creds(User, Pass, SSLLoginName) ->
400410
nocreds
401411
end.
402412

413+
check_vhost_exists(Vhost, UsernameForLog, PeerIp) ->
414+
case rabbit_vhost:exists(Vhost) of
415+
true -> ok;
416+
false ->
417+
?LOG_ERROR("OCPP connection failed: vhost '~s' does not exist", [Vhost]),
418+
auth_attempt_failed(PeerIp, UsernameForLog),
419+
{error, bad_vhost}
420+
end.
421+
422+
check_vhost_alive(Vhost) ->
423+
case rabbit_vhost_sup_sup:is_vhost_alive(Vhost) of
424+
true -> ok;
425+
false ->
426+
?LOG_ERROR("OCPP connection failed: vhost '~s' is down", [Vhost]),
427+
{error, vhost_down}
428+
end.
429+
430+
check_vhost_access(Vhost, User, _ClientId, PeerIp, AuthzCtx) ->
431+
try rabbit_access_control:check_vhost_access(User, Vhost, {ip, PeerIp}, AuthzCtx) of
432+
ok -> ok
433+
catch exit:#amqp_error{name = not_allowed, explanation = Msg} ->
434+
?LOG_ERROR("OCPP vhost access refused for user '~s' to vhost '~s': ~s",
435+
[User#user.username, Vhost, Msg]),
436+
auth_attempt_failed(PeerIp, User#user.username),
437+
{error, access_refused}
438+
end.
439+
440+
check_user_login(Vhost, Username, Password, ClientId, PeerIp) ->
441+
%% For OCPP, Password might be 'none' if using cert auth or no auth
442+
EffectivePassword = case Password of none -> <<>> ; _ -> Password end,
443+
AuthProps = [{vhost, Vhost}, {client_id, ClientId}, {password, EffectivePassword}],
444+
case rabbit_access_control:check_user_login(Username, AuthProps) of
445+
{ok, User = #user{username = RabbitUser}} ->
446+
notify_auth_result(user_authentication_success, RabbitUser, PeerIp),
447+
{ok, User};
448+
{refused, UserForLog, Msg, Args} ->
449+
?LOG_ERROR("OCPP login failed for user '~s': " ++ Msg, [UserForLog | Args]),
450+
notify_auth_result(user_authentication_failure, UserForLog, PeerIp),
451+
auth_attempt_failed(PeerIp, UserForLog),
452+
{error, authentication_failure}
453+
end.
454+
455+
notify_auth_result(Event, Username, PeerIp) ->
456+
rabbit_event:notify(Event, [{name, Username}, {peer_id, PeerIp},
457+
{connection_type, network}, {protocol, ocpp}]).
458+
403459
-spec auth_attempt_failed(inet:ip_address(), binary()) -> ok.
404460
auth_attempt_failed(PeerIp, Username) ->
405461
rabbit_core_metrics:auth_attempt_failed(PeerIp, Username, ocpp),
406462
timer:sleep(?SILENT_CLOSE_DELAY).
407463

408-
no_supported_sub_protocol(Protocol, ClientId, Req) ->
409-
%% The client MUST include a valid ocpp version in the list of
410-
%% WebSocket Sub Protocols it offers [OCPP 1.6 JSON spec §3.1.2].
411-
?LOG_ERROR("Web OCPP: Invalid 'ocppX.X' version included in client (~p) offered subprotocols: ~tp", [ClientId, Protocol]),
412-
{ok,
413-
cowboy_req:reply(400, #{<<"connection">> => <<"close">>}, Req),
414-
#state{}}.
415-
416464
%% Allow DISCONNECT packet to be sent to client before closing the connection.
417465
defer_close(CloseStatusCode) ->
418466
self() ! {stop, CloseStatusCode, server_initiated_disconnect},

0 commit comments

Comments
 (0)