Commit 2aa7353

Merge pull request #62 from aleks-v-k/hadoop_snappy_format_upstream_pr
Add hadoop snappy format support alongside with framing format
2 parents 5828797 + 9660132 commit 2aa7353

7 files changed: +584 -33 lines changed

snappy/__init__.py

Lines changed: 5 additions & 0 deletions
@@ -9,3 +9,8 @@
     UncompressError,
     isValidCompressed,
 )
+
+from .hadoop_snappy import (
+    stream_compress as hadoop_stream_compress,
+    stream_decompress as hadoop_stream_decompress,
+)
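
The two re-exports give the package-level API a hadoop-snappy counterpart to the existing framing-format helpers. A minimal round-trip sketch using them (the file names are illustrative, not part of the commit):

    import snappy

    with open('data.txt', 'rb') as fin, open('data.hsnappy', 'wb') as fout:
        snappy.hadoop_stream_compress(fin, fout)

    with open('data.hsnappy', 'rb') as fin, open('data.out', 'wb') as fout:
        snappy.hadoop_stream_decompress(fin, fout)

    with open('data.txt', 'rb') as f1, open('data.out', 'rb') as f2:
        assert f1.read() == f2.read()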

snappy/__main__.py

Lines changed: 76 additions & 27 deletions
@@ -1,39 +1,88 @@
-from .snappy import stream_compress, stream_decompress
+import argparse
+import io
+import sys
+
+from . import snappy_formats as formats
+from .snappy import UncompressError
+
 
 def cmdline_main():
     """This method is what is run when invoking snappy via the commandline.
     Try python -m snappy --help
     """
-    import sys
-    if (len(sys.argv) < 2 or len(sys.argv) > 4 or "--help" in sys.argv or
-            "-h" in sys.argv or sys.argv[1] not in ("-c", "-d")):
-        print("Usage: python -m snappy <-c/-d> [src [dst]]")
-        print(" -c compress")
-        print(" -d decompress")
-        print("output is stdout if dst is omitted or '-'")
-        print("input is stdin if src and dst are omitted or src is '-'.")
-        sys.exit(1)
-
-    if len(sys.argv) >= 4 and sys.argv[3] != "-":
-        dst = open(sys.argv[3], "wb")
-    elif hasattr(sys.stdout, 'buffer'):
-        dst = sys.stdout.buffer
-    else:
-        dst = sys.stdout
+    stdin = sys.stdin
+    if hasattr(sys.stdin, "buffer"):
+        stdin = sys.stdin.buffer
+    stdout = sys.stdout
+    if hasattr(sys.stdout, "buffer"):
+        stdout = sys.stdout.buffer
 
-    if len(sys.argv) >= 3 and sys.argv[2] != "-":
-        src = open(sys.argv[2], "rb")
-    elif hasattr(sys.stdin, "buffer"):
-        src = sys.stdin.buffer
-    else:
-        src = sys.stdin
+    parser = argparse.ArgumentParser(
+        description="Compress or decompress snappy archive"
+    )
+
+    group = parser.add_mutually_exclusive_group(required=True)
+
+    group.add_argument(
+        '-c',
+        dest='compress',
+        action='store_true',
+        help='Compress'
+    )
+    group.add_argument(
+        '-d',
+        dest='decompress',
+        action='store_true',
+        help='Decompress'
+    )
+
+    parser.add_argument(
+        '-t',
+        dest='target_format',
+        default=formats.DEFAULT_FORMAT,
+        choices=formats.ALL_SUPPORTED_FORMATS,
+        help=(
+            'Target format, default is "{}"'.format(formats.DEFAULT_FORMAT)
+        )
+    )
+
+    parser.add_argument(
+        'infile',
+        nargs='?',
+        type=argparse.FileType(mode='rb'),
+        default=stdin,
+        help="Input file (or stdin)"
+    )
+    parser.add_argument(
+        'outfile',
+        nargs='?',
+        type=argparse.FileType(mode='wb'),
+        default=stdout,
+        help="Output file (or stdout)"
+    )
+
+    args = parser.parse_args()
+
+    # workaround for https://bugs.python.org/issue14156
+    if isinstance(args.infile, io.TextIOWrapper):
+        args.infile = stdin
+    if isinstance(args.outfile, io.TextIOWrapper):
+        args.outfile = stdout
 
-    if sys.argv[1] == "-c":
-        method = stream_compress
+    additional_args = {}
+    if args.compress:
+        method = formats.get_compress_function(args.target_format)
     else:
-        method = stream_decompress
+        try:
+            method, read_chunk = formats.get_decompress_function(
+                args.target_format,
+                args.infile
+            )
+        except UncompressError as err:
+            sys.exit("Failed to get decompress function: {}".format(err))
+        additional_args['start_chunk'] = read_chunk
 
-    method(src, dst)
+    method(args.infile, args.outfile, **additional_args)
 
 
 if __name__ == "__main__":
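
With argparse in place, the CLI gains a -t flag for choosing the container format. A hedged usage sketch: the exact format identifiers come from snappy/snappy_formats.py, which is part of this PR but not shown in this excerpt, so the name "hadoop_snappy" below is an assumption:

    # compress using the hadoop-snappy framing (format name assumed)
    python -m snappy -c -t hadoop_snappy input.txt output.snappy

    # decompress; get_decompress_function() inspects the first chunk of the
    # input, so the format flag can typically be omitted here
    python -m snappy -d output.snappy restored.txt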

snappy/hadoop_snappy.py

Lines changed: 215 additions & 0 deletions
"""The module implements compression/decompression with snappy using
Hadoop snappy format: https://github.com/kubo/snzip#hadoop-snappy-format

Expected usage like:

    import snappy

    src = 'uncompressed'
    dst = 'compressed'
    dst2 = 'decompressed'

    with open(src, 'rb') as fin, open(dst, 'wb') as fout:
        snappy.hadoop_stream_compress(fin, fout)

    with open(dst, 'rb') as fin, open(dst2, 'wb') as fout:
        snappy.hadoop_stream_decompress(fin, fout)

    with open(src, 'rb') as fin1, open(dst2, 'rb') as fin2:
        assert fin1.read() == fin2.read()

"""

import struct

from .snappy import (
    _compress, _uncompress,
    stream_compress as _stream_compress,
    stream_decompress as _stream_decompress,
    check_format as _check_format,
    UncompressError,
    _CHUNK_MAX)


SNAPPY_BUFFER_SIZE_DEFAULT = 256 * 1024
_STREAM_TO_STREAM_BLOCK_SIZE = _CHUNK_MAX

_INT_SIZE = 4


def pack_int(num):
    big_endian_uint = struct.pack('>I', num)
    return big_endian_uint


def unpack_int(data):
    return struct.unpack('>I', data)[0]


class StreamCompressor(object):

    """This class implements the compressor-side of the hadoop snappy
    format, taken from https://github.com/kubo/snzip#hadoop-snappy-format

    Keep in mind that this compressor object does no buffering for you to
    appropriately size chunks. Every call to StreamCompressor.compress results
    in a unique call to the underlying snappy compression method.
    """

    def __init__(self):
        pass

    def add_chunk(self, data):
        """Add a chunk containing 'data', returning a string that is
        compressed. This data should be concatenated to
        the tail end of an existing Snappy stream. In the absence of any
        internal buffering, no data is left in any internal buffers, and so
        unlike zlib.compress, this method returns everything.
        """
        out = []
        uncompressed_length = len(data)
        out.append(pack_int(uncompressed_length))
        compressed_chunk = _compress(data)
        compressed_length = len(compressed_chunk)
        out.append(pack_int(compressed_length))
        out.append(compressed_chunk)
        return b"".join(out)

    def compress(self, data):
        """This method is simply an alias for compatibility with zlib
        compressobj's compress method.
        """
        return self.add_chunk(data)

    def flush(self, mode=None):
        """This method does nothing and only exists for compatibility with
        the zlib compressobj
        """
        pass

    def copy(self):
        """This method exists for compatibility with the zlib compressobj.
        """
        return StreamCompressor()


class StreamDecompressor(object):

    """This class implements the decompressor-side of the hadoop snappy
    format.

    This class matches a subset of the interface found for the zlib module's
    decompression objects (see zlib.decompressobj). Specifically, it currently
    implements the decompress method without the max_length option, the flush
    method without the length option, and the copy method.
    """

    __slots__ = ["_buf", "_block_length", "_uncompressed_length"]

    def __init__(self):
        self._buf = b""
        # current block length
        self._block_length = 0
        # total uncompressed data length of the current block
        self._uncompressed_length = 0

    @staticmethod
    def check_format(data):
        """Just checks that the first two integers (big-endian four-byte ints)
        in the given data block comply with: first int >= second int.
        This is a simple assumption that we have in the data a start of a
        block for hadoop snappy format. It should contain the uncompressed
        block length as the first integer, and the compressed subblock length
        as the second integer.
        Raises UncompressError if the condition is not fulfilled.
        :return: None
        """
        int_size = _INT_SIZE
        if len(data) < int_size * 2:
            raise UncompressError("Too short data length")
        # We can't actually be sure about the format here.
        # The assumption that the compressed data length is less than the
        # uncompressed length is not true in general.
        # So, just don't check anything.
        return

    def decompress(self, data):
        """Decompress 'data', returning a string containing the uncompressed
        data corresponding to at least part of the data in string. This data
        should be concatenated to the output produced by any preceding calls to
        the decompress() method. Some of the input data may be preserved in
        internal buffers for later processing.
        """
        int_size = _INT_SIZE
        self._buf += data
        uncompressed = []
        while True:
            if len(self._buf) < int_size:
                return b"".join(uncompressed)
            next_start = 0
            if not self._block_length:
                self._block_length = unpack_int(self._buf[:int_size])
                self._buf = self._buf[int_size:]
                if len(self._buf) < int_size:
                    return b"".join(uncompressed)
            compressed_length = unpack_int(
                self._buf[next_start:next_start + int_size]
            )
            next_start += int_size
            if len(self._buf) < compressed_length + next_start:
                return b"".join(uncompressed)
            chunk = self._buf[
                next_start:next_start + compressed_length
            ]
            self._buf = self._buf[next_start + compressed_length:]
            uncompressed_chunk = _uncompress(chunk)
            self._uncompressed_length += len(uncompressed_chunk)
            uncompressed.append(uncompressed_chunk)
            if self._uncompressed_length == self._block_length:
                # Here we have uncompressed all subblocks of the current block
                self._uncompressed_length = 0
                self._block_length = 0
                continue

    def flush(self):
        """All pending input is processed, and a string containing the
        remaining uncompressed output is returned. After calling flush(), the
        decompress() method cannot be called again; the only realistic action
        is to delete the object.
        """
        if self._buf != b"":
            raise UncompressError("chunk truncated")
        return b""

    def copy(self):
        """Returns a copy of the decompression object. This can be used to save
        the state of the decompressor midway through the data stream in order
        to speed up random seeks into the stream at a future point.
        """
        copy = StreamDecompressor()
        copy._buf = self._buf
        copy._block_length = self._block_length
        copy._uncompressed_length = self._uncompressed_length
        return copy


def stream_compress(src, dst, blocksize=SNAPPY_BUFFER_SIZE_DEFAULT):
    return _stream_compress(
        src, dst, blocksize=blocksize, compressor_cls=StreamCompressor
    )


def stream_decompress(src, dst, blocksize=_STREAM_TO_STREAM_BLOCK_SIZE,
                      start_chunk=None):
    return _stream_decompress(
        src, dst, blocksize=blocksize,
        decompressor_cls=StreamDecompressor,
        start_chunk=start_chunk
    )


def check_format(fin=None, chunk=None, blocksize=_STREAM_TO_STREAM_BLOCK_SIZE):
    return _check_format(
        fin=fin, chunk=chunk, blocksize=blocksize,
        decompressor_cls=StreamDecompressor
    )
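
The frame layout implemented by add_chunk (4-byte big-endian uncompressed length, 4-byte big-endian compressed length, then the raw snappy block) and the buffering behaviour of decompress can be exercised directly. A small sketch, not part of the commit, assuming a python-snappy install that includes this module:

    import struct

    from snappy.hadoop_snappy import StreamCompressor, StreamDecompressor

    payload = b"hello hadoop snappy" * 100

    # One call to add_chunk produces one complete frame.
    frame = StreamCompressor().add_chunk(payload)
    uncompressed_len, compressed_len = struct.unpack(">II", frame[:8])
    assert uncompressed_len == len(payload)
    assert compressed_len == len(frame) - 8

    # StreamDecompressor buffers partial input, so the frame can be fed in
    # arbitrary slices, e.g. one byte at a time.
    decomp = StreamDecompressor()
    out = b"".join(decomp.decompress(frame[i:i + 1]) for i in range(len(frame)))
    assert out == payload
    assert decomp.flush() == b""  # empty buffer means the stream ended cleanly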
