Skip to content

Commit

Permalink
Merge pull request #6 from CAIDA/rsdos-multiversion
Browse files Browse the repository at this point in the history
Add support for upcoming new RSDOS data schema
  • Loading branch information
salcock authored May 2, 2021
2 parents b166c09 + 472e826 commit 941f800
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 16 deletions.
1 change: 1 addition & 0 deletions src/pyavro_stardust/baseavro.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ cdef class AvroReader:
cdef bytearray bufrin
cdef bytes unzipped
cdef AvroRecord currentrec
cdef dict schemajson

cpdef void _readAvroFileHeader(self)
cdef int _parseNextRecord(self, const unsigned char[:] buf,
Expand Down
33 changes: 24 additions & 9 deletions src/pyavro_stardust/baseavro.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from libc.string cimport memcpy
from libcpp.vector cimport vector
from cpython.mem cimport PyMem_Malloc, PyMem_Free, PyMem_Realloc
import zlib, wandio, sys
import zlib, wandio, sys, json
cimport cython

cdef (unsigned int, long) read_long(const unsigned char[:] buf,
Expand Down Expand Up @@ -307,6 +307,7 @@ cdef class AvroReader:
self.unzipped = None
self.unzip_offset = 0
self.currentrec = None
self.schemajson = None

def _readMore(self):
try:
Expand All @@ -326,6 +327,7 @@ cdef class AvroReader:
cdef unsigned int offset, fullsize, offinc
cdef int i
cdef long array_size, keylen, vallen
cdef parsedString mapkey, schemabytes

if len(self.bufrin) < 32:
if self._readMore() < 0:
Expand All @@ -342,15 +344,28 @@ cdef class AvroReader:
offset += offinc

for i in range(0, array_size):
offinc, keylen = read_long(self.bufrin[offset:], fullsize - offset)
if keylen is None:
mapkey = read_string(self.bufrin[offset:], fullsize - offset)
if mapkey.toskip == 0:
return
offset += (offinc + keylen)

offinc, vallen = read_long(self.bufrin[offset:], fullsize - offset)
if vallen is None:
return
offset += (offinc + vallen)
offset += (mapkey.toskip + mapkey.strlen)

if mapkey.start.decode('ascii') == 'avro.schema':
schemabytes = read_string(self.bufrin[offset:],
fullsize-offset)
if schemabytes.toskip == 0:
PyMem_Free(mapkey.start)
return

self.schemajson = json.loads(schemabytes.start)
PyMem_Free(schemabytes.start)
offset += (schemabytes.toskip + schemabytes.strlen)
else:
offinc, vallen = read_long(self.bufrin[offset:], fullsize - offset)
if vallen is None:
PyMem_Free(mapkey.start)
return
offset += (offinc + vallen)
PyMem_Free(mapkey.start)

# skip past trailing zero size array
assert(self.bufrin[offset] == 0)
Expand Down
8 changes: 7 additions & 1 deletion src/pyavro_stardust/rsdos.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,26 @@ cpdef enum RsdosAttribute:
ATTR_RSDOS_START_TIME_USEC = 12
ATTR_RSDOS_LATEST_TIME_SEC = 13
ATTR_RSDOS_LATEST_TIME_USEC = 14
ATTR_RSDOS_LAST_ATTRIBUTE = 15
ATTR_RSDOS_FIRST_ATTACK_PORT = 15
ATTR_RSDOS_FIRST_TARGET_PORT = 16
ATTR_RSDOS_LAST_ATTRIBUTE = 17

cdef class AvroRsdos(AvroRecord):

cdef unsigned char *packetcontent
cdef public unsigned int pktcontentlen
cdef public unsigned int schemaversion

cpdef dict asDict(self)
cpdef void setSchemaVersion(self, const unsigned int schemaversion)
cpdef void resetRecord(self)
cpdef bytes getRsdosPacketString(self)
cpdef unsigned int getRsdosPacketSize(self)
cpdef int setRsdosPacketString(self, const unsigned char[:] buf,
const unsigned int maxlen)

cdef class AvroRsdosReader(AvroReader):
cdef unsigned int schemaversion
cdef int _parseNextRecord(self, const unsigned char[:] buf,
const unsigned int maxlen)

Expand Down
69 changes: 63 additions & 6 deletions src/pyavro_stardust/rsdos.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ cdef class AvroRsdos(AvroRecord):
super().__init__(ATTR_RSDOS_LAST_ATTRIBUTE, 0, 0)
self.pktcontentlen = 0
self.packetcontent = NULL
self.schemaversion = 1

def __str__(self):
return "%u %u.%06u %u.%06u %08x %u %u %u %u %u %u %u %u %u" % \
Expand All @@ -62,15 +63,17 @@ cdef class AvroRsdos(AvroRecord):
self.attributes_l[<int>ATTR_RSDOS_PACKET_CNT],
self.attributes_l[<int>ATTR_RSDOS_BYTE_CNT],
self.attributes_l[<int>ATTR_RSDOS_MAX_PPM_INTERVAL],
self.pktcontentlen)
self.getRsdosPacketSize())

