Commit 0262f89

Add hadoop snappy format support alongside the framing format
1 parent 5828797 commit 0262f89

5 files changed (+337, -30 lines)

snappy/__init__.py

Lines changed: 5 additions & 0 deletions
@@ -9,3 +9,8 @@
     UncompressError,
     isValidCompressed,
 )
+
+from .hadoop_snappy import (
+    stream_compress as hadoop_stream_compress,
+    stream_decompress as hadoop_stream_decompress,
+)
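With these re-exports, the hadoop-snappy stream functions sit at package level alongside the framing-format ones. A minimal in-memory round trip, as a sketch (assumes python-snappy is installed with its C bindings; the sample data is arbitrary):

    import io
    import snappy

    data = b"hello, hadoop snappy" * 1000

    # Compress from one file-like object into another.
    compressed = io.BytesIO()
    snappy.hadoop_stream_compress(io.BytesIO(data), compressed)

    # Rewind and decompress; the output must match the input.
    compressed.seek(0)
    restored = io.BytesIO()
    snappy.hadoop_stream_decompress(compressed, restored)
    assert restored.getvalue() == data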

snappy/__main__.py

Lines changed: 83 additions & 26 deletions
@@ -1,39 +1,96 @@
+import argparse
+import io
+import sys
+
 from .snappy import stream_compress, stream_decompress
+from .hadoop_snappy import (
+    stream_compress as hadoop_stream_compress,
+    stream_decompress as hadoop_stream_decompress)
+
+
+FRAMING_FORMAT = 'framing'
+
+HADOOP_FORMAT = 'hadoop_snappy'
+
+DEFAULT_FORMAT = FRAMING_FORMAT
+
+COMPRESS_METHODS = {
+    FRAMING_FORMAT: stream_compress,
+    HADOOP_FORMAT: hadoop_stream_compress,
+}
+
+DECOMPRESS_METHODS = {
+    FRAMING_FORMAT: stream_decompress,
+    HADOOP_FORMAT: hadoop_stream_decompress,
+}
+
 
 def cmdline_main():
     """This method is what is run when invoking snappy via the commandline.
     Try python -m snappy --help
     """
-    import sys
-    if (len(sys.argv) < 2 or len(sys.argv) > 4 or "--help" in sys.argv or
-            "-h" in sys.argv or sys.argv[1] not in ("-c", "-d")):
-        print("Usage: python -m snappy <-c/-d> [src [dst]]")
-        print("             -c      compress")
-        print("             -d      decompress")
-        print("output is stdout if dst is omitted or '-'")
-        print("input is stdin if src and dst are omitted or src is '-'.")
-        sys.exit(1)
-
-    if len(sys.argv) >= 4 and sys.argv[3] != "-":
-        dst = open(sys.argv[3], "wb")
-    elif hasattr(sys.stdout, 'buffer'):
-        dst = sys.stdout.buffer
-    else:
-        dst = sys.stdout
+    stdin = sys.stdin
+    if hasattr(sys.stdin, "buffer"):
+        stdin = sys.stdin.buffer
+    stdout = sys.stdout
+    if hasattr(sys.stdout, "buffer"):
+        stdout = sys.stdout.buffer
 
-    if len(sys.argv) >= 3 and sys.argv[2] != "-":
-        src = open(sys.argv[2], "rb")
-    elif hasattr(sys.stdin, "buffer"):
-        src = sys.stdin.buffer
-    else:
-        src = sys.stdin
+    parser = argparse.ArgumentParser(
+        description="Compress or decompress snappy archive"
+    )
+
+    group = parser.add_mutually_exclusive_group(required=True)
+
+    group.add_argument(
+        '-c',
+        dest='compress',
+        action='store_true',
+        help='Compress'
+    )
+    group.add_argument(
+        '-d',
+        dest='decompress',
+        action='store_true',
+        help='Decompress'
+    )
 
-    if sys.argv[1] == "-c":
-        method = stream_compress
+    parser.add_argument(
+        '-t',
+        dest='target_format',
+        default=DEFAULT_FORMAT,
+        choices=[FRAMING_FORMAT, HADOOP_FORMAT],
+        help='Target format, default is {}'.format(DEFAULT_FORMAT)
+    )
+
+    parser.add_argument(
+        'infile',
+        nargs='?',
+        type=argparse.FileType(mode='rb'),
+        default=stdin,
+        help="Input file (or stdin)"
+    )
+    parser.add_argument(
+        'outfile',
+        nargs='?',
+        type=argparse.FileType(mode='wb'),
+        default=stdout,
+        help="Output file (or stdout)"
+    )
+
+    args = parser.parse_args()
+    if args.compress:
+        method = COMPRESS_METHODS[args.target_format]
     else:
-        method = stream_decompress
+        method = DECOMPRESS_METHODS[args.target_format]
+
+    # workaround for https://bugs.python.org/issue14156
+    if isinstance(args.infile, io.TextIOWrapper):
+        args.infile = stdin
+    if isinstance(args.outfile, io.TextIOWrapper):
+        args.outfile = stdout
 
-    method(src, dst)
+    method(args.infile, args.outfile)
 
 
 if __name__ == "__main__":
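With argparse in place, the rewritten CLI keeps the old stdin/stdout behavior but gains a `-t` flag for picking the container format. A hedged sketch of the resulting invocations (file names are placeholders):

    python -m snappy -c -t hadoop_snappy input.txt output.snappy
    python -m snappy -d -t hadoop_snappy output.snappy restored.txt
    python -m snappy -c input.txt output.snappy    # framing format by default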

snappy/hadoop_snappy.py

Lines changed: 185 additions & 0 deletions
@@ -0,0 +1,185 @@
+"""The module implements compression/decompression with snappy using
+Hadoop snappy format: https://github.com/kubo/snzip#hadoop-snappy-format
+
+Expected usage like:
+
+    import snappy
+
+    src = 'uncompressed'
+    dst = 'compressed'
+    dst2 = 'decompressed'
+
+    with open(src, 'rb') as fin, open(dst, 'wb') as fout:
+        snappy.hadoop_stream_compress(fin, fout)
+
+    with open(dst, 'rb') as fin, open(dst2, 'wb') as fout:
+        snappy.hadoop_stream_decompress(fin, fout)
+
+    with open(src, 'rb') as fin1, open(dst2, 'rb') as fin2:
+        assert fin1.read() == fin2.read()
+
+"""
+
+import struct
+
+from .snappy import (
+    _compress, _uncompress,
+    stream_compress as _stream_compress,
+    stream_decompress as _stream_decompress,
+    UncompressError,
+    _CHUNK_MAX)
+
+
+SNAPPY_BUFFER_SIZE_DEFAULT = 256 * 1024
+_STREAM_TO_STREAM_BLOCK_SIZE = _CHUNK_MAX
+
+_INT_SIZE = 4
+
+
+def pack_int(num):
+    big_endian_uint = struct.pack('>I', num)
+    return big_endian_uint
+
+
+def unpack_int(data):
+    return struct.unpack('>I', data)[0]
+
+
+class StreamCompressor(object):
+
+    """This class implements the compressor-side of the hadoop snappy
+    format, taken from https://github.com/kubo/snzip#hadoop-snappy-format
+
+    Keep in mind that this compressor object does no buffering for you to
+    appropriately size chunks. Every call to StreamCompressor.compress results
+    in a unique call to the underlying snappy compression method.
+    """
+
+    def __init__(self):
+        pass
+
+    def add_chunk(self, data):
+        """Add a chunk containing 'data', returning a string that is
+        compressed. This data should be concatenated to
+        the tail end of an existing Snappy stream. In the absence of any
+        internal buffering, no data is left in any internal buffers, and so
+        unlike zlib.compress, this method returns everything.
+        """
+        out = []
+        uncompressed_length = len(data)
+        out.append(pack_int(uncompressed_length))
+        compressed_chunk = _compress(data)
+        compressed_length = len(compressed_chunk)
+        out.append(pack_int(compressed_length))
+        out.append(compressed_chunk)
+        return b"".join(out)
+
+    def compress(self, data):
+        """This method is simply an alias for compatibility with zlib
+        compressobj's compress method.
+        """
+        return self.add_chunk(data)
+
+    def flush(self, mode=None):
+        """This method does nothing and only exists for compatibility with
+        the zlib compressobj
+        """
+        pass
+
+    def copy(self):
+        """This method exists for compatibility with the zlib compressobj.
+        """
+        return StreamCompressor()
+
+
+class StreamDecompressor(object):
+
+    """This class implements the decompressor-side of the hadoop snappy
+    format.
+
+    This class matches a subset of the interface found for the zlib module's
+    decompression objects (see zlib.decompressobj). Specifically, it currently
+    implements the decompress method without the max_length option, the flush
+    method without the length option, and the copy method.
+    """
+
+    __slots__ = ["_buf", "_block_length", "_uncompressed_length"]
+
+    def __init__(self):
+        self._buf = b""
+        # current block length
+        self._block_length = 0
+        # total uncompressed data length of the current block
+        self._uncompressed_length = 0
+
+    def decompress(self, data):
+        """Decompress 'data', returning a string containing the uncompressed
+        data corresponding to at least part of the data in string. This data
+        should be concatenated to the output produced by any preceding calls to
+        the decompress() method. Some of the input data may be preserved in
+        internal buffers for later processing.
+        """
+        int_size = _INT_SIZE
+        self._buf += data
+        uncompressed = []
+        while True:
+            if len(self._buf) < int_size:
+                return b"".join(uncompressed)
+            next_start = 0
+            if not self._block_length:
+                self._block_length = unpack_int(self._buf[:int_size])
+                self._buf = self._buf[int_size:]
+                if len(self._buf) < int_size:
+                    return b"".join(uncompressed)
+            compressed_length = unpack_int(
+                self._buf[next_start:next_start + int_size]
+            )
+            next_start += int_size
+            if len(self._buf) < compressed_length + next_start:
+                return b"".join(uncompressed)
+            chunk = self._buf[
+                next_start:next_start + compressed_length
+            ]
+            self._buf = self._buf[next_start + compressed_length:]
+            uncompressed_chunk = _uncompress(chunk)
+            self._uncompressed_length += len(uncompressed_chunk)
+            uncompressed.append(uncompressed_chunk)
+            if self._uncompressed_length == self._block_length:
+                # Here we have uncompressed all subblocks of the current block
+                self._uncompressed_length = 0
+                self._block_length = 0
+                continue
+
+    def flush(self):
+        """All pending input is processed, and a string containing the
+        remaining uncompressed output is returned. After calling flush(), the
+        decompress() method cannot be called again; the only realistic action
+        is to delete the object.
+        """
+        if self._buf != b"":
+            raise UncompressError("chunk truncated")
+        return b""
+
+    def copy(self):
+        """Returns a copy of the decompression object. This can be used to save
+        the state of the decompressor midway through the data stream in order
+        to speed up random seeks into the stream at a future point.
+        """
+        copy = StreamDecompressor()
+        copy._buf = self._buf
+        copy._block_length = self._block_length
+        copy._uncompressed_length = self._uncompressed_length
+        return copy
+
+
+def stream_compress(src, dst, blocksize=SNAPPY_BUFFER_SIZE_DEFAULT):
+    return _stream_compress(
+        src, dst, blocksize=blocksize, compressor_cls=StreamCompressor
+    )
+
+
+def stream_decompress(src, dst, blocksize=_STREAM_TO_STREAM_BLOCK_SIZE):
+    return _stream_decompress(
+        src, dst, blocksize=blocksize,
+        decompressor_cls=StreamDecompressor
+    )
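Each `add_chunk` call thus emits a self-contained record: a 4-byte big-endian uncompressed length, a 4-byte big-endian compressed length, then the compressed bytes. A small sketch verifying that layout (assumes the snappy bindings are importable; the payload is arbitrary):

    import struct
    from snappy.hadoop_snappy import StreamCompressor

    payload = b"some data"
    record = StreamCompressor().add_chunk(payload)
    # The two big-endian length prefixes occupy the first 8 bytes.
    uncompressed_len, compressed_len = struct.unpack('>II', record[:8])
    assert uncompressed_len == len(payload)
    assert len(record) == 8 + compressed_len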

snappy/snappy.py

Lines changed: 10 additions & 4 deletions
@@ -258,29 +258,35 @@ def copy(self):
         return copy
 
 
-def stream_compress(src, dst, blocksize=_STREAM_TO_STREAM_BLOCK_SIZE):
+def stream_compress(src,
+                    dst,
+                    blocksize=_STREAM_TO_STREAM_BLOCK_SIZE,
+                    compressor_cls=StreamCompressor):
     """Takes an incoming file-like object and an outgoing file-like object,
     reads data from src, compresses it, and writes it to dst. 'src' should
     support the read method, and 'dst' should support the write method.
 
     The default blocksize is good for almost every scenario.
     """
-    compressor = StreamCompressor()
+    compressor = compressor_cls()
     while True:
         buf = src.read(blocksize)
         if not buf: break
         buf = compressor.add_chunk(buf)
         if buf: dst.write(buf)
 
 
-def stream_decompress(src, dst, blocksize=_STREAM_TO_STREAM_BLOCK_SIZE):
+def stream_decompress(src,
+                      dst,
+                      blocksize=_STREAM_TO_STREAM_BLOCK_SIZE,
+                      decompressor_cls=StreamDecompressor):
     """Takes an incoming file-like object and an outgoing file-like object,
     reads data from src, decompresses it, and writes it to dst. 'src' should
     support the read method, and 'dst' should support the write method.
 
     The default blocksize is good for almost every scenario.
     """
-    decompressor = StreamDecompressor()
+    decompressor = decompressor_cls()
     while True:
         buf = src.read(blocksize)
         if not buf: break
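The new `compressor_cls`/`decompressor_cls` parameters turn these loops into generic drivers: any class exposing the same `add_chunk`/`decompress` interface can be injected, which is how `hadoop_snappy.stream_compress` and `hadoop_snappy.stream_decompress` reuse them. A hypothetical sketch of the hook used directly:

    import io
    from snappy.snappy import stream_compress
    from snappy.hadoop_snappy import StreamCompressor as HadoopCompressor

    src, dst = io.BytesIO(b"payload"), io.BytesIO()
    # Same read/compress/write loop as before, but the chunk framing
    # now comes from the injected compressor class.
    stream_compress(src, dst, compressor_cls=HadoopCompressor)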
