Skip to content

Commit

Permalink
"flowtuple" -> "flowtuple3"
Browse files Browse the repository at this point in the history
We're moving to a new flowtuple4 format, so it is useful for
the existing flowtuple reading code to be explicit in its naming
scheme so that we can have eventually support flowtuple3 and
flowtuple4 side-by-side.
  • Loading branch information
salcock committed Mar 26, 2021
1 parent 59a9581 commit d9188a4
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 136 deletions.
8 changes: 4 additions & 4 deletions examples/flowtuple-example.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# Example code that uses the AvroFlowtupleReader extension class to
# Example code that uses the AvroFlowtuple3Reader extension class to
# count flowtuples via a perFlowtuple callback method

import sys
from collections import defaultdict
from pyavro_stardust.flowtuple import AvroFlowtupleReader, \
FlowtupleAttributeNum, FlowtupleAttributeStr
from pyavro_stardust.flowtuple3 import AvroFlowtuple3Reader, \
Flowtuple3AttributeNum, Flowtuple3AttributeStr

counter = 0
protocols = defaultdict(int)
Expand All @@ -26,7 +26,7 @@ def run():

# sys.argv[1] must be a valid wandio path -- e.g. a swift URL or
# a path to a file on disk
ftreader = AvroFlowtupleReader(sys.argv[1])
ftreader = AvroFlowtuple3Reader(sys.argv[1])
ftreader.start()

# This will read all flowtuples and call `perFlowtupleCallback` on
Expand Down
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ def no_cythonize(extensions, **_ignore):

extensions = [
Extension('pyavro_stardust.baseavro', ['src/pyavro_stardust/baseavro.pyx']),
Extension('pyavro_stardust.flowtuple', ['src/pyavro_stardust/flowtuple.pyx'])
Extension('pyavro_stardust.flowtuple3', ['src/pyavro_stardust/flowtuple3.pyx']),
#Extension('pyavro_stardust.rsdos', ['src/pyavro_stardust/rsdos.pyx'])
]

CYTHONIZE = bool(int(os.getenv("CYTHONIZE", 0))) and cythonize is not None
Expand Down
36 changes: 0 additions & 36 deletions src/pyavro_stardust/flowtuple.pxd

This file was deleted.

95 changes: 0 additions & 95 deletions src/pyavro_stardust/flowtuple.pyx

This file was deleted.

36 changes: 36 additions & 0 deletions src/pyavro_stardust/flowtuple3.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import cython
from pyavro_stardust.baseavro cimport AvroRecord, AvroReader

cpdef enum Flowtuple3AttributeNum:
ATTR_FT3_TIMESTAMP = 0
ATTR_FT3_SRC_IP = 1
ATTR_FT3_DST_IP = 2
ATTR_FT3_SRC_PORT = 3
ATTR_FT3_DST_PORT = 4
ATTR_FT3_PROTOCOL = 5
ATTR_FT3_TTL = 6
ATTR_FT3_TCP_FLAGS = 7
ATTR_FT3_IP_LEN = 8
ATTR_FT3_SYN_LEN = 9
ATTR_FT3_SYNWIN_LEN = 10
ATTR_FT3_PKT_COUNT = 11
ATTR_FT3_ISSPOOFED = 12
ATTR_FT3_ISMASSCAN = 13
ATTR_FT3_ASN = 14

cpdef enum Flowtuple3AttributeStr:
ATTR_FT3_MAXMIND_CONTINENT = 0
ATTR_FT3_MAXMIND_COUNTRY = 1
ATTR_FT3_NETACQ_CONTINENT = 2
ATTR_FT3_NETACQ_COUNTRY = 3


cdef class AvroFlowtuple3(AvroRecord):
cpdef dict asDict(self)

cdef class AvroFlowtuple3Reader(AvroReader):
cdef int _parseNextRecord(self, const unsigned char[:] buf,
const int maxlen)


# vim: set sw=4 tabstop=4 softtabstop=4 expandtab :
95 changes: 95 additions & 0 deletions src/pyavro_stardust/flowtuple3.pyx
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# cython: language_level=3
cimport cython
from pyavro_stardust.baseavro cimport AvroRecord, read_long, read_string, \
AvroReader

@cython.final
cdef class AvroFlowtuple3(AvroRecord):

