Skip to content

Commit

Permalink
Move generic Avro code into baseavro.pyx
Browse files Browse the repository at this point in the history
This means that other modules, such as the upcoming rsdos reader,
don't need to duplicate a ton of code.

Changed flowtuple behaviour slightly to only maintain a single
AvroFlowtuple instance rather than instantiating a fresh instance
everytime we want to read from the file. This matters a lot more
now that the base AvroRecord object does mallocs and frees for
every instance, so it is much more efficient to just have one
instance and reuse it.
  • Loading branch information
salcock committed Feb 25, 2021
1 parent b854bdd commit 289b8f0
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 141 deletions.
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def no_cythonize(extensions, **_ignore):


extensions = [
Extension('pyavro_stardust.baseavro', ['src/pyavro_stardust/baseavro.pyx']),
Extension('pyavro_stardust.flowtuple', ['src/pyavro_stardust/flowtuple.pyx'])
]

Expand Down
28 changes: 28 additions & 0 deletions src/pyavro_stardust/baseavro.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@

cdef struct parsedString:
int toskip
int strlen
unsigned char *start

cdef (int, long) read_long(const unsigned char[:] buf, const int maxlen)
cdef parsedString read_string(const unsigned char[:] buf, const int maxlen)

cdef class AvroRecord:

cdef long *attributes_l
cdef char **attributes_s
cdef unsigned int sizeinbuf
cdef int stringcount;
cdef int numcount;

cdef int parseNumeric(self, const unsigned char[:] buf, const int maxlen,
int attrind)
cpdef long getNumeric(self, int attrind)
cpdef str getString(self, int attrind)
cpdef unsigned int getRecordSizeInBuffer(self)
cdef int parseString(self, const unsigned char[:] buf, const int maxlen,
int attrind)
cpdef void resetRecord(self)


# vim: set sw=4 tabstop=4 softtabstop=4 expandtab :
130 changes: 130 additions & 0 deletions src/pyavro_stardust/baseavro.pyx
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
# cython: language_level=3
from libc.string cimport memcpy
from cpython.mem cimport PyMem_Malloc, PyMem_Free
import zlib, wandio
cimport cython

cdef (int, long) read_long(const unsigned char[:] buf, const int maxlen):
cdef int longlen = 0
cdef int shift
cdef unsigned long b
cdef unsigned long n

if maxlen == 0:
return 0,0

b = buf[0]
n = b & 0x7F
shift = 7

while (b & 0x80) != 0:
longlen += 1
if longlen >= maxlen:
return 0,0
b = buf[longlen]
n |= ((b & 0x7F) << shift)
shift += 7


return (longlen + 1, (n >> 1) ^ -(n & 1))

cdef parsedString read_string(const unsigned char[:] buf, const int maxlen):
cdef int skip, strlen
cdef parsedString s

skip,strlen = read_long(buf, maxlen)
if skip == 0:
s.toskip = 0
s.strlen = 0
s.start = NULL
return s

s.toskip = skip
s.strlen = strlen
s.start = <unsigned char *>&(buf[skip])
return s


cdef class AvroRecord:

def __cinit__(self):
self.attributes_l = NULL
self.attributes_s = NULL

def __init__(self, numeric, strings):
self.attributes_l = <long *>PyMem_Malloc(sizeof(long) * numeric)
self.attributes_s = <char **>PyMem_Malloc(sizeof(char *) * strings)

for i in range(numeric):
self.attributes_l[i] = 0
for i in range(strings):
self.attributes_s[i] = NULL
self.sizeinbuf = 0
self.stringcount = strings
self.numcount = numeric

def __dealloc__(self):
if self.attributes_s != NULL:
for i in range(self.stringcount):
if self.attributes_s[i] != NULL:
PyMem_Free(self.attributes_s[i])
PyMem_Free(self.attributes_s)

if self.attributes_l != NULL:
PyMem_Free(self.attributes_l)

cdef int parseNumeric(self, const unsigned char[:] buf, const int maxlen,
int attrind):
cdef int offinc
cdef long longval

offinc, longval = read_long(buf, maxlen)
if offinc == 0:
return -1

self.attributes_l[attrind] = longval
self.sizeinbuf += offinc

return offinc

cpdef long getNumeric(self, int attrind):
return self.attributes_l[<int>attrind]

cpdef str getString(self, int attrind):
return str(self.attributes_s[<int>attrind])

cpdef unsigned int getRecordSizeInBuffer(self):
return self.sizeinbuf

cdef int parseString(self, const unsigned char[:] buf, const int maxlen,
int attrind):

cdef parsedString astr

astr = read_string(buf, maxlen)

if astr.toskip == 0:
return 0

self.sizeinbuf += astr.toskip + astr.strlen
self.attributes_s[attrind] = <char *>PyMem_Malloc(sizeof(char) * astr.strlen + 1)

memcpy(self.attributes_s[attrind], astr.start, astr.strlen)
self.attributes_s[attrind][astr.strlen] = b'\x00'

return astr.toskip + astr.strlen

cpdef void resetRecord(self):
self.sizeinbuf = 0

if self.attributes_s == NULL:
return

for i in range(self.stringcount):
if self.attributes_s[i] != NULL:
PyMem_Free(self.attributes_s[i])
self.attributes_s[i] = NULL



# vim: set sw=4 tabstop=4 softtabstop=4 expandtab :
18 changes: 4 additions & 14 deletions src/pyavro_stardust/flowtuple.pxd
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import cython
from pyavro_stardust.baseavro cimport AvroRecord

cpdef enum FlowtupleAttributeNum:
ATTR_FT_TIMESTAMP = 0
Expand All @@ -25,21 +26,9 @@ cpdef enum FlowtupleAttributeStr:


@cython.final
cdef class AvroFlowtuple:
cdef class AvroFlowtuple(AvroRecord):

cdef unsigned int sizeinbuf
cdef long attributes_l[16]
cdef char *attributes_s[4]

cdef int parseNumeric(self, const unsigned char[:] buf, const int maxlen,
int attrind)
cpdef long getNumeric(self, int attrind)
cpdef str getString(self, int attrind)
cpdef unsigned int getFlowtupleSizeInBuffer(self)
cpdef dict asDict(self)
cdef int parseString(self, const unsigned char[:] buf, const int maxlen,
int attrind)
cpdef void releaseStrings(self)

@cython.final
cdef class AvroFlowtupleReader:
Expand All @@ -51,9 +40,10 @@ cdef class AvroFlowtupleReader:
cdef bytearray syncmarker
cdef bytearray bufrin
cdef bytes unzipped
cdef AvroFlowtuple avroft

cpdef void _readAvroFileHeader(self)
cdef AvroFlowtuple _parseFlowtupleAvro(self, const unsigned char[:] buf,
cdef int _parseFlowtupleAvro(self, const unsigned char[:] buf,
const int maxlen)
cdef AvroFlowtuple _getNextFlowtuple(self)

Expand Down
Loading

0 comments on commit 289b8f0

Please sign in to comment.