
Merge branch 'master' into feat/fnv1a-partitioner
wbarnha authored Mar 19, 2024
2 parents b1f844f + a856dc4 commit 339de91
Showing 104 changed files with 989 additions and 3,366 deletions.
3 changes: 0 additions & 3 deletions .covrc

This file was deleted.

2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
@@ -111,7 +111,7 @@ jobs:
     needs:
       - build-sdist
     runs-on: ubuntu-latest
-    timeout-minutes: 10
+    timeout-minutes: 15
    strategy:
       fail-fast: false
       matrix:
2 changes: 1 addition & 1 deletion Makefile
@@ -29,7 +29,7 @@ test-local: build-integration
 cov-local: build-integration
	KAFKA_VERSION=$(KAFKA_VERSION) SCALA_VERSION=$(SCALA_VERSION) pytest \
		--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka \
-		--cov-config=.covrc --cov-report html $(FLAGS) kafka test
+		--cov-report html $(FLAGS) kafka test
	@echo "open file://`pwd`/htmlcov/index.html"

 # Check the readme for syntax errors, which can lead to invalid formatting on
2 changes: 0 additions & 2 deletions benchmarks/consumer_performance.py
@@ -10,8 +10,6 @@
 import threading
 import traceback

-from kafka.vendor.six.moves import range
-
 from kafka import KafkaConsumer, KafkaProducer
 from test.fixtures import KafkaFixture, ZookeeperFixture
2 changes: 0 additions & 2 deletions benchmarks/producer_performance.py
@@ -9,8 +9,6 @@
 import threading
 import traceback

-from kafka.vendor.six.moves import range
-
 from kafka import KafkaProducer
 from test.fixtures import KafkaFixture, ZookeeperFixture
25 changes: 9 additions & 16 deletions benchmarks/varint_speed.py
@@ -1,7 +1,5 @@
 #!/usr/bin/env python
-from __future__ import print_function
 import pyperf
-from kafka.vendor import six


 test_data = [
@@ -67,6 +65,10 @@
 BENCH_VALUES_DEC = list(map(bytearray, BENCH_VALUES_DEC))


+def int2byte(i):
+    return bytes((i,))  # inner tuple matters: bytes((i,)) is one byte, bytes(i) would be i zero bytes
+
+
 def _assert_valid_enc(enc_func):
     for encoded, decoded in test_data:
         assert enc_func(decoded) == encoded, decoded
@@ -116,7 +118,7 @@ def encode_varint_1(num):
 _assert_valid_enc(encode_varint_1)


-def encode_varint_2(value, int2byte=six.int2byte):
+def encode_varint_2(value, int2byte=int2byte):
     value = (value << 1) ^ (value >> 63)

     bits = value & 0x7f
@@ -151,7 +153,7 @@ def encode_varint_3(value, buf):
 assert res == encoded


-def encode_varint_4(value, int2byte=six.int2byte):
+def encode_varint_4(value, int2byte=int2byte):
     value = (value << 1) ^ (value >> 63)

     if value <= 0x7f:  # 1 byte
@@ -301,22 +303,13 @@ def size_of_varint_2(value):
 _assert_valid_size(size_of_varint_2)


-if six.PY3:
-    def _read_byte(memview, pos):
-        """ Read a byte from memoryview as an integer
-
-            Raises:
-                IndexError: if position is out of bounds
-        """
-        return memview[pos]
-else:
-    def _read_byte(memview, pos):
-        """ Read a byte from memoryview as an integer
-
-            Raises:
-                IndexError: if position is out of bounds
-        """
-        return ord(memview[pos])
+def _read_byte(memview, pos):
+    """ Read a byte from memoryview as an integer
+
+        Raises:
+            IndexError: if position is out of bounds
+    """
+    return memview[pos]


 def decode_varint_1(buffer, pos=0):
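For context on what this benchmark file measures: Kafka encodes integers as zigzag varints, and the `(value << 1) ^ (value >> 63)` line above is the 64-bit zigzag transform. Below is a minimal sketch of one matching encode/decode pair in the style of `encode_varint_2` and `decode_varint_1`; the function names are illustrative, not taken from the file.

    def zigzag_encode_varint(value):
        # Zigzag transform: maps signed to unsigned so small negative
        # numbers also get short encodings (-1 -> 1, 1 -> 2, ...).
        value = (value << 1) ^ (value >> 63)
        buf = bytearray()
        while True:
            bits = value & 0x7f
            value >>= 7
            if value:
                buf.append(bits | 0x80)  # set the continuation bit
            else:
                buf.append(bits)
                return bytes(buf)

    def zigzag_decode_varint(buffer, pos=0):
        # Accumulate 7-bit groups until a byte without the continuation bit.
        result = 0
        shift = 0
        while True:
            b = buffer[pos]
            pos += 1
            result |= (b & 0x7f) << shift
            if not (b & 0x80):
                break
            shift += 7
        # Undo the zigzag transform.
        return (result >> 1) ^ -(result & 1), pos

The zigzag mapping keeps small-magnitude negatives short: -1 encodes to the single byte 0x01 rather than ten continuation bytes.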
2 changes: 0 additions & 2 deletions kafka/__init__.py
@@ -1,5 +1,3 @@
-from __future__ import absolute_import
-
 __title__ = 'kafka'
 from kafka.version import __version__
 __author__ = 'Dana Powers'
2 changes: 0 additions & 2 deletions kafka/admin/__init__.py
@@ -1,5 +1,3 @@
-from __future__ import absolute_import
-
 from kafka.admin.config_resource import ConfigResource, ConfigResourceType
 from kafka.admin.client import KafkaAdminClient
 from kafka.admin.acl_resource import (ACL, ACLFilter, ResourcePattern, ResourcePatternFilter, ACLOperation,
20 changes: 7 additions & 13 deletions kafka/admin/acl_resource.py
@@ -1,12 +1,6 @@
-from __future__ import absolute_import
+from kafka.errors import IllegalArgumentError
+from enum import IntEnum

-# enum in stdlib as of py3.4
-try:
-    from enum import IntEnum  # pylint: disable=import-error
-except ImportError:
-    # vendored backport module
-    from kafka.vendor.enum34 import IntEnum
-from kafka.errors import IllegalArgumentError


 class ResourceType(IntEnum):
@@ -69,7 +63,7 @@ class ACLResourcePatternType(IntEnum):
     PREFIXED = 4


-class ACLFilter(object):
+class ACLFilter:
     """Represents a filter to use with describing and deleting ACLs

     The difference between this class and the ACL class is mainly that
@@ -161,7 +155,7 @@ def __init__(
         permission_type,
         resource_pattern
     ):
-        super(ACL, self).__init__(principal, host, operation, permission_type, resource_pattern)
+        super().__init__(principal, host, operation, permission_type, resource_pattern)
         self.validate()

     def validate(self):
@@ -173,7 +167,7 @@ def validate(self):
             raise IllegalArgumentError("resource_pattern must be a ResourcePattern object")


-class ResourcePatternFilter(object):
+class ResourcePatternFilter:
     def __init__(
         self,
         resource_type,
@@ -232,13 +226,13 @@ def __init__(
         resource_name,
         pattern_type=ACLResourcePatternType.LITERAL
     ):
-        super(ResourcePattern, self).__init__(resource_type, resource_name, pattern_type)
+        super().__init__(resource_type, resource_name, pattern_type)
         self.validate()

     def validate(self):
         if self.resource_type == ResourceType.ANY:
             raise IllegalArgumentError("resource_type cannot be ANY")
         if self.pattern_type in [ACLResourcePatternType.ANY, ACLResourcePatternType.MATCH]:
             raise IllegalArgumentError(
-                "pattern_type cannot be {} on a concrete ResourcePattern".format(self.pattern_type.name)
+                f"pattern_type cannot be {self.pattern_type.name} on a concrete ResourcePattern"
             )
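For orientation, a short usage sketch of the classes this file defines, using the constructor signatures visible in the diff. The principal, host, and topic name are made-up values, and `validate()` runs inside each constructor:

    from kafka.admin import (
        ACL, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType,
    )

    # A concrete pattern: LITERAL is the default pattern_type,
    # so this names exactly the topic "my-topic".
    pattern = ResourcePattern(ResourceType.TOPIC, 'my-topic')

    # validate() raises IllegalArgumentError for non-concrete
    # inputs such as ResourceType.ANY or a MATCH pattern_type.
    acl = ACL(
        principal='User:alice',
        host='*',
        operation=ACLOperation.READ,
        permission_type=ACLPermissionType.ALLOW,
        resource_pattern=pattern,
    )

The `ACLFilter` and `ResourcePatternFilter` counterparts exist for describe/delete queries, where the ANY and MATCH wildcards rejected above are legal.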
11 changes: 4 additions & 7 deletions kafka/admin/client.py
@@ -1,12 +1,9 @@
-from __future__ import absolute_import
-
 from collections import defaultdict
 import copy
 import logging
 import socket

 from . import ConfigResourceType
-from kafka.vendor import six

 from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
     ACLResourcePatternType
@@ -32,7 +29,7 @@
 log = logging.getLogger(__name__)


-class KafkaAdminClient(object):
+class KafkaAdminClient:
     """A class for administering the Kafka cluster.

     Warning:
@@ -194,7 +191,7 @@ def __init__(self, **configs):
         log.debug("Starting KafkaAdminClient with configuration: %s", configs)
         extra_configs = set(configs).difference(self.DEFAULT_CONFIG)
         if extra_configs:
-            raise KafkaConfigurationError("Unrecognized configs: {}".format(extra_configs))
+            raise KafkaConfigurationError(f"Unrecognized configs: {extra_configs}")

         self.config = copy.copy(self.DEFAULT_CONFIG)
         self.config.update(configs)
@@ -874,7 +871,7 @@ def describe_configs(self, config_resources, include_synonyms=False):
                 ))
             else:
                 raise NotImplementedError(
-                    "Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient.".format(version))
+                    f"Support for DescribeConfigs v{version} has not yet been added to KafkaAdminClient.")

         self._wait_for_futures(futures)
         return [f.value for f in futures]
@@ -1197,7 +1194,7 @@ def _list_consumer_group_offsets_send_request(self, group_id,
                 topics_partitions_dict = defaultdict(set)
                 for topic, partition in partitions:
                     topics_partitions_dict[topic].add(partition)
-                topics_partitions = list(six.iteritems(topics_partitions_dict))
+                topics_partitions = list(topics_partitions_dict.items())
             request = OffsetFetchRequest[version](group_id, topics_partitions)
         else:
             raise NotImplementedError(
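The `six.iteritems` change in the last hunk is mechanical, but the surrounding pattern is worth a sketch: `(topic, partition)` tuples are folded into per-topic sets before the OffsetFetchRequest is built. A standalone illustration with made-up data (only the `defaultdict(set)` grouping comes from the diff):

    from collections import defaultdict

    partitions = [('events', 0), ('events', 1), ('billing', 0)]

    topics_partitions_dict = defaultdict(set)
    for topic, partition in partitions:
        topics_partitions_dict[topic].add(partition)

    # On Python 3, dict.items() replaces six.iteritems(); the request
    # body wants a list of (topic, {partitions}) pairs.
    topics_partitions = list(topics_partitions_dict.items())
    # -> [('events', {0, 1}), ('billing', {0})]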
11 changes: 2 additions & 9 deletions kafka/admin/config_resource.py
@@ -1,11 +1,4 @@
-from __future__ import absolute_import
-
-# enum in stdlib as of py3.4
-try:
-    from enum import IntEnum  # pylint: disable=import-error
-except ImportError:
-    # vendored backport module
-    from kafka.vendor.enum34 import IntEnum
+from enum import IntEnum


 class ConfigResourceType(IntEnum):
@@ -15,7 +8,7 @@ class ConfigResourceType(IntEnum):
     TOPIC = 2


-class ConfigResource(object):
+class ConfigResource:
     """A class for specifying config resources.

     Arguments:
         resource_type (ConfigResourceType): the type of kafka resource
5 changes: 1 addition & 4 deletions kafka/admin/new_partitions.py
@@ -1,7 +1,4 @@
-from __future__ import absolute_import
-
-
-class NewPartitions(object):
+class NewPartitions:
     """A class for new partition creation on existing topics. Note that the length of new_assignments, if specified,
     must be the difference between the new total number of partitions and the existing number of partitions.

     Arguments:
4 changes: 1 addition & 3 deletions kafka/admin/new_topic.py
@@ -1,9 +1,7 @@
-from __future__ import absolute_import
-
 from kafka.errors import IllegalArgumentError


-class NewTopic(object):
+class NewTopic:
     """ A class for new topic creation

     Arguments:
         name (string): name of the topic
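A brief usage sketch for `NewPartitions` and `NewTopic` together with `KafkaAdminClient`; the broker address is hypothetical, and argument names follow kafka-python's public API:

    from kafka.admin import KafkaAdminClient, NewPartitions, NewTopic

    admin = KafkaAdminClient(bootstrap_servers='localhost:9092')

    # Create a topic, then grow it from 3 to 6 partitions. With no
    # new_assignments, the broker places the new replicas itself.
    admin.create_topics([NewTopic('my-topic', num_partitions=3, replication_factor=1)])
    admin.create_partitions({'my-topic': NewPartitions(total_count=6)})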
35 changes: 9 additions & 26 deletions kafka/client_async.py
@@ -1,23 +1,13 @@
-from __future__ import absolute_import, division
-
 import collections
 import copy
 import logging
 import random
+import selectors
 import socket
 import threading
 import time
 import weakref

-# selectors in stdlib as of py3.4
-try:
-    import selectors  # pylint: disable=import-error
-except ImportError:
-    # vendored backport module
-    from kafka.vendor import selectors34 as selectors
-
-from kafka.vendor import six
-
 from kafka.cluster import ClusterMetadata
 from kafka.conn import BrokerConnection, ConnectionStates, collect_hosts, get_ip_port_afi
 from kafka import errors as Errors
@@ -27,19 +17,12 @@
 from kafka.metrics.stats.rate import TimeUnit
 from kafka.protocol.metadata import MetadataRequest
 from kafka.util import Dict, WeakMethod
 # Although this looks unused, it actually monkey-patches socket.socketpair()
 # and should be left in as long as we're using socket.socketpair() in this file
 from kafka.vendor import socketpair
 from kafka.version import __version__

-if six.PY2:
-    ConnectionError = None
-

 log = logging.getLogger('kafka.client')


-class KafkaClient(object):
+class KafkaClient:
     """
     A network client for asynchronous request/response network I/O.
@@ -374,7 +357,7 @@ def _maybe_connect(self, node_id):

         if conn is None:
             broker = self.cluster.broker_metadata(node_id)
-            assert broker, 'Broker id %s not in current metadata' % (node_id,)
+            assert broker, 'Broker id {} not in current metadata'.format(node_id)

             log.debug("Initiating connection to node %s at %s:%s",
                       node_id, broker.host, broker.port)
@@ -686,7 +669,7 @@ def _poll(self, timeout):
                         unexpected_data = key.fileobj.recv(1)
                         if unexpected_data:  # anything other than a 0-byte read means protocol issues
                             log.warning('Protocol out of sync on %r, closing', conn)
-                    except socket.error:
+                    except OSError:
                         pass
                     conn.close(Errors.KafkaConnectionError('Socket EVENT_READ without in-flight-requests'))
                     continue
@@ -701,7 +684,7 @@ def _poll(self, timeout):
             if conn not in processed and conn.connected() and conn._sock.pending():
                 self._pending_completion.extend(conn.recv())

-        for conn in six.itervalues(self._conns):
+        for conn in self._conns.values():
             if conn.requests_timed_out():
                 log.warning('%s timed out after %s ms. Closing connection.',
                             conn, conn.config['request_timeout_ms'])
@@ -941,7 +924,7 @@ def wakeup(self):
         except socket.timeout:
             log.warning('Timeout to send to wakeup socket!')
             raise Errors.KafkaTimeoutError()
-        except socket.error as e:
+        except OSError as e:
             log.warning('Unable to send to wakeup socket!')
             if self._raise_upon_socket_err_during_wakeup:
                 raise e
@@ -951,7 +934,7 @@ def _clear_wake_fd(self):
         while True:
             try:
                 self._wake_r.recv(1024)
-            except socket.error:
+            except OSError:
                 break

     def _maybe_close_oldest_connection(self):
@@ -981,7 +964,7 @@ def bootstrap_connected(self):
     OrderedDict = dict


-class IdleConnectionManager(object):
+class IdleConnectionManager:
     def __init__(self, connections_max_idle_ms):
         if connections_max_idle_ms > 0:
             self.connections_max_idle = connections_max_idle_ms / 1000
@@ -1043,7 +1026,7 @@ def poll_expired_connection(self):
         return None


-class KafkaClientMetrics(object):
+class KafkaClientMetrics:
     def __init__(self, metrics, metric_group_prefix, conns):
         self.metrics = metrics
         self.metric_group_name = metric_group_prefix + '-metrics'
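The `wakeup()` and `_clear_wake_fd()` hunks above are two halves of the self-pipe trick: one end of a `socket.socketpair()` is registered with the selector, so another thread can interrupt a blocking `select()` by sending a byte, which the poll loop then drains. A minimal self-contained sketch of the idea, not kafka-python's actual implementation (the vendored `kafka.vendor.socketpair` module kept above only backfills `socket.socketpair()` on platforms whose stdlib lacks it):

    import selectors
    import socket
    import threading

    sel = selectors.DefaultSelector()
    wake_r, wake_w = socket.socketpair()
    wake_r.setblocking(False)
    sel.register(wake_r, selectors.EVENT_READ)

    def wakeup():
        # Called from another thread to interrupt a blocking select().
        wake_w.send(b'x')

    def poll_once(timeout=30.0):
        for key, _ in sel.select(timeout):
            if key.fileobj is wake_r:
                # Drain the wake socket so the next poll blocks again.
                while True:
                    try:
                        wake_r.recv(1024)
                    except BlockingIOError:
                        break

    threading.Timer(0.1, wakeup).start()
    poll_once()  # returns after ~0.1s instead of the full 30s

This is also why the diff's `except OSError` changes are safe: on Python 3, `socket.error` is an alias of `OSError`, so catching `OSError` directly is the idiomatic spelling.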

