Skip to content

Commit

Permalink
Separate common code into an AvroReader class
Browse files Browse the repository at this point in the history
AvroFlowtupleReader now inherits from this class and only
implements the flowtuple-specific Avro parsing logic
needed to populate an AvroFlowtuple instance.

There's possibly still room to optimise this code -- the
extra OO-ness is likely adding extra overhead that we may
be able to reduce by being smarter. Good enough for a first
pass though.
  • Loading branch information
salcock committed Feb 25, 2021
1 parent 289b8f0 commit cedc051
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 169 deletions.
2 changes: 1 addition & 1 deletion examples/flowtuple-example.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def run():

# This will read all flowtuples and call `perFlowtupleCallback` on
# each one
ftreader.perFlowtuple(perFlowtupleCallback)
ftreader.perAvroRecord(perFlowtupleCallback)

ftreader.close()

Expand Down
15 changes: 15 additions & 0 deletions src/pyavro_stardust/baseavro.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,19 @@ cdef class AvroRecord:
cpdef void resetRecord(self)


# Declaration of the generic Avro container-file reader. The attribute and
# method declarations below must stay in step with the implementation of
# AvroReader in baseavro.pyx.
cdef class AvroReader:
# Offset into `bufrin` at which the next unread data block begins.
cdef unsigned int nextblock
# Offset into `unzipped` of the next record that has not been parsed yet.
cdef unsigned int unzip_offset
# Handle for the input file (assigned from wandio.open() in start()).
cdef fh
# Path of the input file, as passed to the constructor.
cdef str filepath
# 16-byte sync marker taken from the end of the file header; None until
# the header has been parsed.
cdef bytearray syncmarker
# Raw bytes read from the file that have not been consumed yet.
cdef bytearray bufrin
# Decompressed content of the data block currently being parsed.
cdef bytes unzipped
# Record object handed to the per-record callback; the base class leaves
# this as None -- presumably subclasses assign it (TODO confirm).
cdef AvroRecord currentrec

# Parses the file header and, on success, sets `syncmarker`/`nextblock`.
cpdef void _readAvroFileHeader(self)
# Hook for subclasses: parse one record from `buf` (at most `maxlen`
# bytes). The base implementation returns 0, which the reader treats as
# "no record parsed".
cdef int _parseNextRecord(self, const unsigned char[:] buf,
const int maxlen)
# Returns the next record of the current decompressed block, or None.
cdef AvroRecord _getNextRecord(self)

# vim: set sw=4 tabstop=4 softtabstop=4 expandtab :
151 changes: 151 additions & 0 deletions src/pyavro_stardust/baseavro.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -126,5 +126,156 @@ cdef class AvroRecord:
self.attributes_s[i] = NULL