cpdef dict asDict(self):
if self.pktcontentlen == 0:
cdef dict result

if self.getRsdosPacketSize() == 0:
initpkt = None
else:
initpkt = self.getRsdosPacketString()

return {
result = {
"timestamp": self.attributes_l[<int>ATTR_RSDOS_TIMESTAMP],
"start_time_sec": self.attributes_l[<int>ATTR_RSDOS_START_TIME_SEC],
"start_time_usec": self.attributes_l[<int>ATTR_RSDOS_START_TIME_USEC],
Expand All @@ -89,6 +92,12 @@ cdef class AvroRsdos(AvroRecord):
"initial_packet": initpkt,
}

if self.schemaversion == 2:
result['first_attack_port'] = self.attributes_l[<int>ATTR_RSDOS_FIRST_ATTACK_PORT]
result['first_target_port'] = self.attributes_l[<int>ATTR_RSDOS_FIRST_TARGET_PORT]

return result


cpdef void resetRecord(self):
self.pktcontentlen = 0
Expand All @@ -97,13 +106,41 @@ cdef class AvroRsdos(AvroRecord):
super(AvroRsdos, self).resetRecord()

cpdef bytes getRsdosPacketString(self):
return <bytes>self.packetcontent[:self.pktcontentlen]
cdef unsigned int tagheadersize

tagheadersize = 0

# Ideally, we would be able to pull the size out of libtrace or
# libcorsaro, but for now we'll just have to hard-code the size here
if self.schemaversion == 1:
tagheadersize = 35 + (4 * 8)

if self.pktcontentlen <= tagheadersize:
return None

return <bytes>self.packetcontent[tagheadersize:self.pktcontentlen]

cpdef unsigned int getRsdosPacketSize(self):
cdef unsigned int tagheadersize

tagheadersize = 0

# Ideally, we would be able to pull the size out of libtrace or
# libcorsaro, but for now we'll just have to hard-code the size here
if self.schemaversion == 1:
tagheadersize = 35 + (4 * 8)

if self.pktcontentlen <= tagheadersize:
return 0

return self.pktcontentlen - tagheadersize

cpdef int setRsdosPacketString(self, const unsigned char[:] buf,
const unsigned int maxlen):

cdef parsedString astr


astr = read_string(buf, maxlen, addNullTerm=False)
if astr.toskip == 0:
return 0
Expand All @@ -112,26 +149,46 @@ cdef class AvroRsdos(AvroRecord):
self.sizeinbuf += astr.toskip + astr.strlen
return 1

cpdef void setSchemaVersion(self, const unsigned int schemaversion):
self.schemaversion = schemaversion

@cython.final
cdef class AvroRsdosReader(AvroReader):

def __init__(self, filepath):
super().__init__(filepath)
self.currentrec = AvroRsdos()
self.schemaversion = 0

cdef int _parseNextRecord(self, const unsigned char[:] buf,
const unsigned int maxlen):

cdef unsigned int offset, offinc
cdef RsdosAttribute i
cdef unsigned int maxattr

if maxlen == 0:
return 0
offset = 0

self.currentrec.resetRecord()

for i in range(0, ATTR_RSDOS_LATEST_TIME_USEC + 1):
if self.schemaversion == 0:
self.schemaversion = 1
for f in self.schemajson['fields']:
if "first_attack_port" == f['name']:
self.schemaversion = 2
break
self.currentrec.setSchemaVersion(self.schemaversion)

if self.schemaversion == 1:
maxattr = ATTR_RSDOS_LATEST_TIME_USEC + 1
elif self.schemaversion == 2:
maxattr = ATTR_RSDOS_LAST_ATTRIBUTE
else:
return 0

self.currentrec.resetRecord()
for i in range(0, maxattr):
offinc = self.currentrec.parseNumeric(buf[offset:],
maxlen - offset, i)
if offinc == 0:
Expand Down

0 comments on commit 941f800

Please sign in to comment.