Skip to content

Update protocol version 5 requestforward get async reply and set async #71

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/ofp_v4.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
-define(OFPCML_MAX, 16#ffe5). %% buffer id
-define(OFPCML_NO_BUFFER, 16#ffff). %% buffer id
-define(OFPM_MAX, 16#ffff0000). %% flow meter number
-define(OFPM_MAX_SIZE,16#10000). %% Max multipart message size

%% Message sizes (in bytes) ----------------------------------------------------

Expand Down
53 changes: 41 additions & 12 deletions include/ofp_v5.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
-define(OFPCML_MAX, 16#ffe5). %% buffer id
-define(OFPCML_NO_BUFFER, 16#ffff). %% buffer id
-define(OFPM_MAX, 16#ffff0000). %% flow meter number
-define(OFPM_MAX_SIZE,16#10000). %% Max multipart message size

%% Message sizes (in bytes) ----------------------------------------------------

Expand Down Expand Up @@ -1550,6 +1551,9 @@
%%% Request Forward Message (version 1.4.0, section 7.4.6)
%%%-----------------------------------------------------------------------------

-type ofp_requestforward_reason() :: group_mod
| meter_mod.

-record(ofp_requestforward, {
request :: ofp_message()
}).
Expand All @@ -1563,25 +1567,50 @@
-type ofp_get_async_request() :: #ofp_get_async_request{}.

-record(ofp_get_async_reply, {
packet_in_mask = {[], []} :: {[ofp_packet_in_reason()],
[ofp_packet_in_reason()]},
port_status_mask = {[], []} :: {[ofp_port_status_reason()],
[ofp_port_status_reason()]},
flow_removed_mask = {[], []} :: {[ofp_flow_removed_reason()],
[ofp_flow_removed_reason()]}
properties = [] :: [ofp_async_config_property()]
}).
-type ofp_get_async_reply() :: #ofp_get_async_reply{}.

-record(ofp_set_async, {
packet_in_mask = {[], []} :: {[ofp_packet_in_reason()],
[ofp_packet_in_reason()]},
port_status_mask = {[], []} :: {[ofp_port_status_reason()],
[ofp_port_status_reason()]},
flow_removed_mask = {[], []} :: {[ofp_flow_removed_reason()],
[ofp_flow_removed_reason()]}
properties = [] :: [ofp_async_config_property()]
}).
-type ofp_set_async() :: #ofp_set_async{}.

-type ofp_async_config_prop_type() :: packet_in_slave
| packet_in_master
| port_status_slave
| port_status_master
| flow_removed_slave
| flow_removed_master
| role_status_slave
| role_status_master
| table_status_slave
| table_status_master
| requestforward_slave
| requestforward_master
| experimenter_slave
| experimenter_master.

-record(ofp_async_config_prop_reasons, {
type :: ofp_async_config_prop_type(),
mask = [] :: [ofp_packet_in_reason()]
| [ofp_port_status_reason()]
| [ofp_flow_removed_reason()]
| [ofp_controller_role_reason()]
| [ofp_table_reason()]
| [ofp_requestforward_reason()]
}).

-record(ofp_async_config_prop_experimenter, {
type :: ofp_async_config_prop_type(),
experimenter :: integer(),
exp_type :: integer(),
data = <<>> :: binary()
}).

-type ofp_async_config_property() :: #ofp_async_config_prop_reasons{}
| #ofp_async_config_prop_experimenter{}.

%%%-----------------------------------------------------------------------------
%%% Bundle Messages (version 1.4.0, section 7.3.9)
%%%-----------------------------------------------------------------------------
Expand Down
27 changes: 24 additions & 3 deletions src/ofp_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -381,12 +381,12 @@ handle_send(#ofp_message{type = packet_in} = Message,
_Else ->
do_filter_send(Message, State)
end;
handle_send(#ofp_message{type = multipart_reply} = Message,
#state{version = Version} = State) ->
handle_send(#ofp_message{type = Type} = Message,
#state{version = Version} = State) when Type =:= multipart_reply ->
Module = client_module(Version),
Replies = Module:split_multipart(Message),
Results = [do_send(Reply, State) || Reply <- Replies],
case lists:all(fun(X) -> X == ok end, Results) of
case lists:all(fun(X) -> X == ok end, lists:flatten(Results) ) of
true ->
ok;
false ->
Expand All @@ -395,6 +395,27 @@ handle_send(#ofp_message{type = multipart_reply} = Message,
handle_send(Message, State) ->
do_filter_send(Message, State).

do_send(#ofp_message{ type = Type } = Message, #state{controller = {_, _, Proto},
socket = Socket,
parser = Parser,
version = Version} = State) when Type =:= multipart_reply ->
case ofp_parser:encode(Parser, Message#ofp_message{version = Version}) of
{ok, Binary} ->
case byte_size(Binary) < (1 bsl 16) of
true ->
send(Proto, Socket, Binary);
false ->
Module = client_module(Version),
case Module:split_big_multipart(Message) of
false ->
{error, message_too_big};
SplitList ->
lists:map(fun(Msg) -> do_send(Msg,State) end, SplitList)
end
end;
{error, Reason} ->
{error, Reason}
end;
do_send(Message, #state{controller = {_, _, Proto},
socket = Socket,
parser = Parser,
Expand Down
96 changes: 94 additions & 2 deletions src/ofp_client_v4.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
filter_out_message/3,
type_atom/1,
add_aux_id/2,
split_multipart/1]).
split_multipart/1,
split_big_multipart/1
]).

-include("of_protocol.hrl").
-include("ofp_v4.hrl").
Expand Down Expand Up @@ -261,5 +263,95 @@ split2(_, [], Head) ->
split2(N, [X | Tail], Head) ->
split2(N - 1, Tail, [X | Head]).

%% False inner boddies, cant be split into reliable complete messages.
multipart_inner_body(#ofp_desc_request {} = _Msg) -> false;
multipart_inner_body(#ofp_desc_reply {} = _Msg) -> false;
multipart_inner_body(#ofp_flow_stats_request {} = _Msg) -> false;
multipart_inner_body(#ofp_flow_stats_reply { body = InnerBody } = _Msg)-> {body,InnerBody};
multipart_inner_body(#ofp_aggregate_stats_request{} = _Msg) -> false;
multipart_inner_body(#ofp_aggregate_stats_reply {} = _Msg) -> false;
multipart_inner_body(#ofp_table_stats_request {} = _Msg) -> false;
multipart_inner_body(#ofp_table_stats_reply { body = InnerBody } = _Msg)-> {body,InnerBody};
multipart_inner_body(#ofp_table_features_request {} = _Msg) -> false;
multipart_inner_body(#ofp_table_features_reply {} = _Msg) -> false;
multipart_inner_body(#ofp_port_stats_request {} = _Msg) -> false;
multipart_inner_body(#ofp_port_stats_reply { body = InnerBody } = _Msg)-> {body,InnerBody};
multipart_inner_body(#ofp_port_desc_request {} = _Msg) -> false;
multipart_inner_body(#ofp_port_desc_reply { body = InnerBody } = _Msg)-> {body,InnerBody};
multipart_inner_body(#ofp_queue_stats_request {} = _Msg) -> false;
multipart_inner_body(#ofp_queue_stats_reply { body = InnerBody } = _Msg)-> {body,InnerBody};
multipart_inner_body(#ofp_group_stats_request {} = _Msg) -> false;
multipart_inner_body(#ofp_group_stats_reply { body = InnerBody } = _Msg)-> {body,InnerBody};
multipart_inner_body(#ofp_group_desc_request {} = _Msg) -> false;
multipart_inner_body(#ofp_group_desc_reply { body = InnerBody } = _Msg)-> {body,InnerBody};
multipart_inner_body(#ofp_group_features_request {} = _Msg) -> false;
multipart_inner_body(#ofp_group_features_reply {} = _Msg) -> false;
multipart_inner_body(#ofp_meter_stats_request {} = _Msg) -> false;
multipart_inner_body(#ofp_meter_stats_reply { body = InnerBody } = _Msg)-> {body,InnerBody};
multipart_inner_body(#ofp_meter_config_request {} = _Msg) -> false;
multipart_inner_body(#ofp_meter_config_reply { body = InnerBody } = _Msg)-> {body,InnerBody};
multipart_inner_body(#ofp_meter_features_request {} = _Msg) -> false;
multipart_inner_body(#ofp_meter_features_reply {} = _Msg) -> false;
multipart_inner_body(#ofp_experimenter_request {} = _Msg) -> false;
multipart_inner_body(#ofp_experimenter_reply {} = _Msg) -> false.

remove_inner_body(#ofp_flow_stats_reply {} = Body) -> Body#ofp_flow_stats_reply { body = [] };
remove_inner_body(#ofp_table_stats_reply {} = Body) -> Body#ofp_table_stats_reply { body = [] };
remove_inner_body(#ofp_port_stats_reply {} = Body) -> Body#ofp_port_stats_reply { body = [] };
remove_inner_body(#ofp_port_desc_reply {} = Body) -> Body#ofp_port_desc_reply { body = [] };
remove_inner_body(#ofp_queue_stats_reply {} = Body) -> Body#ofp_queue_stats_reply { body = [] };
remove_inner_body(#ofp_group_stats_reply {} = Body) -> Body#ofp_group_stats_reply { body = [] };
remove_inner_body(#ofp_group_desc_reply {} = Body) -> Body#ofp_group_desc_reply { body = [] };
remove_inner_body(#ofp_meter_stats_reply {} = Body) -> Body#ofp_meter_stats_reply { body = [] };
remove_inner_body(#ofp_meter_config_reply {} = Body) -> Body#ofp_meter_config_reply { body = [] }.

reasemble_inner_body(#ofp_flow_stats_reply {} = Body,InnerBody) -> Body#ofp_flow_stats_reply { body = InnerBody };
reasemble_inner_body(#ofp_table_stats_reply {} = Body,InnerBody) -> Body#ofp_table_stats_reply { body = InnerBody };
reasemble_inner_body(#ofp_port_stats_reply {} = Body,InnerBody) -> Body#ofp_port_stats_reply { body = InnerBody };
reasemble_inner_body(#ofp_port_desc_reply {} = Body,InnerBody) -> Body#ofp_port_desc_reply { body = InnerBody };
reasemble_inner_body(#ofp_queue_stats_reply {} = Body,InnerBody) -> Body#ofp_queue_stats_reply { body = InnerBody };
reasemble_inner_body(#ofp_group_stats_reply {} = Body,InnerBody) -> Body#ofp_group_stats_reply { body = InnerBody };
reasemble_inner_body(#ofp_group_desc_reply {} = Body,InnerBody) -> Body#ofp_group_desc_reply { body = InnerBody };
reasemble_inner_body(#ofp_meter_stats_reply {} = Body,InnerBody) -> Body#ofp_meter_stats_reply { body = InnerBody };
reasemble_inner_body(#ofp_meter_config_reply {} = Body,InnerBody) -> Body#ofp_meter_config_reply { body = InnerBody }.

%% SPLIT THE MESSAGE INTO 2 bits, the Body and the rest,
%% Encode and parse the REST, and use that size as the
%% Deduction for the maximum size.... ( - 16 )

split_big_multipart(#ofp_message{type = _Type, version = V, body = Body} = Message) ->
case multipart_inner_body(Body) of
false ->
false;
{body,InnerBody} ->
SkeletonBody = Message#ofp_message{ body = remove_inner_body(Body) },
[ Message#ofp_message{body = reasemble_inner_body(Body,Chunk) } ||
Chunk <- split_big_multipart_structs(SkeletonBody,InnerBody,[],V) ]
end.

split_big_multipart_structs(SkeletonBody,Body,Chunks,Version) ->
Module = encode_module(Version),
case split_structs_chunks(Body,[],0,Module) of
{ok,{[],Chunk}} -> [Chunk|Chunks];
{ok,{Rest,Chunk}} -> split_big_multipart_structs(SkeletonBody,Rest,[Chunk|Chunks],Version)
end.

split_structs_chunks([],Results,_TotalSize,_Module) ->
{ok,{[],Results}};
split_structs_chunks([H|T],Results,TotalSize,Module) ->
Bin = Module:encode_struct(H),
NewTotalSize = byte_size(Bin) + TotalSize,
case NewTotalSize < ( ?OFPM_MAX_SIZE - 16 ) of %% 16 bytes for the ofp_message bin encasing the actual multipart message etc
true -> split_structs_chunks(T,[H|Results],NewTotalSize,Module);
false -> {ok,{[H|T],Results}}
end.

encode_module(3) ->
ofp_v3_encode;
encode_module(4) ->
ofp_v4_encode;
encode_module(5) ->
ofp_v5_encode.

should_filter_out(Reason, Filter) ->
not lists:member(Reason, Filter).
not lists:member(Reason, Filter).
74 changes: 41 additions & 33 deletions src/ofp_client_v5.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
create_role/2,
extract_role/1,
role_status/3,
create_async/1,
extract_async/1,
filter_out_message/3,
type_atom/1,
add_aux_id/2,
split_multipart/1]).
split_multipart/1,
split_big_multipart/1
]).

-include("of_protocol.hrl").
-include("ofp_v5.hrl").
Expand Down Expand Up @@ -59,36 +59,6 @@ role_status(Role, Reason, GenId) ->
reason = Reason,
generation_id = GenId}.

%% @doc Create async filters message.
-spec create_async(#async_config{}) -> #ofp_get_async_reply{}.
create_async(#async_config{
master_equal_packet_in = MEP,
master_equal_port_status = MES,
master_equal_flow_removed = MEF,
slave_packet_in = SP,
slave_port_status = SS,
slave_flow_removed = SF}) ->
%% Ensure that we don't try to send v4 values
MEP5 = MEP -- [no_match, action],
SP5 = SP -- [no_match, action],
#ofp_get_async_reply{packet_in_mask = {MEP5, SP5},
port_status_mask = {MES, SS},
flow_removed_mask = {MEF, SF}}.

%% @doc Extract async filters information.
-spec extract_async(#ofp_set_async{}) -> #async_config{}.
extract_async(#ofp_set_async{packet_in_mask = {MEP, SP},
port_status_mask = {MES, SS},
flow_removed_mask = {MEF, SF}}) ->
#async_config{
master_equal_packet_in = MEP,
master_equal_port_status = MES,
master_equal_flow_removed = MEF,
slave_packet_in = SP,
slave_port_status = SS,
slave_flow_removed = SF
}.

-spec filter_out_message(#ofp_message{},
master | slave | equal,
#async_config{}) -> boolean().
Expand Down Expand Up @@ -278,5 +248,43 @@ split2(_, [], Head) ->
split2(N, [X | Tail], Head) ->
split2(N - 1, Tail, [X | Head]).


split_big_multipart(#ofp_message{ version = Version,
body = #ofp_port_stats_reply{body = MultipartReplyBody}
} = Message) ->
Chunks = split_big_multipart_structs(MultipartReplyBody,[],Version),
[ Message#ofp_message{body = #ofp_port_stats_reply{body = Chunk} } || Chunk <- Chunks ];
split_big_multipart(#ofp_message{ version = _Version,
body = _Body
} = Message) ->
[Message].

split_big_multipart_structs(Body,Chunks,Version) ->
Module = encode_module(Version),
case split_structs_chunks(Body,[],0,Module) of
{ok,{[],Chunk}} ->
[Chunk|Chunks];
{ok,{Rest,Chunk}} ->
split_big_multipart_structs(Rest,[Chunk|Chunks],Version)
end.

split_structs_chunks([],Results,_TotalSize,_Module) ->
{ok,{[],Results}};
split_structs_chunks([H|T],Results,TotalSize,Module) ->
Bin = Module:encode_struct(H),
NewTotalSize = byte_size(Bin) + TotalSize,
case NewTotalSize < ( ?OFPM_MAX_SIZE - 8 ) of
true -> split_structs_chunks(T,[H|Results],NewTotalSize,Module);
false -> {ok,{[H|T],Results}} %% Give back REST ( Remainder=[H|T] ) as starting point, for next itteration.
end.

encode_module(3) ->
ofp_v3_encode;
encode_module(4) ->
ofp_v4_encode;
encode_module(5) ->
ofp_v5_encode.


should_filter_out(Reason, Filter) ->
not lists:member(Reason, Filter).
3 changes: 2 additions & 1 deletion src/ofp_v4_encode.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
%% @private
-module(ofp_v4_encode).

-export([do/1]).
-export([do/1,
encode_struct/1]).

-include("of_protocol.hrl").
-include("ofp_v4.hrl").
Expand Down
Loading