def __init__(self):
super().__init__(ATTR_FT3_ASN + 1, ATTR_FT3_NETACQ_COUNTRY + 1)

def __str__(self):
return "%u %08x %08x %u %u %u %u %u %u %s %s %u" % \
(self.attributes_l[<int>ATTR_FT3_TIMESTAMP], \
self.attributes_l[<int>ATTR_FT3_SRC_IP], \
self.attributes_l[<int>ATTR_FT3_DST_IP], \
self.attributes_l[<int>ATTR_FT3_SRC_PORT], \
self.attributes_l[<int>ATTR_FT3_DST_PORT], \
self.attributes_l[<int>ATTR_FT3_PROTOCOL], \
self.attributes_l[<int>ATTR_FT3_TTL], \
self.attributes_l[<int>ATTR_FT3_TCP_FLAGS], \
self.attributes_l[<int>ATTR_FT3_IP_LEN], \
self.attributes_s[<int>ATTR_FT3_NETACQ_CONTINENT].decode('utf-8'), \
self.attributes_s[<int>ATTR_FT3_NETACQ_COUNTRY].decode('utf-8'), \
self.attributes_l[<int>ATTR_FT3_ASN])

cpdef dict asDict(self):
return {
"timestamp": self.attributes_l[<int>ATTR_FT3_TIMESTAMP],
"src_ip": self.attributes_l[<int>ATTR_FT3_SRC_IP],
"dst_ip": self.attributes_l[<int>ATTR_FT3_DST_IP],
"src_port": self.attributes_l[<int>ATTR_FT3_SRC_PORT],
"dst_port": self.attributes_l[<int>ATTR_FT3_DST_PORT],
"protocol": self.attributes_l[<int>ATTR_FT3_PROTOCOL],
"ttl": self.attributes_l[<int>ATTR_FT3_TTL],
"tcpflags": self.attributes_l[<int>ATTR_FT3_TCP_FLAGS],
"ip_len": self.attributes_l[<int>ATTR_FT3_IP_LEN],
"packets": self.attributes_l[<int>ATTR_FT3_PKT_COUNT],
"tcp_synlen": self.attributes_l[<int>ATTR_FT3_SYN_LEN],
"tcp_synwinlen": self.attributes_l[<int>ATTR_FT3_SYNWIN_LEN],
"is_spoofed": self.attributes_l[<int>ATTR_FT3_ISSPOOFED],
"is_masscan": self.attributes_l[<int>ATTR_FT3_ISMASSCAN],
"asn": self.attributes_l[<int>ATTR_FT3_ASN],
"netacq_continent": self.attributes_s[<int>ATTR_FT3_NETACQ_CONTINENT],
"netacq_country": self.attributes_s[<int>ATTR_FT3_NETACQ_COUNTRY],
"maxmind_country": self.attributes_s[<int>ATTR_FT3_MAXMIND_COUNTRY],
"maxmind_continent": self.attributes_s[<int>ATTR_FT3_MAXMIND_CONTINENT]
}

@cython.final
cdef class AvroFlowtuple3Reader(AvroReader):

def __init__(self, filepath):
super().__init__(filepath)
self.currentrec = AvroFlowtuple3()

cdef int _parseNextRecord(self, const unsigned char[:] buf,
const int maxlen):

cdef int offset, offinc
cdef Flowtuple3AttributeNum i
cdef Flowtuple3AttributeStr j

if maxlen == 0:
return 0
offset = 0

self.currentrec.resetRecord()

# Process each field in turn -- order is critical, must match
# field order in avro record!
for i in range(0, ATTR_FT3_ISMASSCAN + 1):
offinc = self.currentrec.parseNumeric(buf[offset:],
maxlen - offset, i)
if offinc <= 0:
return 0
offset += offinc

for j in range(0, ATTR_FT3_NETACQ_COUNTRY + 1):
offinc = self.currentrec.parseString(buf[offset:],
maxlen - offset, j)
if offinc <= 0:
return 0
offset += offinc

offinc = self.currentrec.parseNumeric(buf[offset:], maxlen - offset,
ATTR_FT3_ASN)
if offinc <= 0:
return 0
offset += offinc

return 1


# vim: set sw=4 tabstop=4 softtabstop=4 expandtab :

0 comments on commit d9188a4

Please sign in to comment.