Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
25c8ee9
Add python3.4 and 3.5 to tox setup
kirklg Apr 29, 2016
1e58347
New compat module to provide built-ins deprecated in py3
kirklg Apr 29, 2016
90f3fb5
Remove long/L suffix
kirklg Apr 29, 2016
d2551d7
Import compat functions and remove python3 check
kirklg Apr 29, 2016
de3da8e
Import compat functions and replace xrange with range
kirklg Apr 29, 2016
8a06ad3
Remove long compat function and replace all long types with int
kirklg Apr 29, 2016
fd7ca45
Add unicode compat function
kirklg Apr 29, 2016
86fd6c2
Replace generator method .next() with built-in next()
kirklg Apr 29, 2016
4dce9fa
Add code to ensure PriorityQueue gets imported on either python2 or 3
kirklg Apr 30, 2016
b7f5ddc
Use items() over iteritems(), py2/py3 compatible
kirklg May 2, 2016
4a87f2d
Use items() over iteritems(), py2/py3 compatible
kirklg May 2, 2016
e780277
Revert request.offset back to 2/3 compatible long
kirklg May 2, 2016
9e8b688
Add 2/3 compatbile long type in minicluster
kirklg May 2, 2016
ab19278
Make logger import absolute, and remove L/long prefix in ERROR_BYTES
kirklg May 2, 2016
85a4295
Replace f.func_name with f.__name__ as key string
kirklg May 2, 2016
528e250
Make namenode import absolut, and improve import of urlparse for 2/3 …
kirklg May 2, 2016
e6388dd
Improve import of urlparse for 2/3 compatbility
kirklg May 2, 2016
4515317
Replace generator .next() with next built-in
kirklg May 2, 2016
b00f889
Replace .has_key with "in" <dict>
kirklg May 2, 2016
0e9b04a
Make logger import absolute in rpc_sasl
kirklg May 2, 2016
7e81dfd
Import __future__ for python3 support
kirklg May 2, 2016
4874342
Make print statements in examples be py3 print functions
kirklg May 2, 2016
c65e5c8
Convert py2 print statement to py3 print function
kirklg May 2, 2016
fcb3895
Make imports absolute and add 2/3 compatible range function
kirklg May 2, 2016
ea061db
Make imports absolute and add 2/3 compatible long function
kirklg May 2, 2016
bf2603b
Make imports absolute and add 2/3 compatible range function in tail_test
kirklg May 2, 2016
d8dd7df
Make imports absolute and add 2/3 compatible <generator>-next argumen…
kirklg May 2, 2016
5b627aa
Make imports absolute, add 2/3 compatible <generator>-next arguments …
kirklg May 2, 2016
dac4aed
Make imports absolute and add 2/3 compatible <generator>-next argumen…
kirklg May 2, 2016
0f635bb
Make imports absolute and replace deprecated assertEquals with assert…
kirklg May 2, 2016
cffb0aa
Replace deprecated assertEquals with assertEqual in config_test
kirklg May 2, 2016
453e31a
Make imports absolute in copytolocal_test
kirklg May 3, 2016
dac7ea9
Absolute import and 2/3 compatible <generator>-next arguments in asse…
kirklg May 3, 2016
0fce6eb
Absolute import and 2/3 compatible <generator>-next arguments in asse…
kirklg May 3, 2016
302f148
Absolute import and 2/3 compatible <generator>-next arguments in asse…
kirklg May 3, 2016
77841c8
Absolute imports and 2/3 compatible exception handling
kirklg May 3, 2016
9652855
Absolute imports and replace generator method .next() with built-in n…
kirklg May 3, 2016
cbf63f2
Absolute imports and 2/3 compatible <generator>-next arguments in ass…
kirklg May 3, 2016
acc05a1
Absolute imports and 2/3 compatible <generator>-next arguments in ass…
kirklg May 3, 2016
518892f
Absolute imports and replace deprecated assertEquals with assertEqual…
kirklg May 3, 2016
db8c663
Absolute import in test_test
kirklg May 3, 2016
9039ee7
Absolute import in touchz_test
kirklg May 3, 2016
278de54
Import __future__ for python3 support in all tests
kirklg May 3, 2016
6200ec8
Convert py2 print statement to py3 print function in all tests
kirklg May 3, 2016
8a4662e
Insert protobuf path in sys.path, fix protobuf absolute imports
kirklg May 3, 2016
18d5ea3
Swap protobuf2 with protobuf3 in requirements.txt
kirklg May 3, 2016
659ba0a
make client_id and RPC_HEADER bytes
kirklg Jun 15, 2016
2749b46
decode protobuf node.path and make bytes
kirklg Jun 15, 2016
17522dd
Encode return data from snakebite.client to match minicluster data
kirklg Jun 15, 2016
20f5d8a
Rename bytes to _bytes in `_read_bytes`, more bytes conversions
kirklg Jun 15, 2016
47391a0
make pre-data `bytes` and change open mode to "wb" in `_handle_copyTo…
kirklg Jun 15, 2016
890f7eb
py3 style exceptions and wrap `filter` in `list` to work in py3
kirklg Jun 15, 2016
a1d613b
make stdout.write py2/py3 compatible
kirklg Jun 15, 2016
cb9f93a
make pre-data `bytes` in cat_test
kirklg Jun 15, 2016
7d2ddb4
add py2/py3 compatible `next` calls in df_test
kirklg Jun 15, 2016
abe0855
Fix tab spacing: pep8
kirklg Jun 15, 2016
3378516
absolute imports in tests
kirklg Jun 15, 2016
2c88d0d
fix typo in assertion
kirklg Jun 16, 2016
87be290
Write bytes new line
kirklg Jun 22, 2016
88afc14
replace py2 with py3 tox_env in travis.yml
kirklg Jun 22, 2016
274af13
fix python version in .travis.yml
kirklg Jun 22, 2016
0663407
.message does not exist in py3, replace with .args for py2/py3
kirklg Jun 22, 2016
16519d5
decode bytes from client to match minicluster `text', which produces …
kirklg Jun 22, 2016
f1dff52
Fix broken tests in client_test
kirklg Jun 22, 2016
6864a5d
Assign stdout alias for py2/py3
kirklg Jun 22, 2016
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
language: python
python: 2.7
python: 3.4
jdk: oraclejdk7
sudo: false
branches:
except:
- gh-pages
env:
- TOX_ENV=py26-cdh
- TOX_ENV=py27-cdh
- TOX_ENV=py26-hdp
- TOX_ENV=py27-hdp
- TOX_ENV=py34-cdh
- TOX_ENV=py34-hdp
install:
- pip install tox
script:
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
argparse
protobuf>2.4.1
protobuf==3.0.0b2.post2
31 changes: 17 additions & 14 deletions snakebite/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@

