Skip to content

Commit 02ac19c

Browse files
committed
add ocpp processor and more ocpp state items
1 parent c0736b0 commit 02ac19c

File tree

5 files changed

+1437
-118
lines changed

5 files changed

+1437
-118
lines changed

include/rabbit_web_ocpp.hrl

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,25 @@
55
%% Copyright (c) 2025 VAMPIRE BYTE SRL. All Rights Reserved.
66
%%
77

8+
-define(APP_NAME, rabbitmq_web_ocpp).
9+
-define(PG_SCOPE, pg_scope_rabbitmq_web_ocpp_clientid).
10+
-define(DEFAULT_IDLE_TIMEOUT_MS, 600_000). %% 10 minutes
11+
812
-type option(T) :: undefined | T.
13+
-type client_id() :: binary().
14+
-type user_property() :: [{binary(), binary()}].
15+
16+
%% Close frame status codes as defined in
17+
%% https://www.rfc-editor.org/rfc/rfc6455#section-7.4.1
18+
-define(CLOSE_NORMAL, 1000).
19+
-define(CLOSE_SERVER_GOING_DOWN, 1001).
20+
-define(CLOSE_PROTOCOL_ERROR, 1002).
21+
-define(CLOSE_UNACCEPTABLE_DATA_TYPE, 1003).
22+
-define(CLOSE_INVALID_PAYLOAD, 1007).
23+
-define(CLOSE_POLICY_VIOLATION, 1008). % e.g., unsupported subprotocol
924

1025
%% WebSocket Subprotocol Name Registry
1126
%% https://www.iana.org/assignments/websocket/websocket.xml
12-
-define(OCPP_SUPPORTED_PROTOCOLS, [<<"ocpp1.2">>, <<"ocpp1.5">>, <<"ocpp1.6">>, <<"ocpp2.0">>, <<"ocpp2.0.1">>, <<"ocpp2.1">>]).
13-
1427
-define(OCPP_PROTO_V12, ocpp12).
1528
-define(OCPP_PROTO_V15, ocpp15).
1629
-define(OCPP_PROTO_V16, ocpp16).
@@ -26,12 +39,36 @@
2639
| ?OCPP_PROTO_V201
2740
| ?OCPP_PROTO_V21.
2841

42+
-define(OCPP_PROTO_TO_ATOM(Proto), case Proto of
43+
<<"ocpp1.2">> -> ?OCPP_PROTO_V12;
44+
<<"ocpp1.5">> -> ?OCPP_PROTO_V15;
45+
<<"ocpp1.6">> -> ?OCPP_PROTO_V16;
46+
<<"ocpp2.0">> -> ?OCPP_PROTO_V20;
47+
<<"ocpp2.0.1">> -> ?OCPP_PROTO_V201;
48+
<<"ocpp2.1">> -> ?OCPP_PROTO_V21;
49+
_ -> undefined
50+
end).
51+
52+
-define(OCPP_TCP_PROTOCOL, 'ws/ocpp').
53+
-define(OCPP_TLS_PROTOCOL, 'wss/ocpp').
54+
2955
-define(OCPP_MESSAGE_TYPE_CALL, 2). % Request
3056
-define(OCPP_MESSAGE_TYPE_CALLRESULT, 3). % Response success
3157
-define(OCPP_MESSAGE_TYPE_CALLERROR, 4). % Response error
3258
-define(OCPP_MESSAGE_TYPE_CALLRESULTERROR, 5). % OCPP 2.1 only
3359
-define(OCPP_MESSAGE_TYPE_SEND, 6). % OCPP 2.1 only
3460

