Skip to content

Commit

Permalink
Merge pull request #1 from CAIDA/flowtuple4
Browse files Browse the repository at this point in the history
Add support for reading flowtuple4 files
  • Loading branch information
salcock authored Apr 20, 2021
2 parents d9188a4 + 0fee787 commit 61d4782
Show file tree
Hide file tree
Showing 7 changed files with 412 additions and 18 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ install-from-source: dist

clean:
$(RM) -r build dist src/*.egg-info
$(RM) -r src/{pyavrostardust.c}
$(RM) -r src/pyavro_stardust/*.c src/pyavro_stardust/*.cpp
$(RM) -r src/pyavro_stardust/*.html
$(RM) -r .pytest_cache
find . -name __pycache__ -exec rm -r {} +
#git clean -fdX
Expand Down
72 changes: 72 additions & 0 deletions examples/flowtuple4-example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Example code that uses the AvroFlowtuple3Reader extension class to
# count flowtuples via a perFlowtuple callback method

import sys
from collections import defaultdict
from pyavro_stardust.flowtuple4 import AvroFlowtuple4Reader, \
Flowtuple4AttributeNum, Flowtuple4AttributeStr, \
Flowtuple4AttributeNumArray

counter = 0
protocols = defaultdict(int)

ttls = defaultdict(int)
flags = defaultdict(int)
sizes = defaultdict(int)

# Incredibly simple callback that simply increments a global counter for
# each flowtuple, as well as tracking the number of packets for each
# IP protocols
def perFlowtupleCallback(ft, userarg):
global counter, protocols
counter += 1

a = ft.asDict(True)
proto = a["protocol"]
pktcnt = a["packets"]

protocols[proto] += pktcnt

if "common_ttls" in a:
for t in a["common_ttls"]:
ttls[t['value']] += t['freq']

if "common_pkt_sizes" in a:
for s in a["common_pkt_sizes"]:
sizes[s['value']] += s['freq']

if "common_tcp_flags" in a:
for f in a["common_tcp_flags"]:
flags[f['value']] += f['freq']

def run():

# sys.argv[1] must be a valid wandio path -- e.g. a swift URL or
# a path to a file on disk
ftreader = AvroFlowtuple4Reader(sys.argv[1])
ftreader.start()

# This will read all flowtuples and call `perFlowtupleCallback` on
# each one
ftreader.perAvroRecord(perFlowtupleCallback)

ftreader.close()

# Display our final result
print("Total flowtuples:", counter)
for k,v in protocols.items():
print("Protocol", k, ":", v, "packets")

print()
for k,v in ttls.items():
print("TTL", k, ":", v, "packets")

print()
for k,v in sizes.items():
print("Packet Size", k, ":", v, "packets")

print()
for k,v in flags.items():
print("TCP Flags", k, ":", v, "packets")

run()
7 changes: 4 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,17 @@ def no_cythonize(extensions, **_ignore):


extensions = [
Extension('pyavro_stardust.baseavro', ['src/pyavro_stardust/baseavro.pyx']),
Extension('pyavro_stardust.flowtuple3', ['src/pyavro_stardust/flowtuple3.pyx']),
Extension('pyavro_stardust.baseavro', ['src/pyavro_stardust/baseavro.pyx'], language="c++"),
Extension('pyavro_stardust.flowtuple3', ['src/pyavro_stardust/flowtuple3.pyx'], language="c++"),
Extension('pyavro_stardust.flowtuple4', ['src/pyavro_stardust/flowtuple4.pyx'], language="c++"),
#Extension('pyavro_stardust.rsdos', ['src/pyavro_stardust/rsdos.pyx'])
]

CYTHONIZE = bool(int(os.getenv("CYTHONIZE", 0))) and cythonize is not None

if CYTHONIZE:
compiler_directives = {"language_level": 3, "embedsignature": True}
extensions = cythonize(extensions, compiler_directives=compiler_directives)
extensions = cythonize(extensions, compiler_directives=compiler_directives, annotate=True)
else:
extensions = no_cythonize(extensions)

Expand Down
19 changes: 17 additions & 2 deletions src/pyavro_stardust/baseavro.pxd
Original file line number Diff line number Diff line change
@@ -1,27 +1,42 @@
from libcpp.vector cimport vector

cdef struct parsedString:
int toskip
int strlen
unsigned char *start

cdef struct parsedNumericArrayBlock:
int totalsize
int blockcount
long *values


cdef (int, long) read_long(const unsigned char[:] buf, const int maxlen)
cdef parsedString read_string(const unsigned char[:] buf, const int maxlen)
cdef parsedNumericArrayBlock read_numeric_array(const unsigned char[:] buf,
const int maxlen)

cdef class AvroRecord:

cdef long *attributes_l
cdef char **attributes_s
cdef long **attributes_na
cdef long *attributes_na_sizes
cdef unsigned int sizeinbuf
cdef int stringcount;
cdef int numcount;
cdef int stringcount
cdef int numcount
cdef int numarraycount

cdef int parseNumeric(self, const unsigned char[:] buf, const int maxlen,
int attrind)
cpdef long getNumeric(self, int attrind)
cpdef str getString(self, int attrind)
cpdef unsigned int getRecordSizeInBuffer(self)
cdef int parseNumericArray(self, const unsigned char[:] buf,
const int maxlen, int attrind)
cdef int parseString(self, const unsigned char[:] buf, const int maxlen,
int attrind)
cpdef vector[long] getNumericArray(self, int attrind)
cpdef void resetRecord(self)


Expand Down
138 changes: 126 additions & 12 deletions src/pyavro_stardust/baseavro.pyx
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# cython: language_level=3
from libc.string cimport memcpy
from cpython.mem cimport PyMem_Malloc, PyMem_Free
import zlib, wandio
from libcpp.vector cimport vector
from cpython.mem cimport PyMem_Malloc, PyMem_Free, PyMem_Realloc
import zlib, wandio, sys
cimport cython

cdef (int, long) read_long(const unsigned char[:] buf, const int maxlen):
Expand Down Expand Up @@ -44,14 +45,47 @@ cdef parsedString read_string(const unsigned char[:] buf, const int maxlen):
s.start = <unsigned char *>&(buf[skip])
return s

cdef parsedNumericArrayBlock read_numeric_array(const unsigned char[:] buf,
const int maxlen):
cdef int skip
cdef long arrayitem, blockcount
cdef parsedNumericArrayBlock arr

skip, blockcount = read_long(buf, maxlen)
if skip == 0:
arr.totalsize = 0
arr.blockcount = 0
arr.values = NULL
return arr

arr.totalsize = skip
arr.blockcount = 0

if blockcount == 0:
arr.values = NULL
return arr

arr.values = <long *>PyMem_Malloc(sizeof(long) * arr.blockcount)

for i in range(blockcount):
skip, arrayitem = read_long(buf[arr.totalsize:], maxlen - arr.totalsize)
if skip == 0:
break
arr.totalsize += skip
arr.values[arr.blockcount] = arrayitem
arr.blockcount += 1

return arr

cdef class AvroRecord:

def __cinit__(self):
self.attributes_l = NULL
self.attributes_s = NULL
self.attributes_na = NULL
self.attributes_na_sizes = NULL;

def __init__(self, numeric, strings):
def __init__(self, numeric, strings, numarrays):
if (numeric > 0):
self.attributes_l = <long *>PyMem_Malloc(sizeof(long) * numeric)
for i in range(numeric):
Expand All @@ -61,9 +95,20 @@ cdef class AvroRecord:
self.attributes_s = <char **>PyMem_Malloc(sizeof(char *) * strings)
for i in range(strings):
self.attributes_s[i] = NULL

if (numarrays > 0):
self.attributes_na = <long **>PyMem_Malloc(sizeof(long **) *
numarrays)
self.attributes_na_sizes = <long *>PyMem_Malloc(sizeof(long) *
numarrays)
for i in range(numarrays):
self.attributes_na[i] = NULL
self.attributes_na_sizes[i] = 0

self.sizeinbuf = 0
self.stringcount = strings
self.numcount = numeric
self.numarraycount = numarrays

def __dealloc__(self):
if self.attributes_s != NULL:
Expand All @@ -72,6 +117,16 @@ cdef class AvroRecord:
PyMem_Free(self.attributes_s[i])
PyMem_Free(self.attributes_s)


if self.attributes_na != NULL:
for i in range(self.numarraycount):
if self.attributes_na[i] != NULL:
PyMem_Free(self.attributes_na[i])
PyMem_Free(self.attributes_na)

if self.attributes_na_sizes != NULL:
PyMem_Free(self.attributes_na_sizes)

if self.attributes_l != NULL:
PyMem_Free(self.attributes_l)

Expand All @@ -98,6 +153,14 @@ cdef class AvroRecord:
cpdef unsigned int getRecordSizeInBuffer(self):
return self.sizeinbuf

cpdef vector[long] getNumericArray(self, int attrind):
cdef int i
cdef vector[long] vec

for i in range(self.attributes_na_sizes[attrind]):
vec.push_back(self.attributes_na[<int>attrind][i])
return vec

cdef int parseString(self, const unsigned char[:] buf, const int maxlen,
int attrind):

Expand All @@ -116,16 +179,59 @@ cdef class AvroRecord:

return astr.toskip + astr.strlen

cdef int parseNumericArray(self, const unsigned char[:] buf,
const int maxlen, int attrind):

cdef parsedNumericArrayBlock block
cdef int toskip, i

toskip = 0
while toskip < maxlen:
block = read_numeric_array(buf[toskip:], maxlen - toskip)

if block.blockcount == 0:
toskip += block.totalsize
break

if self.attributes_na[attrind] == NULL:
self.attributes_na[attrind] = block.values
self.attributes_na_sizes[attrind] = block.blockcount
else:
# XXX This code path is untested due to generally not being
# used at all in practice
newsize = block.blockcount + self.attributes_na_sizes[attrind]
newmem = <long *>PyMem_Realloc(self.attributes_na[attrind],
newsize * sizeof(long))
for i in range(block.blockcount):
newmem[self.attributes_na_sizes[attrind] + i] = block.values[i]
self.attributes_na_sizes[attrind] += block.blockcount
self.attributes_na[attrind] = newmem
PyMem_Free(block.values)


toskip += block.totalsize

self.sizeinbuf += toskip
return toskip


cpdef void resetRecord(self):
cdef int i

self.sizeinbuf = 0

if self.attributes_s == NULL:
return
if self.attributes_s != NULL:
for i in range(self.stringcount):
if self.attributes_s[i] != NULL:
PyMem_Free(self.attributes_s[i])
self.attributes_s[i] = NULL

for i in range(self.stringcount):
if self.attributes_s[i] != NULL:
PyMem_Free(self.attributes_s[i])
self.attributes_s[i] = NULL
if self.attributes_na != NULL:
for i in range(self.numarraycount):
if self.attributes_na[i] != NULL:
PyMem_Free(self.attributes_na[i])
self.attributes_na_sizes[i] = 0
self.attributes_na[i] = NULL


cdef class AvroReader:
Expand All @@ -140,7 +246,14 @@ cdef class AvroReader:
self.currentrec = None

def _readMore(self):
inread = self.fh.read(1024 * 1024)
try:
inread = self.fh.read(1024 * 1024)
except UnicodeDecodeError as e:
print(e)
return -1
except KeyboardInterrupt:
return -1

if len(inread) > 0 and inread != '':
self.bufrin += inread
return 1
Expand All @@ -152,7 +265,8 @@ cdef class AvroReader:
cdef long array_size, keylen, vallen

if len(self.bufrin) < 32:
self._readMore()
if self._readMore() < 0:
sys.exit(1)

if len(self.bufrin) < 32:
return
Expand Down Expand Up @@ -189,7 +303,7 @@ cdef class AvroReader:
if self.fh is not None:
return
try:
self.fh = wandio.open(self.filepath)
self.fh = wandio.open(self.filepath, 'rb')
except:
raise

Expand Down
Loading

0 comments on commit 61d4782

Please sign in to comment.