cdef class AvroReader:
    """Generic reader for deflate-compressed Avro object container files.

    The reader pulls the file in 1 MiB chunks into ``bufrin``, parses the
    file header to learn the 16-byte sync marker, then walks the data
    blocks one at a time: each block is inflated into ``unzipped`` and
    split into records by ``_parseNextRecord``, which subclasses override
    with their schema-specific parsing logic.

    Typical usage::

        reader = SomeAvroReaderSubclass(path)
        reader.start()
        reader.perAvroRecord(callback, userarg)
        reader.close()
    """

    def __init__(self, filepath):
        """Remember ``filepath``; nothing is opened until start()."""
        self.filepath = filepath
        self.syncmarker = None
        self.fh = None
        self.bufrin = bytearray()
        self.nextblock = 0
        self.unzipped = None
        self.unzip_offset = 0
        self.currentrec = None

    def _readMore(self):
        """Append up to 1 MiB of file data to the input buffer.

        Returns 1 if any data was appended, 0 if the file is exhausted.
        """
        inread = self.fh.read(1024 * 1024)
        if inread:
            self.bufrin += inread
            return 1
        return 0

    cpdef void _readAvroFileHeader(self):
        # Parse the container-file header that starts at `nextblock`.
        #
        # On success `syncmarker` holds the 16-byte sync marker and
        # `nextblock` points at the first data block. If the buffered data
        # does not (yet) contain a complete header, the method returns
        # with `syncmarker` still None so the caller can buffer more data
        # and try again.
        cdef unsigned int offset, fullsize
        cdef int offinc, i
        cdef long array_size, keylen, vallen

        if len(self.bufrin) < 32:
            self._readMore()

        if len(self.bufrin) < 32:
            return

        offset = 4 + self.nextblock     # skip past 'Obj\x01'
        fullsize = len(self.bufrin)

        # read_long() signals "could not decode" with a zero length, so
        # that is what has to be tested here -- the decoded value is a C
        # long and can never be None.
        offinc, array_size = read_long(self.bufrin[offset:], fullsize - offset)
        if offinc == 0:
            return
        offset += offinc

        # The metadata map is a sequence of length-prefixed keys and
        # values; only their sizes matter here, the content is skipped.
        for i in range(0, array_size):
            offinc, keylen = read_long(self.bufrin[offset:], fullsize - offset)
            if offinc == 0:
                return
            offset += (offinc + keylen)
            if offset >= fullsize:
                # Key runs past the buffered data: header is incomplete.
                return

            offinc, vallen = read_long(self.bufrin[offset:], fullsize - offset)
            if offinc == 0:
                return
            offset += (offinc + vallen)
            if offset >= fullsize:
                # Value runs past the buffered data: header is incomplete.
                return

        if offset >= fullsize:
            return

        # skip past trailing zero size array
        assert(self.bufrin[offset] == 0)
        offset += 1

        if fullsize - offset < 16:
            return

        self.syncmarker = bytearray(self.bufrin[offset: offset+16])
        self.nextblock = offset + 16

    def start(self):
        """Open the input file and try to parse its header.

        Calling start() again once a file handle exists is a no-op. Any
        error raised by wandio.open() propagates to the caller.
        """
        if self.fh is not None:
            return
        self.fh = wandio.open(self.filepath)

        if self.syncmarker is None:
            self._readAvroFileHeader()

    def close(self):
        """Close the input file; harmless if start() was never called."""
        if self.fh is not None:
            self.fh.close()

    cdef int _parseNextRecord(self, const unsigned char[:] buf,
            const int maxlen):
        # Hook for subclasses: parse a single record from the first
        # `maxlen` bytes of `buf`. Returning 0 tells _getNextRecord() that
        # no record could be parsed; the base class has no schema
        # knowledge, so it always does so.
        return 0

    cdef AvroRecord _getNextRecord(self):
        # Return the next record of the current decompressed block, or
        # None when the block is exhausted or cannot be parsed further.
        if self.unzipped is None:
            return None

        if self.unzip_offset >= len(self.unzipped):
            return None

        if self._parseNextRecord(self.unzipped[self.unzip_offset:],
                len(self.unzipped) - self.unzip_offset) == 0:
            return None

        # Advance by the amount of buffer the record just parsed occupied.
        self.unzip_offset += self.currentrec.getRecordSizeInBuffer()
        return self.currentrec

    def perAvroRecord(self, func, userarg=None):
        """Call ``func(record, userarg)`` for every record in the file.

        func    -- callable taking the record and ``userarg``.
        userarg -- opaque value passed through to ``func`` (default None).

        Raises ValueError if the file header cannot be parsed (wrong magic
        bytes, or the file ends before the header is complete). Iteration
        stops silently at end of file and when a block fails to inflate.
        """
        cdef unsigned int offset, fullsize
        cdef int offinc
        cdef long blockcnt, blocksize
        cdef AvroRecord nextrec

        # The header parser only buffers data while fewer than 32 bytes
        # are available, so this loop has to feed it and has to give up
        # at end of file -- otherwise a short or non-Avro file would spin
        # here forever.
        while self.syncmarker is None:
            self._readAvroFileHeader()
            if self.syncmarker is not None:
                break
            if len(self.bufrin) >= 4 and self.bufrin[:4] != b'Obj\x01':
                raise ValueError("%s is not an Avro container file"
                        % (self.filepath,))
            if self._readMore() == 0:
                raise ValueError("%s ends before its Avro file header "
                        "is complete" % (self.filepath,))

        while 1:
            offset = self.nextblock
            # Absolute length of the buffer: `offset` is absolute too, so
            # `fullsize - offset` is exactly what remains to be decoded.
            fullsize = len(self.bufrin)

            # Each data block starts with its record count...
            offinc, blockcnt = read_long(self.bufrin[offset:],
                    fullsize - offset)
            if offinc == 0:
                if self._readMore() == 0:
                    break
                continue
            offset += offinc

            # ...followed by the size in bytes of the compressed content.
            offinc, blocksize = read_long(self.bufrin[offset:],
                    fullsize - offset)
            if offinc == 0:
                if self._readMore() == 0:
                    break
                continue

            offset += offinc

            # The block content plus the 16-byte sync marker behind it
            # must be fully buffered before the block can be processed.
            if <long>offset + blocksize + 16 > <long>fullsize:
                if self._readMore() == 0:
                    break
                continue

            content = self.bufrin[offset: offset + blocksize]

            try:
                # wbits=-15: raw deflate stream without zlib header.
                self.unzipped = zlib.decompress(content, -15)
                self.unzip_offset = 0
            except zlib.error:
                # Best effort: stop iterating at the first corrupt block.
                return

            nextrec = self._getNextRecord()
            while nextrec is not None:
                func(nextrec, userarg)
                nextrec = self._getNextRecord()

            offset += blocksize

            assert(self.bufrin[offset: offset+16] == self.syncmarker)

            self.nextblock = offset + 16

            if self.nextblock >= len(self.bufrin):
                if self._readMore() == 0:
                    break

            # Drop the consumed block in place rather than copying the
            # remainder of the buffer into a new bytearray.
            del self.bufrin[:self.nextblock]
            self.nextblock = 0




