Skip to content
Open

Sn3 #11

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 33 additions & 27 deletions src/loom_ofdp_recv.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

-record(state, {pid, console, parent, listener, sender, socket, address, port, sup, parser, message_cache, subscribers}).

-record(cache, {features_reply, echo_reply, get_config_reply, desc_reply, flow_stats_reply,
-record(cache, {hello, features_reply, echo_reply, get_config_reply, desc_reply, flow_stats_reply,
aggregate_stats_reply, table_stats_reply, port_stats_reply, queue_stats_reply,
group_stats_reply, group_desc_reply, group_features_reply, meter_features_reply,
meter_config_reply, table_features_reply, port_desc_reply, get_async_reply, packetin::{[], []}}).
Expand Down Expand Up @@ -152,9 +152,9 @@ recv(State) ->
lager:info("Waiting for data on: ~p", [Socket]),
receive
{tcp, Socket, Data} ->
lager:info("Received TCP data from ~p", [Socket]),
% lager:info("Received TCP data from ~p", [Socket]),
{ok, NewParser, Messages} = ofp_parser:parse(Parser,Data),
NewMessageCache = process_messages(Messages, MessageCache, Socket, Subscribers),
NewMessageCache = process_messages(Messages, MessageCache, Socket, Subscribers, Data),
inet:setopts(Socket,[{active, once}]),
recv(State#state{parser = NewParser, message_cache = NewMessageCache});
{tcp_closed, Socket} ->
Expand All @@ -181,88 +181,94 @@ recv(State) ->
end.

%%% [SN] process, cache and display OF message received by controller
process_messages([], MessageCache, _, _) ->
process_messages([], MessageCache, _, _, _) ->
MessageCache;
process_messages(Messages, MessageCache, Socket, Subscribers) ->
process_messages(Messages, MessageCache, Socket, Subscribers, Data) ->
[Message|Rest] = Messages,
NewMessageCache = process_message(Message, MessageCache, Socket,Subscribers),
process_messages(Rest, NewMessageCache, Socket,Subscribers).

NewMessageCache = process_message(Message, MessageCache, Socket,Subscribers, Data),
process_messages(Rest, NewMessageCache, Socket,Subscribers, Data).

process_message(#ofp_message{body = #ofp_echo_reply{data = Data}} = Message, MessageCache, Socket, _Subscribers, _Raw) ->
lager:info("Received ofp_echo_reply from ~p: Data = ~p ~n",[Socket, Data]),
MessageCache#cache{echo_reply = Message};
process_message(#ofp_message{body = #ofp_hello{elements = Elements}} = Message, MessageCache, Socket, _Subscribers, _Raw) ->
lager:info("Received ofp_hello from ~p: Elements = ~p ~n",[Socket, Elements]),
MessageCache#cache{hello = Message};
process_message(#ofp_message{body = #ofp_features_reply
{datapath_mac= DatapathMac, datapath_id = DatapathId}} =Message, MessageCache, Socket, _Subscribers) ->
{datapath_mac= DatapathMac, datapath_id = DatapathId}} =Message, MessageCache, Socket, _Subscribers, _Raw) ->
lager:info("Received ofp_features_reply from ~p: DatapathMac = ~p, Datapath_id = ~p ~n",
[Socket, DatapathMac, DatapathId]),
MessageCache#cache{features_reply = Message};
process_message(#ofp_message{body = #ofp_get_config_reply
{flags = Flags, miss_send_len = MissSendLen}} =Message, MessageCache, Socket, _Subscribers) ->
{flags = Flags, miss_send_len = MissSendLen}} =Message, MessageCache, Socket, _Subscribers, _Raw) ->
lager:info("Received ofp_get_config_reply from ~p: Flags = ~p, MissSendLen = ~p ~n",
[Socket, Flags, MissSendLen]),
MessageCache#cache{get_config_reply = Message};
process_message(#ofp_message{body = #ofp_desc_reply
{flags = Flags, mfr_desc = MfrDesc, hw_desc = HwDesc, sw_desc = SwDesc,
serial_num = SerialNum, dp_desc = DpDesc}} = Message, MessageCache, Socket, _Subscribers) ->
serial_num = SerialNum, dp_desc = DpDesc}} = Message, MessageCache, Socket, _Subscribers, _Raw) ->
lager:info("Received ofp_desc_reply from ~p: Flags = ~p, Mfr_desc = ~p, Hw_desc = ~p,
Sw_desc = ~p, Serial_num = ~p, Dp_desc = ~p~n",
[Socket, Flags, MfrDesc, HwDesc, SwDesc, SerialNum, DpDesc]),
MessageCache#cache{desc_reply = Message};
process_message(#ofp_message{body = #ofp_flow_stats_reply{flags = Flags, body = Stats}} = Message,
MessageCache, Socket, _Subscribers) ->
MessageCache, Socket, _Subscribers, _Raw) ->
lager:info("Received flow_stats_reply from ~p: Flags = ~p Stats = ~p~n", [Socket, Flags, Stats]),
MessageCache#cache{flow_stats_reply = Message};
process_message(#ofp_message{body = #ofp_aggregate_stats_reply{flags = Flags, packet_count = PacketCount,
byte_count = ByteCount, flow_count = FlowCount}} = Message, MessageCache, Socket, _Subscribers) ->
byte_count = ByteCount, flow_count = FlowCount}} = Message, MessageCache, Socket, _Subscribers, _Raw) ->
lager:info("Received aggregate_stats_reply from ~p: Flags = ~p, PacketCount = ~p, ByteCount = ~p,
FlowCount = ~p~n", [Socket, Flags, PacketCount, ByteCount, FlowCount]),
MessageCache#cache{aggregate_stats_reply = Message};
process_message(#ofp_message{body = #ofp_table_stats_reply{flags = Flags, body = Stats}} = Message, MessageCache, Socket, _Subscribers) ->
process_message(#ofp_message{body = #ofp_table_stats_reply{flags = Flags, body = Stats}} = Message, MessageCache, Socket, _Subscribers, _Raw) ->
lager:info("Received table_stats_reply from ~p: Flags = ~p, Stats = ~p~n", [Socket, Flags, Stats]),
MessageCache#cache{table_stats_reply = Message};
process_message(#ofp_message{body = #ofp_port_stats_reply{flags = Flags, body = Stats}} = Message, MessageCache, Socket, _Subscribers) ->
process_message(#ofp_message{body = #ofp_port_stats_reply{flags = Flags, body = Stats}} = Message, MessageCache, Socket, _Subscribers, _Raw) ->
lager:info("Received ports_stats_reply from ~p: Flags = ~p, Stats = ~p~n", [Socket, Flags, Stats]),
MessageCache#cache{port_stats_reply = Message};
process_message(#ofp_message{body = #ofp_queue_stats_reply{flags = Flags, body = Stats}} = Message, MessageCache, Socket, _Subscribers) ->
process_message(#ofp_message{body = #ofp_queue_stats_reply{flags = Flags, body = Stats}} = Message, MessageCache, Socket, _Subscribers, _Raw) ->
lager:info("Received queue_stats_reply from ~p: Flags = ~p, Stats = ~p~n", [Socket, Flags, Stats]),
MessageCache#cache{queue_stats_reply = Message};
process_message(#ofp_message{body = #ofp_group_stats_reply{flags = Flags, body = Stats}} = Message, MessageCache, Socket, _Subscribers) ->
process_message(#ofp_message{body = #ofp_group_stats_reply{flags = Flags, body = Stats}} = Message, MessageCache, Socket, _Subscribers, _Raw) ->
lager:info("Received group_stats_reply from ~p: Flags = ~p, Stats = ~p~n", [Socket, Flags, Stats]),
MessageCache#cache{group_stats_reply = Message};
process_message(#ofp_message{body = #ofp_group_desc_reply{flags = Flags, body = Stats}} = Message, MessageCache, Socket, _Subscribers) ->
process_message(#ofp_message{body = #ofp_group_desc_reply{flags = Flags, body = Stats}} = Message, MessageCache, Socket, _Subscribers, _Raw) ->
lager:info("Received group_desc_reply from ~p: Flags = ~p, Stats = ~p~n", [Socket, Flags, Stats]),
MessageCache#cache{group_desc_reply = Message};
process_message(#ofp_message{body = #ofp_group_features_reply{flags = Flags, types = Types,
capabilities = Capabilities, max_groups = MaxGroups, actions =Actions }} = Message, MessageCache, Socket, _Subscribers) ->
capabilities = Capabilities, max_groups = MaxGroups, actions =Actions }} = Message, MessageCache, Socket, _Subscribers, _Raw) ->
lager:info("Received group_features_reply from ~p: Flags = ~p, Types = ~p, Capabilities = ~p,
MaxGroups=~p, Actions=~p~n", [Socket, Flags, Types, Capabilities, MaxGroups, Actions]),
MessageCache#cache{group_features_reply = Message};
process_message(#ofp_message{body = #ofp_meter_features_reply{flags = Flags, max_meter = MaxMeter,
band_types = BandTypes, capabilities = Capabilities, max_bands = MaxBands, max_color = MaxColor }} = Message, MessageCache, Socket, _Subscribers) ->
band_types = BandTypes, capabilities = Capabilities, max_bands = MaxBands, max_color = MaxColor }} = Message, MessageCache, Socket, _Subscribers, _Raw) ->
lager:info("Received meter_features_reply from ~p: Flags = ~p, MaxMeter = ~p,
BandTypes = ~p, Capabilities = ~p, MaxBands = ~p, MaxColor= ~p~n", [Socket, Flags, MaxMeter,
BandTypes, Capabilities, MaxBands, MaxColor]),
MessageCache#cache{meter_features_reply = Message};
process_message(#ofp_message{body = #ofp_meter_config_reply{flags = Flags, body = Stats}} = Message, MessageCache, Socket, _Subscribers) ->
process_message(#ofp_message{body = #ofp_meter_config_reply{flags = Flags, body = Stats}} = Message, MessageCache, Socket, _Subscribers, _Raw) ->
lager:info("Received meter_config_reply from ~p: Flags = ~p, Stats = ~p~n", [Socket, Flags, Stats]),
MessageCache#cache{meter_config_reply = Message};
process_message(#ofp_message{body = #ofp_table_features_reply{flags = Flags, body = Stats}} = Message, MessageCache, Socket, _Subscribers) ->
process_message(#ofp_message{body = #ofp_table_features_reply{flags = Flags, body = Stats}} = Message, MessageCache, Socket, _Subscribers, _Raw) ->
lager:info("Received table_features_reply from ~p: Flags = ~p, Stats = ~p~n", [Socket, Flags, Stats]),
MessageCache#cache{table_features_reply = Message};
process_message(#ofp_message{body = #ofp_port_desc_reply{flags = Flags, body = Stats}} = Message, MessageCache, Socket, _Subscribers) ->
process_message(#ofp_message{body = #ofp_port_desc_reply{flags = Flags, body = Stats}} = Message, MessageCache, Socket, _Subscribers, _Raw) ->
lager:info("Received port_desc_reply from ~p: Flags = ~p, Stats = ~p~n", [Socket, Flags, Stats]),
MessageCache#cache{port_desc_reply = Message};
process_message(#ofp_message{body = #ofp_get_async_reply
{ packet_in_mask = {[MPktInReason], [SPktInReason]},
port_status_mask = {[MPortStatReason], [SPortStatReason]},
flow_removed_mask = {[MFlowRemovedReason], [SFlowRemovedReason]}}}= Message, MessageCache, Socket,_Subscribers) ->
flow_removed_mask = {[MFlowRemovedReason], [SFlowRemovedReason]}}}= Message, MessageCache, Socket,_Subscribers, _Raw) ->
lager:info("Received get_async_reply from ~p:[{~p, ~p}, {~p, ~p} , {~p, ~p}]~n",
[Socket, MPktInReason, SPktInReason, MPortStatReason, SPortStatReason, MFlowRemovedReason, SFlowRemovedReason]),
MessageCache#cache{get_async_reply = Message};
process_message(#ofp_message{body = #ofp_packet_in
{table_id = TableId, match = Match, reason = Reason, data = Data}}, MessageCache, _Socket,Subscribers) ->
{table_id = TableId, match = Match, reason = Reason, data = Data}}, MessageCache, _Socket,Subscribers, _Raw) ->
% lager:info("Received packet_in from ~p: TableId = ~p, Match = ~p, Reason = ~p Cookie =~p~n",
% [Socket, TableId, Match, Reason, Cookie]),
process_packetin(Reason, TableId, Match, Data, MessageCache, Subscribers);
process_message(Message, MessageCache, Socket,_Subscribers) ->
lager:info("Received Message from ~p: ~p ~n", [Socket, Message]),
process_message(_Message, MessageCache, Socket,_Subscribers, Raw) ->
lager:info("Unknown message* ..Received Raw Data from ~p: ~p ~n", [Socket, Raw]),
MessageCache.