'''

# python 3 support
from __future__ import absolute_import, print_function, division

# Standard library imports
import socket
import os
Expand All @@ -56,13 +59,13 @@
from snakebite.formatter import format_bytes
from snakebite.errors import RequestError, TransientException, FatalException
from snakebite.crc32c import crc
from snakebite.compat import range, py_2
from snakebite import logger

import google.protobuf.internal.encoder as encoder
import google.protobuf.internal.decoder as decoder

# Module imports

import logger
import logging
import struct
import uuid
Expand Down Expand Up @@ -131,7 +134,7 @@ def read(self, n):

def _buffer_bytes(self, n):
to_read = n
for _ in xrange(self.MAX_READ_ATTEMPTS):
for _ in range(self.MAX_READ_ATTEMPTS):
bytes_read = self.socket.recv(to_read)
self.buffer += bytes_read
to_read -= len(bytes_read)
Expand All @@ -152,7 +155,7 @@ def rewind(self, places):
log.debug("Reset buffer to pos %d" % self.pos)

def reset(self):
self.buffer = ""
self.buffer = b""
self.pos = -1 # position of last byte read

@property
Expand All @@ -162,8 +165,8 @@ def buffer_length(self):


class SocketRpcChannel(RpcChannel):
ERROR_BYTES = 18446744073709551615L
RPC_HEADER = "hrpc"
ERROR_BYTES = 18446744073709551615
RPC_HEADER = b"hrpc"
RPC_SERVICE_CLASS = 0x00
AUTH_PROTOCOL_NONE = 0x00
AUTH_PROTOCOL_SASL = 0xDF
Expand All @@ -181,7 +184,7 @@ def __init__(self, host, port, version, effective_user=None, use_sasl=False, hdf
self.sock = None
self.call_id = -3 # First time (when the connection context is sent, the call_id should be -3, otherwise start with 0 and increment)
self.version = version
self.client_id = str(uuid.uuid4())
self.client_id = str(uuid.uuid4()).encode("utf-8")
self.use_sasl = use_sasl
self.hdfs_namenode_principal = hdfs_namenode_principal
if self.use_sasl:
Expand Down Expand Up @@ -490,12 +493,12 @@ def _read_bytes(self, n, depth=0):
if depth > self.MAX_READ_ATTEMPTS:
raise TransientException("Tried to read %d more bytes, but failed after %d attempts" % (n, self.MAX_READ_ATTEMPTS))

bytes = self.sock.recv(n)
if len(bytes) < n:
left = n - len(bytes)
_bytes = self.sock.recv(n)
if len(_bytes) < n:
left = n - len(_bytes)
depth += 1
bytes += self._read_bytes(left, depth)
return bytes
_bytes += self._read_bytes(left, depth)
return _bytes

def write(self, data):
if log.getEffectiveLevel() == logging.DEBUG:
Expand Down Expand Up @@ -613,7 +616,7 @@ def readBlock(self, length, pool_id, block_id, generation_stamp, offset, block_t
# Collect checksums
if check_crc and checksum_type != self.CHECKSUM_NULL:
checksums = []
for _ in xrange(0, chunks_per_packet):
for _ in range(0, chunks_per_packet):
checksum = self._read_bytes(checksum_len)
checksum = struct.unpack("!I", checksum)[0]
checksums.append(checksum)
Expand All @@ -627,7 +630,7 @@ def readBlock(self, length, pool_id, block_id, generation_stamp, offset, block_t

read_on_packet = 0
for i in range(loads_per_packet):
load = ''
load = b''
for j in range(chunks_per_load):
log.debug("Reading chunk %s in load %s:", j, i)
bytes_to_read = min(bytes_per_chunk, data_len - read_on_packet)
Expand Down
42 changes: 24 additions & 18 deletions snakebite/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
# License for the specific language governing permissions and limitations under
# the License.

# python 3 support
from __future__ import absolute_import, print_function, division

import snakebite.protobuf.ClientNamenodeProtocol_pb2 as client_proto
import snakebite.glob as glob
from snakebite.platformutils import get_current_username
Expand All @@ -30,8 +33,13 @@
FatalException, TransientException)
from snakebite.namenode import Namenode
from snakebite.service import RpcService
from snakebite.compat import range, unicode, long

try:
from Queue import PriorityQueue
except ImportError:
from queue import PriorityQueue

import Queue
import zlib
import bz2
import logging
Expand All @@ -46,8 +54,6 @@
import re
import sys

if sys.version_info[0] == 3:
long = int

log = logging.getLogger(__name__)

Expand All @@ -60,7 +66,7 @@ class Client(object):
>>> from snakebite.client import Client
>>> client = Client("localhost", 8020, use_trash=False)
>>> for x in client.ls(['/']):
... print x
... print(x)

.. warning::

Expand Down Expand Up @@ -787,7 +793,7 @@ def getmerge(self, path, dst, newline=False, check_crc=False):
raise InvalidInputException("getmerge: no destination given")

temporary_target = "%s._COPYING_" % dst
f = open(temporary_target, 'w')
f = open(temporary_target, 'wb')

processor = lambda path, node, dst=dst, check_crc=check_crc: self._handle_getmerge(path, node, dst, check_crc)
try:
Expand All @@ -800,7 +806,7 @@ def getmerge(self, path, dst, newline=False, check_crc=False):
os.remove(temporary_target)
raise FatalException(load['error'])
if newline and load['response']:
f.write("\n")
f.write(b"\n")
yield {"path": dst, "response": '', "result": True, "error": load['error'], "source_path": path}

finally:
Expand Down Expand Up @@ -897,7 +903,7 @@ def tail(self, path, tail_length=1024, append=False):
yield item

def _handle_tail(self, path, node, tail_length, append):
data = ''
data = b''
for load in self._read_file(path, node, tail_only=True, check_crc=False, tail_length=tail_length):
data += load
# We read only the necessary packets but still
Expand Down Expand Up @@ -964,7 +970,7 @@ def _handle_text(self, path, node, check_crc):
if self._is_dir(node):
raise DirectoryException("text: `%s': Is a directory" % path)

text = ''
text = b''
for load in self._read_file(path, node, False, check_crc):
text += load

Expand Down Expand Up @@ -1053,7 +1059,7 @@ def _is_zero_length(self, should_check, node):

def _get_full_path(self, path, node):
if node.path:
return posixpath.join(path, node.path)
return posixpath.join(path, node.path.decode("utf-8"))
else:
return path

Expand Down Expand Up @@ -1123,7 +1129,7 @@ def _read_file(self, path, node, tail_only, check_crc, tail_length=1024):
offset_in_block = max(0, lastblock.b.numBytes - tail_length)

# Prioritize locations to read from
locations_queue = Queue.PriorityQueue() # Primitive queuing based on a node's past failure
locations_queue = PriorityQueue() # Primitive queuing based on a node's past failure
for location in block.locs:
if location.id.storageID in failed_nodes:
locations_queue.put((1, location)) # Priority num, data
Expand Down Expand Up @@ -1225,15 +1231,15 @@ def _find_items(self, paths, processor, include_toplevel=False, include_children
# Recurse into directories
if recurse and self._is_dir(node):
# Construct the full path before processing
full_path = posixpath.join(path, node.path)
full_path = posixpath.join(path, node.path.decode("utf-8"))
for item in self._find_items([full_path],
processor,
include_toplevel=False,
include_children=False,
recurse=recurse):
yield item

def _get_dir_listing(self, path, start_after=''):
def _get_dir_listing(self, path, start_after=b''):
request = client_proto.GetListingRequestProto()
request.src = path
request.startAfter = start_after
Expand Down Expand Up @@ -1352,7 +1358,7 @@ class HAClient(Client):
>>> n2 = Namenode("namenode2.mydomain", 8020)
>>> client = HAClient([n1, n2], use_trash=True)
>>> for x in client.ls(['/']):
... print x
... print(x)

.. note::
Different Hadoop distributions use different protocol versions. Snakebite defaults to 9, but this can be set by passing
Expand Down Expand Up @@ -1392,7 +1398,7 @@ def __init__(self, namenodes, use_trash=False, effective_user=None, use_sasl=Fal
# is not.
raise InvalidInputException("List of namenodes is empty - couldn't create the client")
self.namenode = self._switch_namenode(namenodes)
self.namenode.next()
next(self.namenode)

def _switch_namenode(self, namenodes):
for namenode in namenodes:
Expand All @@ -1419,7 +1425,7 @@ def __handle_request_error(self, exception):
else:
# There's a valid NN in active state, but there's still request error - raise
raise
self.namenode.next()
next(self.namenode)

def __handle_socket_error(self, exception):
log.debug("Request failed with %s" % exception)
Expand All @@ -1431,7 +1437,7 @@ def __handle_socket_error(self, exception):
pass
else:
raise
self.namenode.next()
next(self.namenode)

@staticmethod
def _ha_return_method(func):
Expand All @@ -1454,7 +1460,7 @@ def wrapped(self, *args, **kw):
try:
results = func(self, *args, **kw)
while(True): # yield all results
yield results.next()
yield next(results)
except RequestError as e:
self.__handle_request_error(e)
except socket.error as e:
Expand All @@ -1477,7 +1483,7 @@ class AutoConfigClient(HAClient):
>>> from snakebite.client import AutoConfigClient
>>> client = AutoConfigClient()
>>> for x in client.ls(['/']):
... print x
... print(x)

.. note::
Different Hadoop distributions use different protocol versions. Snakebite defaults to 9, but this can be set by passing
Expand Down
Loading