-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsflow-filter
More file actions
189 lines (156 loc) · 6.39 KB
/
sflow-filter
File metadata and controls
189 lines (156 loc) · 6.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# Sflow Filter for LogStash
input {
  # Run the sflowtool wrapper as a child process; the pipe input turns
  # each line the command prints into one event.
  # NOTE(review): the "-l -p 6343" flags are presumably handed through to
  # sflowtool (line-oriented output, listen on UDP 6343) - confirm in the
  # wrapper script.
  pipe {
    command => "/usr/local/bin/sflowtool_wrapper.sh -l -p 6343"
    # The filter section keys off this type to select sFlow events.
    type => "sflow"
  }
}
filter {
  # Everything below applies only to events produced by the sFlow pipe input.
  if [type] == "sflow" {
    # sFlow sends two kinds of messages - CNTRs and FLOWs.
    # Nothing is done with CNTRs at this point, so they are
    # dropped and we concentrate on processing FLOW messages.
    if [message] =~ /CNTR/ {
      drop { }
    }

    # sFlow FLOW messages break down into the following fields.
    # The names are arbitrary, but they correspond to the actual
    # field names. The grok pattern was developed using two tools:
    #
    # - sflowtool (http://www.inmon.com/technology/sflowTools.php)
    #   Written by InMon, it provides a way to examine sFlow messages
    #   in human readable format.
    #
    # - Grok Debugger (https://grokdebug.herokuapp.com/)
    #   Invaluable, and self-explanatory.
    grok {
      match => { "message" => "%{WORD:SampleType},%{IP:sflow_ReporterIP},%{WORD:sflow_inputPort},%{WORD:sflow_outputPort},%{WORD:sflow_srcMAC},%{WORD:sflow_dstMAC},%{WORD:sflow_EtherType},%{NUMBER:sflow_in_vlan},%{NUMBER:sflow_out_vlan},%{IP:sflow_srcIP},%{IP:sflow_dstIP},%{NUMBER:sflow_IPProtocol},%{WORD:sflow_IPTOS},%{WORD:sflow_IPTTL},%{NUMBER:sflow_srcPort},%{NUMBER:sflow_dstPort},%{DATA:sflow_tcpFlags},%{NUMBER:sflow_PacketSize},%{NUMBER:sflow_IPSize},%{NUMBER:sflow_SampleRate}" }
    }

    # Sometimes it doesn't work out: discard anything the pattern
    # above could not parse.
    if "_grokparsefailure" in [tags] {
      drop { }
    }

    # Because all of this is ultimately displayed in Kibana, many of
    # the IP addresses should be translated into recognizable
    # hostnames. Copy the IP fields into new fields that the DNS
    # lookups below will overwrite:
    mutate {
      add_field => {
        "[sflow_SrcHostname]" => "%{sflow_srcIP}"
        "[sflow_DstHostname]" => "%{sflow_dstIP}"
        "[sflow_ReporterName]" => "%{sflow_ReporterIP}"
      }
    }

    # Translate things like source and destination port numbers
    # into known service names. For this to work, you have to build
    # some YAML dictionary files, which basically map everything
    # you'd find in an /etc/services file.
    translate {
      field => "[sflow_srcPort]"
      destination => "[sflow_SrcSvcName]"
      dictionary_path => "/etc/logstash/dictionaries/iana_services.yaml"
    }
    translate {
      field => "[sflow_dstPort]"
      destination => "[sflow_DstSvcName]"
      dictionary_path => "/etc/logstash/dictionaries/iana_services.yaml"
    }
    translate {
      field => "[sflow_IPProtocol]"
      destination => "[sflow_ProtName]"
      dictionary_path => "/etc/logstash/dictionaries/iana_protocols.yaml"
    }
    translate {
      field => "[sflow_tcpFlags]"
      destination => "[sflow_TCPFlagDecode]"
      dictionary_path => "/etc/logstash/dictionaries/tcp_flags.yaml"
    }

    # Here we do our DNS reverse lookups. "replace" overwrites the
    # copied IP with the resolved name.
    dns {
      reverse => [ "sflow_SrcHostname" ]
      action => "replace"
    }
    dns {
      reverse => [ "sflow_DstHostname" ]
      action => "replace"
    }
    dns {
      reverse => [ "sflow_ReporterName" ]
      action => "replace"
    }

    # If the reverse lookup didn't fail (leaving us with nothing but
    # a dotted-quad address) we grok the FQDN, parsing it into a
    # HOST and DOMAIN - and then we store the HOST portion into a
    # new field - sflow_SrcHost.
    #
    # Otherwise, we just copy the unresolved IP address from
    # sflow_SrcHostname into sflow_SrcHost.
    #
    # The regex is anchored with ^...$ so that only a value that is
    # entirely an IPv4 address counts as "unresolved"; a hostname
    # that merely contains a dotted quad is still parsed as a name.
    if [sflow_SrcHostname] !~ /^(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$/ {
      grok {
        match => { "[sflow_SrcHostname]" => "%{DATA:src_host}\.%{GREEDYDATA:domain}" }
        add_field => { "[sflow_SrcHost]" => "%{src_host}" }
      }
    } else {
      mutate {
        add_field => { "[sflow_SrcHost]" => "%{[sflow_SrcHostname]}" }
      }
    }

    # Same host/IP split for the destination address.
    if [sflow_DstHostname] !~ /^(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$/ {
      grok {
        match => { "[sflow_DstHostname]" => "%{DATA:dst_host}\.%{GREEDYDATA:domain}" }
        add_field => { "[sflow_DstHost]" => "%{dst_host}" }
      }
    } else {
      mutate {
        add_field => { "[sflow_DstHost]" => "%{[sflow_DstHostname]}" }
      }
    }

    # Same idea for the reporting device, except the short name is
    # everything up to the first "." or "-" in the resolved name.
    if [sflow_ReporterName] !~ /^(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$/ {
      grok {
        match => { "[sflow_ReporterName]" => "%{DATA:rep_host}(\.|\-)" }
        add_field => { "[sflow_Reporter]" => "%{rep_host}" }
      }
    } else {
      mutate {
        add_field => { "[sflow_Reporter]" => "%{[sflow_ReporterName]}" }
      }
    }

    # There are a bunch of fields from the original message, as well
    # as some created temporarily, that aren't worth storing. Here's
    # where they are dropped. The commented-out variant below is a
    # more aggressive list that also discards port, MAC, VLAN, TTL
    # and IP-size fields.
    # mutate {
    #   remove_field => [ "host", "command", "sflow_inputPort", "sflow_outputPort", "sflow_srcMAC", "sflow_dstMAC", "sflow_EtherType", "sflow_in_vlan", "sflow_out_vlan", "sflow_IPTTL", "sflow_IPSize", "message", "src_host", "dst_host", "rep_host", "SampleType", "domain" ]
    # }
    mutate {
      remove_field => [ "host", "command", "message", "src_host", "dst_host", "rep_host", "SampleType", "domain" ]
    }

    # Lastly, we're going to want Kibana and Elasticsearch to be
    # able to do some math with certain fields, so make sure they're
    # handled as numbers, rather than strings. You can also do this
    # by setting up templates in Elasticsearch, but this is easier.
    mutate {
      convert => {
        "sflow_PacketSize" => "integer"
        "sflow_SampleRate" => "integer"
      }
    }
  }
}
output {
  # Index every event into the local Elasticsearch node.
  elasticsearch {
    hosts => [ "localhost:9200" ]
    # protocol => [ "http" ]
    # "index" is a string option, so it is given as a plain string rather
    # than a one-element array. %{+YYYY.MM.dd} is expanded from the event
    # timestamp, which yields one index per day.
    index => "xx-logstash-%{+YYYY.MM.dd}"
  }
  # Also echo each event to stdout as JSON.
  stdout {
    codec => json
  }
}