%packetin on action
Expand Down
28 changes: 23 additions & 5 deletions src/my_controller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
-compile([export_all]).
-export([start/0,start/1,dp_link/3,dp_forward/3,match_forward/4,drop_loops/2,drop_loops1/2,
dp_link_and_tap/4,dp_link_and_tap2/4, dp_clear/1, send_of_requests/1,
test_tap/0, test_tap2/0, tap_all/0]).
test_tap/0, test_tap2/0, tap_all/0, test_link12/0, test_link/3, test_clear/0, test_echo/1]).

start()->
loom_controller:start().
Expand Down Expand Up @@ -171,7 +171,7 @@ test_tap() ->
% copies udp traffic from Port 1 to Port2 and Port 3
% where srcIP is 10.0.2.60, 10.48.2.5
test_tap2() ->
D = list_to_pid("<0.91.0>"),
[D|_] = loom_ofdp:get_all(default),
loom_ofdp_lib:clear(D),
IPv4Src1 = <<10:8,0:8,2:8,60:8>>,
IPv4Src2 = <<10:8,48:8,2:8,5:8>>,
Expand All @@ -189,6 +189,24 @@ tap_port2(Port1, Port2, Port3, IPv4Src, Pid) ->
loom_ofdp:send_ofp_msg(Pid, M1).

tap_all() ->
D = list_to_pid("<0.91.0>"),
loom_ofdp_lib:forward(D,2,[1, 3]),
loom_ofdp_lib:forward(D, 1,[2, 3]).
[D|_] = loom_ofdp:get_all(default),
loom_ofdp_lib:forward(D,2,[1, 3]),
loom_ofdp_lib:forward(D, 1,[2, 3]).

% copies all traffic from Port 1 to Port 2 and vice versa
test_link12() ->
test_link(1, 2, []).
test_link(Port1, Port2, Ports)->
[D|_] = loom_ofdp:get_all(default),
dp_forward(D, Port1, [Port2|Ports]),
dp_forward(D, Port2, [Port1|Ports]).

test_clear() ->
[D|_] = loom_ofdp:get_all(default),
dp_clear(D).

% usage my_controller:test_echo(<<"test">>).
test_echo(Data)->
[D|_] = loom_ofdp:get_all(default),
M = loom_flow_lib:echo_request(Data),
loom_ofdp:send_ofp_msg(D, M).