Skip to content

Commit

Permalink
Update flowtuple4 to support schema version 2 (with spoofed+masscan)
Browse files Browse the repository at this point in the history
Schema version is now declared in the baseavro class, as it
is used by multiple data formats.
  • Loading branch information
salcock committed Jul 26, 2021
1 parent c99534c commit d11d778
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 12 deletions.
3 changes: 3 additions & 0 deletions src/pyavro_stardust/baseavro.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ cdef parsedNumericArrayBlock read_numeric_array(const unsigned char[:] buf,

cdef class AvroRecord:

cdef public unsigned int schemaversion
cdef long *attributes_l
cdef char **attributes_s
cdef long **attributes_na
Expand All @@ -78,9 +79,11 @@ cdef class AvroRecord:
const unsigned int maxlen, const int attrind)
cpdef vector[long] getNumericArray(self, const int attrind)
cpdef void resetRecord(self)
cpdef void setSchemaVersion(self, const unsigned int schemaversion)


cdef class AvroReader:
cdef unsigned int schemaversion
cdef unsigned int nextblock
cdef unsigned int unzip_offset
cdef fh
Expand Down
5 changes: 5 additions & 0 deletions src/pyavro_stardust/baseavro.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ cdef class AvroRecord:
self.stringcount = 0
self.numcount = 0
self.numarraycount = 0
self.schemaversion = 0

def __init__(self, numeric, strings, numarrays):
cdef unsigned int i
Expand Down Expand Up @@ -297,6 +298,9 @@ cdef class AvroRecord:
self.attributes_na[i] = NULL


cpdef void setSchemaVersion(self, const unsigned int schemaversion):
self.schemaversion = schemaversion

cdef class AvroReader:
def __init__(self, filepath):
self.filepath = filepath
Expand All @@ -308,6 +312,7 @@ cdef class AvroReader:
self.unzip_offset = 0
self.currentrec = None
self.schemajson = None
self.schemaversion = 0

def _readMore(self):
try:
Expand Down
3 changes: 3 additions & 0 deletions src/pyavro_stardust/flowtuple4.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ cpdef enum Flowtuple4AttributeNum:
ATTR_FT4_FIRST_SYN_LEN = 11
ATTR_FT4_FIRST_TCP_RWIN = 12
ATTR_FT4_ASN = 13
ATTR_FT4_SPOOFED_COUNT = 14
ATTR_FT4_MASSCAN_COUNT = 15
ATTR_FT4_LAST_NUMERIC = 16

cpdef enum Flowtuple4AttributeStr:
ATTR_FT4_MAXMIND_CONTINENT = 0
Expand Down
34 changes: 29 additions & 5 deletions src/pyavro_stardust/flowtuple4.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ from cpython cimport array
@cython.final
cdef class AvroFlowtuple4(AvroRecord):
def __init__(self):
super().__init__(ATTR_FT4_ASN + 1, ATTR_FT4_LAST_STRING,
super().__init__(ATTR_FT4_LAST_NUMERIC, ATTR_FT4_LAST_STRING,
ATTR_FT4_LAST_NUM_ARRAY)

def __str__(self):
Expand Down Expand Up @@ -87,6 +87,10 @@ cdef class AvroFlowtuple4(AvroRecord):
"maxmind_country": self.attributes_s[<int>ATTR_FT4_MAXMIND_COUNTRY]
}

if self.schemaversion == 2:
asdict["spoofed"] = self.attributes_l[<int>ATTR_FT4_SPOOFED_COUNT]
asdict["masscan"] = self.attributes_l[<int>ATTR_FT4_MASSCAN_COUNT]

if needarrays:
# XXX this feels like it could be faster, but not sure how
# to improve this
Expand All @@ -106,6 +110,7 @@ cdef class AvroFlowtuple4(AvroRecord):
comm_ports.push_back(cv)
asdict['common_src_ports'] = comm_ports


pkt_sizes = self.getNumericArray(<int>ATTR_FT4_COMMON_PKT_SIZES)
size_freqs = self.getNumericArray(<int>ATTR_FT4_COMMON_PKT_SIZE_FREQS)
for i in range(pkt_sizes.size()):
Expand All @@ -114,6 +119,7 @@ cdef class AvroFlowtuple4(AvroRecord):
comm_sizes.push_back(cv)
asdict['common_pkt_sizes'] = comm_sizes


flags = self.getNumericArray(<int>ATTR_FT4_COMMON_TCP_FLAGS)
flag_freqs = self.getNumericArray(<int>ATTR_FT4_COMMON_TCP_FLAG_FREQS)
for i in range(flags.size()):
Expand All @@ -122,6 +128,7 @@ cdef class AvroFlowtuple4(AvroRecord):
comm_flags.push_back(cv)
asdict['common_tcp_flags'] = comm_flags


return asdict

@cython.final
Expand All @@ -143,6 +150,14 @@ cdef class AvroFlowtuple4Reader(AvroReader):

self.currentrec.resetRecord()

if self.schemaversion == 0:
self.schemaversion = 1
for f in self.schemajson['fields']:
if "masscan_packet_cnt" == f['name']:
self.schemaversion = 2
break
self.currentrec.setSchemaVersion(self.schemaversion)

for i in range(0, ATTR_FT4_FIRST_TCP_RWIN + 1):
offinc = self.currentrec.parseNumeric(buf[offset:],
maxlen - offset, i)
Expand All @@ -164,11 +179,20 @@ cdef class AvroFlowtuple4Reader(AvroReader):
return 0
offset += offinc

offinc = self.currentrec.parseNumeric(buf[offset:], maxlen - offset,
ATTR_FT4_ASN)
if offinc <= 0:
if self.schemaversion == 1:
maxattr = ATTR_FT4_ASN + 1
elif self.schemaversion == 2:
maxattr = ATTR_FT4_LAST_NUMERIC
else:
return 0
offset += offinc

for i in range(ATTR_FT4_ASN, maxattr):
offinc = self.currentrec.parseNumeric(buf[offset:],
maxlen - offset, i)
if offinc <= 0:
return 0
offset += offinc

return 1


Expand Down
3 changes: 0 additions & 3 deletions src/pyavro_stardust/rsdos.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,15 @@ cdef class AvroRsdos(AvroRecord):

cdef unsigned char *packetcontent
cdef public unsigned int pktcontentlen
cdef public unsigned int schemaversion

cpdef dict asDict(self)
cpdef void setSchemaVersion(self, const unsigned int schemaversion)
cpdef void resetRecord(self)
cpdef bytes getRsdosPacketString(self)
cpdef unsigned int getRsdosPacketSize(self)
cpdef int setRsdosPacketString(self, const unsigned char[:] buf,
const unsigned int maxlen)

cdef class AvroRsdosReader(AvroReader):
cdef unsigned int schemaversion
cdef int _parseNextRecord(self, const unsigned char[:] buf,
const unsigned int maxlen)

Expand Down
4 changes: 0 additions & 4 deletions src/pyavro_stardust/rsdos.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -174,16 +174,12 @@ cdef class AvroRsdos(AvroRecord):
self.sizeinbuf += astr.toskip + astr.strlen
return 1

cpdef void setSchemaVersion(self, const unsigned int schemaversion):
self.schemaversion = schemaversion

@cython.final
cdef class AvroRsdosReader(AvroReader):

def __init__(self, filepath):
super().__init__(filepath)
self.currentrec = AvroRsdos()
self.schemaversion = 0

cdef int _parseNextRecord(self, const unsigned char[:] buf,
const unsigned int maxlen):
Expand Down

0 comments on commit d11d778

Please sign in to comment.