61+
%% Internal representation of an OCPP message
62+
-record(ocpp_msg, {
63+
msg_type :: integer(), % Extracted MessageTypeID
64+
msg_id :: binary(), % Extracted MessageID (as binary string)
65+
action :: binary() | undefined, % Extracted Action for CALL/SEND types
66+
payload :: iolist(), % Original JSON payload
67+
client_id :: binary() % Originating CP identifier (e.g., from reply_to)
68+
}).
69+
70+
-type ocpp_msg() :: #ocpp_msg{}.
71+
3572
-define(ITEMS,
3673
[pid,
3774
protocol,

src/mc_ocpp.erl

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2025 VAMPIRE BYTE SRL. All Rights Reserved.
6+
%%
7+
-module(mc_ocpp).
8+
-behaviour(mc).
9+
10+
-export([
11+
init/1,
12+
size/1,
13+
x_header/2,
14+
property/2,
15+
routing_headers/2,
16+
convert_from/3,
17+
convert_to/3,
18+
protocol_state/2,
19+
prepare/2
20+
]).
21+
22+
-include_lib("kernel/include/logger.hrl").
23+
-include_lib("rabbit_common/include/rabbit_framing.hrl"). % #content{}
24+
-include_lib("amqp10_common/include/amqp10_framing.hrl").
25+
-include_lib("rabbit_common/include/rabbit.hrl"). % #'P_basic'{}
26+
-include_lib("rabbit/include/mc.hrl").
27+
-include("rabbit_web_ocpp.hrl").
28+
29+
-define(CONTENT_TYPE_JSON, <<"application/json">>).
30+
31+
%%--------------------------------------------------------------------
32+
-spec init(#ocpp_msg{}) -> {#ocpp_msg{}, map()} | error.
33+
init(Msg = #ocpp_msg{}) ->
34+
Anns = #{
35+
?ANN_ROUTING_KEYS => [Msg#ocpp_msg.client_id],
36+
?ANN_DURABLE => false,
37+
correlation_id => Msg#ocpp_msg.msg_id,
38+
reply_to => Msg#ocpp_msg.client_id,
39+
content_type => ?CONTENT_TYPE_JSON
40+
},
41+
{Msg, Anns};
42+
init(Other) ->
43+
?LOG_ERROR("mc_ocpp:init badarg: ~p", [Other]),
44+
error(badarg).
45+
46+
%%--------------------------------------------------------------------
47+
-spec size(#ocpp_msg{}) -> {non_neg_integer(), non_neg_integer()}.
48+
size(#ocpp_msg{payload = P}) when is_binary(P) ->
49+
{0, byte_size(P)};
50+
size(#ocpp_msg{payload = Iol}) ->
51+
{0, iolist_size(Iol)}.
52+
53+
%%--------------------------------------------------------------------
54+
x_header(_Key, #ocpp_msg{}) ->
55+
undefined.
56+
57+
%%--------------------------------------------------------------------
58+
property(correlation_id, #ocpp_msg{msg_id = ID}) when is_binary(ID) ->
59+
{binary, ID};
60+
property(reply_to, #ocpp_msg{client_id = C}) when is_binary(C) ->
61+
{binary, C};
62+
property(_, _) ->
63+
undefined.
64+
65+
%%--------------------------------------------------------------------
66+
routing_headers(#ocpp_msg{action = A}, _) when is_binary(A) ->
67+
#{<<"ocpp_action">> => A};
68+
routing_headers(_, _) ->
69+
#{}.
70+
71+
%%--------------------------------------------------------------------
72+
-spec convert_from(atom(), term(), map()) -> #ocpp_msg{} | not_implemented.
73+
74+
%% AMQP 1.0
75+
convert_from(mc_amqp, Sections, _Env) ->
76+
{Payload, Corr, Act, Rto} = extract_amqp1(Sections),
77+
build_ocpp(Payload, Corr, Act, Rto);
78+
79+
%% AMQP 0-9-1
80+
convert_from(mc_amqpl, #content{payload_fragments_rev = Rev, properties = BP}, _Env) ->
81+
Payload = iolist_to_binary(lists:reverse(Rev)),
82+
Corr = case BP#'P_basic'.correlation_id of undefined -> undefined; CorrVal -> CorrVal end,
83+
Act = case BP#'P_basic'.type of
84+
undefined -> undefined;
85+
ActVal -> ActVal
86+
end,
87+
Rto = case BP#'P_basic'.reply_to of undefined -> undefined; RtoVal -> RtoVal end,
88+
build_ocpp(Payload, Corr, Act, Rto);
89+
90+
%% Identity / No conversion
91+
convert_from(?MODULE, Msg, _) ->
92+
Msg;
93+
convert_from(_, _, _) ->
94+
not_implemented.
95+
96+
%%--------------------------------------------------------------------
97+
-spec convert_to(atom(), #ocpp_msg{}, map()) -> term() | not_implemented.
98+
99+
%% AMQP 1.0
100+
convert_to(mc_amqp, #ocpp_msg{payload=P, msg_id=ID, action=A, client_id=CID} = _Msg, Env) ->
101+
Header = #'v1_0.header'{durable = false},
102+
Props = #'v1_0.properties'{
103+
correlation_id = ID,
104+
subject = A,
105+
content_type = {symbol, ?CONTENT_TYPE_JSON},
106+
reply_to = CID
107+
},
108+
Data = #'v1_0.data'{content = P},
109+
Sections = [Header, Props, Data],
110+
%% Use convert_from/3 here to turn a section list into
111+
%% a canonical AMQP message that the server knows how to frame.
112+
mc_amqp:convert_from(mc_amqp, Sections, Env);
113+
114+
%% AMQP 0-9-1
115+
convert_to(mc_amqpl,
116+
#ocpp_msg{payload = Payload,
117+
msg_id = MsgId,
118+
action = Action,
119+
client_id= ClientId},
120+
_Env) ->
121+
%% 1) Build the basic properties record
122+
BP = #'P_basic'{
123+
delivery_mode = 1, % non-persistent
124+
correlation_id = MsgId, % your OCPP msg_id
125+
type = Action, % maps to 'type' header
126+
content_type = ?CONTENT_TYPE_JSON,
127+
reply_to = ClientId, % OCPP client_id
128+
headers = undefined % no extra field-table entries
129+
},
130+
131+
%% 2) Wrap the JSON payload as a single binary
132+
PFR = [ iolist_to_binary(Payload) ],
133+
134+
%% 3) Use the Basic class ID (60) for content frames
135+
#content{
136+
class_id = 60,
137+
properties = BP,
138+
properties_bin = none,
139+
payload_fragments_rev = PFR
140+
};
141+
142+
%% Identity / No conversion
143+
convert_to(?MODULE, Msg, _) ->
144+
Msg;
145+
convert_to(_, _, _) ->
146+
not_implemented.
147+
148+
%%--------------------------------------------------------------------
149+
%% Helpers
150+
%%--------------------------------------------------------------------
151+
152+
%% @doc No‐op before sending—ensure payload is a binary
153+
-spec prepare(Atom :: atom(), Msg :: #ocpp_msg{}) -> #ocpp_msg{}.
154+
prepare(_For, Msg = #ocpp_msg{payload = P}) when is_binary(P) ->
155+
Msg;
156+
prepare(_For, Msg = #ocpp_msg{payload = Iolist}) ->
157+
%% convert any stray iolists to a flat binary
158+
Msg#ocpp_msg{payload = iolist_to_binary(Iolist)}.
159+
160+
%% @doc No protocol‐specific state changes needed
161+
-spec protocol_state(Msg :: #ocpp_msg{}, Anns :: map()) -> #ocpp_msg{}.
162+
protocol_state(Msg, _Anns) ->
163+
Msg.
164+
165+
-spec extract_amqp1([term()]) -> {binary(), binary()|undefined, binary()|undefined, binary()|undefined}.
166+
extract_amqp1(Sections) ->
167+
{Rev, Corr, Act, Rto} =
168+
lists:foldl(fun
169+
(#'v1_0.data'{content=C}, {R,C0,A0,R0}) -> {[C|R], C0, A0, R0};
170+
(#'v1_0.properties'{correlation_id={binary,ID}}, {R,_,A0,R0}) -> {R, ID, A0, R0};
171+
(#'v1_0.properties'{subject={utf8,S}}, {R,C0,_,R0}) -> {R, C0, S, R0};
172+
(#'v1_0.properties'{reply_to={binary,RT}}, {R,C0,A0,_}) -> {R, C0, A0, RT};
173+
(_, Acc) -> Acc
174+
end, {[], undefined, undefined, undefined}, Sections),
175+
{ iolist_to_binary(lists:reverse(Rev))
176+
, Corr, Act, Rto
177+
}.
178+
179+
-spec build_ocpp(binary(), binary()|undefined, binary()|undefined, binary()|undefined) -> #ocpp_msg{}.
180+
build_ocpp(Payload, Corr, Act, Rto) ->
181+
#ocpp_msg{
182+
payload = Payload,
183+
msg_type = undefined,
184+
msg_id = Corr,
185+
action = Act,
186+
client_id = Rto
187+
}.

src/rabbit_web_ocpp_app.erl

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88
-module(rabbit_web_ocpp_app).
99

1010
-behaviour(application).
11+
12+
-include_lib("rabbit_common/include/rabbit.hrl").
13+
-include("rabbit_web_ocpp.hrl").
14+
1115
-export([
1216
start/2,
1317
prep_stop/1,
@@ -24,9 +28,6 @@
2428

2529
-import(rabbit_misc, [pget/2]).
2630

27-
-define(TCP_PROTOCOL, 'http/web-ocpp').
28-
-define(TLS_PROTOCOL, 'https/web-ocpp').
29-
3031
%%
3132
%% API
3233
%%
@@ -42,16 +43,33 @@ prep_stop(State) ->
4243

4344
-spec stop(_) -> ok.
4445
stop(_State) ->
45-
_ = rabbit_networking:stop_ranch_listener_of_protocol(?TCP_PROTOCOL),
46-
_ = rabbit_networking:stop_ranch_listener_of_protocol(?TLS_PROTOCOL),
46+
_ = rabbit_networking:stop_ranch_listener_of_protocol(?OCPP_TCP_PROTOCOL),
47+
_ = rabbit_networking:stop_ranch_listener_of_protocol(?OCPP_TLS_PROTOCOL),
4748
ok.
4849

49-
init([]) -> {ok, {{one_for_one, 1, 5}, []}}.
50+
init([]) ->
51+
%% Use separate process group scope per RabbitMQ node. This achieves a local-only
52+
%% process group which requires less memory with millions of connections.
53+
PgScope = rabbit:pg_local_scope(?PG_SCOPE),
54+
persistent_term:put(?PG_SCOPE, PgScope),
55+
56+
%% Define the children for the supervision tree
57+
Children = [
58+
#{id => PgScope,
59+
start => {pg, start_link, [PgScope]},
60+
restart => transient,
61+
shutdown => ?WORKER_WAIT,
62+
type => worker,
63+
modules => [pg]}
64+
],
65+
66+
%% Return the supervision strategy and children
67+
{ok, {{one_for_one, 1, 5}, Children}}.
5068

5169
-spec list_connections() -> [pid()].
5270
list_connections() ->
53-
PlainPids = rabbit_networking:list_local_connections_of_protocol(?TCP_PROTOCOL),
54-
TLSPids = rabbit_networking:list_local_connections_of_protocol(?TLS_PROTOCOL),
71+
PlainPids = rabbit_networking:list_local_connections_of_protocol(?OCPP_TCP_PROTOCOL),
72+
TLSPids = rabbit_networking:list_local_connections_of_protocol(?OCPP_TLS_PROTOCOL),
5573
PlainPids ++ TLSPids.
5674

5775
-spec emit_connection_info_all([node()], rabbit_types:info_keys(), reference(), pid()) -> term().
@@ -80,13 +98,14 @@ emit_connection_info(Items, Ref, AggregatorPid, Pids) ->
8098
ocpp_init() ->
8199
CowboyOpts0 = maps:from_list(get_env(cowboy_opts, [])),
82100
CowboyWsOpts = maps:from_list(get_env(cowboy_ws_opts, [])),
101+
rabbit_log:info("OCPP Cowboy options: ~p", [CowboyWsOpts]),
83102
TcpConfig = get_env(tcp_config, []),
84103
SslConfig = get_env(ssl_config, []),
85104
%% To derive its connection URL, the Charge Point modifies the OCPP-J endpoint URL by appending to the
86105
%% path first a '/' (U+002F SOLIDUS) and then a string uniquely identifying the Charge Point.
87106
%% This uniquely identifying string has to be percent-encoded as necessary as described in [RFC3986].
88107
%% [OCPP 1.6 JSON spec §3.1.1].
89-
FullPath = get_env(ws_path, "/ocpp") ++ "/:client_id",
108+
FullPath = get_env(ws_path, "/ocpp") ++ "/:vhost/:client_id",
90109
Routes = cowboy_router:compile([{'_', [
91110
{FullPath, rabbit_web_ocpp_handler, [{ws_opts, CowboyWsOpts}]}
92111
]}]),
@@ -120,7 +139,7 @@ start_tcp_listener(TCPConf0, CowboyOpts) ->
120139
[ErrTCP, TCPConf]),
121140
throw(ErrTCP)
122141
end,
123-
listener_started(?TCP_PROTOCOL, TCPConf),
142+
listener_started(?OCPP_TCP_PROTOCOL, TCPConf),
124143
rabbit_log:info("rabbit_web_ocpp: listening for HTTP connections on ~s:~w",
125144
[IpStr, Port]).
126145

@@ -148,7 +167,7 @@ start_tls_listener(TLSConf0, CowboyOpts) ->
148167
[ErrTLS, TLSConf]),
149168
throw(ErrTLS)
150169
end,
151-
listener_started(?TLS_PROTOCOL, TLSConf),
170+
listener_started(?OCPP_TLS_PROTOCOL, TLSConf),
152171
rabbit_log:info("rabbit_web_ocpp: listening for HTTPS connections on ~s:~w",
153172
[TLSIpStr, TLSPort]).
154173

@@ -201,4 +220,4 @@ get_max_connections() ->
201220
get_env(max_connections, infinity).
202221

203222
get_env(Key, Default) ->
204-
rabbit_misc:get_env(rabbitmq_web_ocpp, Key, Default).
223+
rabbit_misc:get_env(?APP_NAME, Key, Default).

0 commit comments

Comments
 (0)