Skip to content

Commit

Permalink
Add support for reading flowtuple4 files
Browse files Browse the repository at this point in the history
Still probably some room for performance improvements, especially
with how we handle the "common" value arrays but the core
functionality seems to be in place.
  • Loading branch information
salcock committed Apr 20, 2021
1 parent d9188a4 commit 0fee787
Show file tree
Hide file tree
Showing 7 changed files with 412 additions and 18 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ install-from-source: dist

clean:
$(RM) -r build dist src/*.egg-info
$(RM) -r src/{pyavrostardust.c}
$(RM) -r src/pyavro_stardust/*.c src/pyavro_stardust/*.cpp
$(RM) -r src/pyavro_stardust/*.html
$(RM) -r .pytest_cache
find . -name __pycache__ -exec rm -r {} +
#git clean -fdX
Expand Down
72 changes: 72 additions & 0 deletions examples/flowtuple4-example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Example code that uses the AvroFlowtuple3Reader extension class to
# count flowtuples via a perFlowtuple callback method

import sys
from collections import defaultdict
from pyavro_stardust.flowtuple4 import AvroFlowtuple4Reader, \
Flowtuple4AttributeNum, Flowtuple4AttributeStr, \
Flowtuple4AttributeNumArray

counter = 0
protocols = defaultdict(int)

ttls = defaultdict(int)
flags = defaultdict(int)
sizes = defaultdict(int)

# Incredibly simple callback that simply increments a global counter for
# each flowtuple, as well as tracking the number of packets for each
# IP protocols
def perFlowtupleCallback(ft, userarg):
global counter, protocols
counter += 1

a = ft.asDict(True)
proto = a["protocol"]
pktcnt = a["packets"]

protocols[proto] += pktcnt

if "common_ttls" in a:
for t in a["common_ttls"]:
ttls[t['value']] += t['freq']

if "common_pkt_sizes" in a:
for s in a["common_pkt_sizes"]:
sizes[s['value']] += s['freq']

if "common_tcp_flags" in a:
for f in a["common_tcp_flags"]:
flags[f['value']] += f['freq']

def run():

# sys.argv[1] must be a valid wandio path -- e.g. a swift URL or
# a path to a file on disk
ftreader = AvroFlowtuple4Reader(sys.argv[1])
ftreader.start()

# This will read all flowtuples and call `perFlowtupleCallback` on
# each one
ftreader.perAvroRecord(perFlowtupleCallback)

ftreader.close()

# Display our final result
print("Total flowtuples:", counter)
for k,v in protocols.items():
print("Protocol", k, ":", v, "packets")

print()
for k,v in ttls.items():
print("TTL", k, ":", v, "packets")

print()
for k,v in sizes.items():
print("Packet Size", k, ":", v, "packets")

print()
for k,v in flags.items():
print("TCP Flags", k, ":", v, "packets")

run()
7 changes: 4 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,17 @@ def no_cythonize(extensions, **_ignore):


extensions = [
Extension('pyavro_stardust.baseavro', ['src/pyavro_stardust/baseavro.pyx']),
Extension('pyavro_stardust.flowtuple3', ['src/pyavro_stardust/flowtuple3.pyx']),
Extension('pyavro_stardust.baseavro', ['src/pyavro_stardust/baseavro.pyx'], language="c++"),
Extension('pyavro_stardust.flowtuple3', ['src/pyavro_stardust/flowtuple3.pyx'], language="c++"),
Extension('pyavro_stardust.flowtuple4', ['src/pyavro_stardust/flowtuple4.pyx'], language="c++"),
#Extension('pyavro_stardust.rsdos', ['src/pyavro_stardust/rsdos.pyx'])
]

CYTHONIZE = bool(int(os.getenv("CYTHONIZE", 0))) and cythonize is not None

if CYTHONIZE:
compiler_directives = {"language_level": 3, "embedsignature": True}
extensions = cythonize(extensions, compiler_directives=compiler_directives)
extensions = cythonize(extensions, compiler_directives=compiler_directives, annotate=True)
else:
extensions = no_cythonize(extensions)

Expand Down
19 changes: 17 additions & 2 deletions src/pyavro_stardust/baseavro.pxd
Original file line number Diff line number Diff line change
@@ -1,27 +1,42 @@
from libcpp.vector cimport vector

cdef struct parsedString:
int toskip
int strlen
unsigned char *start

cdef struct parsedNumericArrayBlock:
int totalsize
int blockcount
long *values


cdef (int, long) read_long(const unsigned char[:] buf, const int maxlen)
cdef parsedString read_string(const unsigned char[:] buf, const int maxlen)
cdef parsedNumericArrayBlock read_numeric_array(const unsigned char[:] buf,
const int maxlen)

cdef class AvroRecord:

cdef long *attributes_l
cdef char **attributes_s
cdef long **attributes_na
cdef long *attributes_na_sizes
cdef unsigned int sizeinbuf
cdef int stringcount;
cdef int numcount;
cdef int stringcount
cdef int numcount
cdef int numarraycount

cdef int parseNumeric(self, const unsigned char[:] buf, const int maxlen,
int attrind)
cpdef long getNumeric(self, int attrind)
cpdef str getString(self, int attrind)
cpdef unsigned int getRecordSizeInBuffer(self)
cdef int parseNumericArray(self, const unsigned char[:] buf,
const int maxlen, int attrind)
cdef int parseString(self, const unsigned char[:] buf, const int maxlen,
int attrind)
cpdef vector[long] getNumericArray(self, int attrind)
cpdef void resetRecord(self)


Expand Down
138 changes: 126 additions & 12 deletions src/pyavro_stardust/baseavro.pyx
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# cython: language_level=3
from libc.string cimport memcpy
from cpython.mem cimport PyMem_Malloc, PyMem_Free
import zlib, wandio
from libcpp.vector cimport vector
from cpython.mem cimport PyMem_Malloc, PyMem_Free, PyMem_Realloc
import zlib, wandio, sys
cimport cython

cdef (int, long) read_long(const unsigned char[:] buf, const int maxlen):
Expand Down Expand Up @@ -44,14 +45,47 @@ cdef parsedString read_string(const unsigned char[:] buf, const int maxlen):
s.start = <unsigned char *>&(buf[skip])
return s

cdef parsedNumericArrayBlock read_numeric_array(const unsigned char[:] buf,
const int maxlen):
cdef int skip
cdef long arrayitem, blockcount
cdef parsedNumericArrayBlock arr

skip, blockcount = read_long(buf, maxlen)
if skip == 0:
arr.totalsize = 0
arr.blockcount = 0
arr.values = NULL
return arr

arr.totalsize = skip
arr.blockcount = 0

if blockcount == 0:
arr.values = NULL
return arr

arr.values = <long *>PyMem_Malloc(sizeof(long) * arr.blockcount)

for i in range(blockcount):
skip, arrayitem = read_long(buf[arr.totalsize:], maxlen - arr.totalsize)
if skip == 0:
break
arr.totalsize += skip
arr.values[arr.blockcount] = arrayitem
arr.blockcount += 1

return arr

cdef class AvroRecord:

def __cinit__(self):
self.attributes_l = NULL
self.attributes_s = NULL
self.attributes_na = NULL
self.attributes_na_sizes = NULL;

def __init__(self, numeric, strings):
def __init__(self, numeric, strings, numarrays):
if (numeric > 0):
self.attributes_l = <long *>PyMem_Malloc(sizeof(long) * numeric)
for i in range(numeric):
Expand All @@ -61,9 +95,20 @@ cdef class AvroRecord:
self.attributes_s = <char **>PyMem_Malloc(sizeof(char *) * strings)
for i in range(strings):
self.attributes_s[i] = NULL

if (numarrays > 0):
self.attributes_na = <long **>PyMem_Malloc(sizeof(long **) *
numarrays)
self.attributes_na_sizes = <long *>PyMem_Malloc(sizeof(long) *
numarrays)
for i in range(numarrays):
self.attributes_na[i] = NULL
self.attributes_na_sizes[i] = 0

self.sizeinbuf = 0
self.stringcount = strings
self.numcount = numeric
self.numarraycount = numarrays

def __dealloc__(self):
if self.attributes_s != NULL:
Expand All @@ -72,6 +117,16 @@ cdef class AvroRecord:
PyMem_Free(self.attributes_s[i])
PyMem_Free(self.attributes_s)


if self.attributes_na != NULL:
for i in range(self.numarraycount):
if self.attributes_na[i] != NULL:
PyMem_Free(self.attributes_na[i])
PyMem_Free(self.attributes_na)

if self.attributes_na_sizes != NULL:
PyMem_Free(self.attributes_na_sizes)

if self.attributes_l != NULL:
PyMem_Free(self.attributes_l)

Expand All @@ -98,6 +153,14 @@ cdef class AvroRecord:
cpdef unsigned int getRecordSizeInBuffer(self):
return self.sizeinbuf

cpdef vector[long] getNumericArray(self, int attrind):
cdef int i
cdef vector[long] vec

for i in range(self.attributes_na_sizes[attrind]):
vec.push_back(self.attributes_na[<int>attrind][i])
return vec

cdef int parseString(self, const unsigned char[:] buf, const int maxlen,
int attrind):

Expand All @@ -116,16 +179,59 @@ cdef class AvroRecord:

return astr.toskip + astr.strlen

cdef int parseNumericArray(self, const unsigned char[:] buf,
const int maxlen, int attrind):

cdef parsedNumericArrayBlock block
cdef int toskip, i

toskip = 0
while toskip < maxlen:
block = read_numeric_array(buf[toskip:], maxlen - toskip)

if block.blockcount == 0:
toskip += block.totalsize
break

if self.attributes_na[attrind] == NULL:
self.attributes_na[attrind] = block.values
self.attributes_na_sizes[attrind] = block.blockcount
else:
# XXX This code path is untested due to generally not being
# used at all in practice
newsize = block.blockcount + self.attributes_na_sizes[attrind]
newmem = <long *>PyMem_Realloc(self.attributes_na[attrind],
newsize * sizeof(long))
for i in range(block.blockcount):
newmem[self.attributes_na_sizes[attrind] + i] = block.values[i]
self.attributes_na_sizes[attrind] += block.blockcount
self.attributes_na[attrind] = newmem
PyMem_Free(block.values)


toskip += block.totalsize

self.sizeinbuf += toskip
return toskip


cpdef void resetRecord(self):
cdef int i

self.sizeinbuf = 0

if self.attributes_s == NULL:
return
if self.attributes_s != NULL:
for i in range(self.stringcount):
if self.attributes_s[i] != NULL:
PyMem_Free(self.attributes_s[i])
self.attributes_s[i] = NULL

for i in range(self.stringcount):
if self.attributes_s[i] != NULL:
PyMem_Free(self.attributes_s[i])
self.attributes_s[i] = NULL
if self.attributes_na != NULL:
for i in range(self.numarraycount):
if self.attributes_na[i] != NULL:
PyMem_Free(self.attributes_na[i])
self.attributes_na_sizes[i] = 0
self.attributes_na[i] = NULL


cdef class AvroReader:
Expand All @@ -140,7 +246,14 @@ cdef class AvroReader:
self.currentrec = None

def _readMore(self):
inread = self.fh.read(1024 * 1024)
try:
inread = self.fh.read(1024 * 1024)
except UnicodeDecodeError as e:
print(e)
return -1
except KeyboardInterrupt:
return -1

if len(inread) > 0 and inread != '':
self.bufrin += inread
return 1
Expand All @@ -152,7 +265,8 @@ cdef class AvroReader:
cdef long array_size, keylen, vallen

if len(self.bufrin) < 32:
self._readMore()
if self._readMore() < 0:
sys.exit(1)

if len(self.bufrin) < 32:
return
Expand Down Expand Up @@ -189,7 +303,7 @@ cdef class AvroReader:
if self.fh is not None:
return
try:
self.fh = wandio.open(self.filepath)
self.fh = wandio.open(self.filepath, 'rb')
except:
raise

Expand Down
Loading

0 comments on commit 0fee787

Please sign in to comment.