Skip to content

Commit

Permalink
Add support for upcoming "new" RSDOS schema
Browse files Browse the repository at this point in the history
All AvroReaders will now save the schema from the Avro header
so that they can check for the presence of potentially missing
data fields.
  • Loading branch information
salcock committed Apr 23, 2021
1 parent b166c09 commit f42b805
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 13 deletions.
1 change: 1 addition & 0 deletions src/pyavro_stardust/baseavro.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ cdef class AvroReader:
cdef bytearray bufrin
cdef bytes unzipped
cdef AvroRecord currentrec
cdef dict schemajson

cpdef void _readAvroFileHeader(self)
cdef int _parseNextRecord(self, const unsigned char[:] buf,
Expand Down
33 changes: 24 additions & 9 deletions src/pyavro_stardust/baseavro.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from libc.string cimport memcpy
from libcpp.vector cimport vector
from cpython.mem cimport PyMem_Malloc, PyMem_Free, PyMem_Realloc
import zlib, wandio, sys
import zlib, wandio, sys, json
cimport cython

cdef (unsigned int, long) read_long(const unsigned char[:] buf,
Expand Down Expand Up @@ -307,6 +307,7 @@ cdef class AvroReader:
self.unzipped = None
self.unzip_offset = 0
self.currentrec = None
self.schemajson = None

def _readMore(self):
try:
Expand All @@ -326,6 +327,7 @@ cdef class AvroReader:
cdef unsigned int offset, fullsize, offinc
cdef int i
cdef long array_size, keylen, vallen
cdef parsedString mapkey, schemabytes

if len(self.bufrin) < 32:
if self._readMore() < 0:
Expand All @@ -342,15 +344,28 @@ cdef class AvroReader:
offset += offinc

for i in range(0, array_size):
offinc, keylen = read_long(self.bufrin[offset:], fullsize - offset)
if keylen is None:
mapkey = read_string(self.bufrin[offset:], fullsize - offset)
if mapkey.toskip == 0:
return
offset += (offinc + keylen)

offinc, vallen = read_long(self.bufrin[offset:], fullsize - offset)
if vallen is None:
return
offset += (offinc + vallen)
offset += (mapkey.toskip + mapkey.strlen)

if mapkey.start.decode('ascii') == 'avro.schema':
schemabytes = read_string(self.bufrin[offset:],
fullsize-offset)
if schemabytes.toskip == 0:
PyMem_Free(mapkey.start)
return

self.schemajson = json.loads(schemabytes.start)
PyMem_Free(schemabytes.start)
offset += (schemabytes.toskip + schemabytes.strlen)
else:
offinc, vallen = read_long(self.bufrin[offset:], fullsize - offset)
if vallen is None:
PyMem_Free(mapkey.start)
return
offset += (offinc + vallen)
PyMem_Free(mapkey.start)

# skip past trailing zero size array
assert(self.bufrin[offset] == 0)
Expand Down
7 changes: 6 additions & 1 deletion src/pyavro_stardust/rsdos.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,25 @@ cpdef enum RsdosAttribute:
ATTR_RSDOS_START_TIME_USEC = 12
ATTR_RSDOS_LATEST_TIME_SEC = 13
ATTR_RSDOS_LATEST_TIME_USEC = 14
ATTR_RSDOS_LAST_ATTRIBUTE = 15
ATTR_RSDOS_FIRST_ATTACK_PORT = 15
ATTR_RSDOS_FIRST_TARGET_PORT = 16
ATTR_RSDOS_LAST_ATTRIBUTE = 17

# Declaration of the extension type holding one parsed RSDOS Avro record.
cdef class AvroRsdos(AvroRecord):

# Packet payload pointer and its length; __init__ starts them at NULL / 0,
# and asDict() reports "initial_packet" as None while the length is 0.
cdef unsigned char *packetcontent
cdef public unsigned int pktcontentlen
# Schema revision this record was parsed under. __init__ defaults it to 1;
# asDict() adds first_attack_port / first_target_port only when it is 2.
cdef public unsigned int schemaversion

cpdef dict asDict(self)
cpdef void setSchemaVersion(self, const unsigned int schemaversion)
cpdef void resetRecord(self)
cpdef bytes getRsdosPacketString(self)
cpdef int setRsdosPacketString(self, const unsigned char[:] buf,
const unsigned int maxlen)

cdef class AvroRsdosReader(AvroReader):
cdef unsigned int schemaversion
cdef int _parseNextRecord(self, const unsigned char[:] buf,
const unsigned int maxlen)

Expand Down
35 changes: 32 additions & 3 deletions src/pyavro_stardust/rsdos.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ cdef class AvroRsdos(AvroRecord):
super().__init__(ATTR_RSDOS_LAST_ATTRIBUTE, 0, 0)
self.pktcontentlen = 0
self.packetcontent = NULL
self.schemaversion = 1

def __str__(self):
return "%u %u.%06u %u.%06u %08x %u %u %u %u %u %u %u %u %u" % \
Expand All @@ -65,12 +66,14 @@ cdef class AvroRsdos(AvroRecord):
self.pktcontentlen)

cpdef dict asDict(self):
cdef dict result

if self.pktcontentlen == 0:
initpkt = None
else:
initpkt = self.getRsdosPacketString()

return {
result = {
"timestamp": self.attributes_l[<int>ATTR_RSDOS_TIMESTAMP],
"start_time_sec": self.attributes_l[<int>ATTR_RSDOS_START_TIME_SEC],
"start_time_usec": self.attributes_l[<int>ATTR_RSDOS_START_TIME_USEC],
Expand All @@ -89,6 +92,12 @@ cdef class AvroRsdos(AvroRecord):
"initial_packet": initpkt,
}

if self.schemaversion == 2:
result['first_attack_port'] = self.attributes_l[<int>ATTR_RSDOS_FIRST_ATTACK_PORT]
result['first_target_port'] = self.attributes_l[<int>ATTR_RSDOS_FIRST_TARGET_PORT]

return result


cpdef void resetRecord(self):
self.pktcontentlen = 0
Expand All @@ -112,26 +121,46 @@ cdef class AvroRsdos(AvroRecord):
self.sizeinbuf += astr.toskip + astr.strlen
return 1

# Store the schema revision for this record. The value is not validated
# here; asDict() only emits the two port fields when it equals 2.
cpdef void setSchemaVersion(self, const unsigned int schemaversion):
self.schemaversion = schemaversion

@cython.final
cdef class AvroRsdosReader(AvroReader):

# Set up a reader for filepath that parses records into one AvroRsdos object.
def __init__(self, filepath):
super().__init__(filepath)
self.currentrec = AvroRsdos()
# 0 means "not determined yet": _parseNextRecord replaces it with 1 or 2
# after inspecting the field names in the schema saved from the header.
self.schemaversion = 0

cdef int _parseNextRecord(self, const unsigned char[:] buf,
const unsigned int maxlen):

cdef unsigned int offset, offinc
cdef RsdosAttribute i
cdef unsigned int maxattr

if maxlen == 0:
return 0
offset = 0

self.currentrec.resetRecord()

for i in range(0, ATTR_RSDOS_LATEST_TIME_USEC + 1):
if self.schemaversion == 0:
self.schemaversion = 1
for f in self.schemajson['fields']:
if "first_attack_port" == f['name']:
self.schemaversion = 2
break
self.currentrec.setSchemaVersion(self.schemaversion)

if self.schemaversion == 1:
maxattr = ATTR_RSDOS_LATEST_TIME_USEC + 1
elif self.schemaversion == 2:
maxattr = ATTR_RSDOS_LAST_ATTRIBUTE
else:
return 0

self.currentrec.resetRecord()
for i in range(0, maxattr):
offinc = self.currentrec.parseNumeric(buf[offset:],
maxlen - offset, i)
if offinc == 0:
Expand Down

0 comments on commit f42b805

Please sign in to comment.