# vim: set sw=4 tabstop=4 softtabstop=4 expandtab :
21 changes: 3 additions & 18 deletions src/pyavro_stardust/flowtuple.pxd
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import cython
from pyavro_stardust.baseavro cimport AvroRecord
from pyavro_stardust.baseavro cimport AvroRecord, AvroReader

cpdef enum FlowtupleAttributeNum:
ATTR_FT_TIMESTAMP = 0
Expand All @@ -25,27 +25,12 @@ cpdef enum FlowtupleAttributeStr:
ATTR_FT_NETACQ_COUNTRY = 3


# Declaration of the flowtuple record type: it extends AvroRecord and adds
# asDict(), which is declared to return a dict.
# NOTE(review): this hunk is shown without +/- markers, so it is not clear
# from here whether the @cython.final decorator survives this commit --
# confirm against the raw diff.
@cython.final
cdef class AvroFlowtuple(AvroRecord):

cpdef dict asDict(self)

# NOTE(review): this span interleaves removed and added diff lines (the +/-
# markers were lost). The attribute block and the _readAvroFileHeader /
# _parseFlowtupleAvro / _getNextFlowtuple declarations look like the removed
# side, since the same attributes are now declared on AvroReader in
# baseavro.pxd; the `AvroFlowtupleReader(AvroReader)` header and
# `_parseNextRecord` look like the added side. Confirm against the raw diff
# before treating any of it as the current declaration.
@cython.final
cdef class AvroFlowtupleReader:

cdef unsigned int nextblock
cdef unsigned int unzip_offset
cdef fh
cdef str filepath
cdef bytearray syncmarker
cdef bytearray bufrin
cdef bytes unzipped
cdef AvroFlowtuple avroft

cpdef void _readAvroFileHeader(self)
cdef int _parseFlowtupleAvro(self, const unsigned char[:] buf,
# Flowtuple reader declared as a subclass of AvroReader; the only method
# declared on it is the _parseNextRecord override.
cdef class AvroFlowtupleReader(AvroReader):
cdef int _parseNextRecord(self, const unsigned char[:] buf,
const int maxlen)
cdef AvroFlowtuple _getNextFlowtuple(self)


# vim: set sw=4 tabstop=4 softtabstop=4 expandtab :
Loading

0 comments on commit cedc051

Please sign in to comment.