
Get size of topic #2163

Open
simonstumpf opened this issue Nov 12, 2020 · 12 comments
@simonstumpf commented Nov 12, 2020

Hi all,

Is there a way to get the size of a partition with this library?
I know it's possible via the command line with the kafka-log-dirs command.
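For reference, a command-line sketch of that approach (kafka-log-dirs ships with the Kafka distribution; localhost:9092 and my-topic are placeholders):

kafka-log-dirs --describe --bootstrap-server localhost:9092 --topic-list my-topic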

@jeffwidman (Contributor)

By size, I assume you mean bytes since you referred to a partition.

No, that's not currently available, and I don't know if it's even available via the Kafka Admin API... if there's now a KIP that added it, then we'd happily accept a PR adding support.

@cjw0202 commented Sep 24, 2021

Waiting for this feature.

@Courouge commented Nov 8, 2021

Hi,
I know it is available in the Java client here.
Reading the docs in the Apache Kafka repo, this is possible via the Kafka protocol here:

DescribeLogDirs API (Key: 35)

Requests:

DescribeLogDirs Request (Version: 0) => [topics] 
  topics => topic [partitions] 
    topic => STRING
    partitions => INT32

Responses:

DescribeLogDirs Response (Version: 0) => throttle_time_ms [results]
  throttle_time_ms => INT32
  results => error_code log_dir [topics] 
    error_code => INT16
    log_dir => STRING
    topics => name [partitions] 
      name => STRING
      partitions => partition_index partition_size offset_lag is_future_key 
        partition_index => INT32
        partition_size => INT64
        offset_lag => INT64
        is_future_key => BOOLEAN

I already created an issue in confluent-kafka-python here.

I tried some tests with the kafka-python lib by adding the following code:

kafka/protocol/admin.py

class DescribeLogDirsResponse_v0(Response):
    API_KEY = 35
    API_VERSION = 0
    FLEXIBLE_VERSION = True
    SCHEMA = Schema(
        ('throttle_time_ms', Int32),
        ('log_dirs', Array(
            ('error_code', Int16),
            ('log_dir', String('utf-8')),
            ('topics', Array(
                ('name', String('utf-8')),
                ('partitions', Array(
                    ('partition_index', Int32),
                    ('partition_size', Int64),
                    ('offset_lag', Int64),
                    ('is_future_key', Boolean)
                ))
            ))
        ))
    )

class DescribeLogDirsRequest_v0(Request):
    API_KEY = 35
    API_VERSION = 0
    RESPONSE_TYPE = DescribeLogDirsResponse_v0
    SCHEMA = Schema(
        ('topics', Array(
            ('topic', String('utf-8')),
            ('partitions', Int32)
        ))
    )

DescribeLogDirsResponse = [
    DescribeLogDirsResponse_v0,
]

DescribeLogDirsRequest = [
    DescribeLogDirsRequest_v0,
]

kafka/admin/client.py

    def describe_log_dirs(self, topic_name, partition_id):
        version = self._matching_api_version(DescribeLogDirsRequest)
        if version <= 1:
            request = DescribeLogDirsRequest[version]([(topic_name, partition_id)])
            future = self._send_request_to_node(self._client.least_loaded_node(), request)
            self._wait_for_futures([future])
        else:
            raise NotImplementedError(
                "Support for DescribeLogDirsRequest_v{} has not yet been added to KafkaAdminClient."
                    .format(version))
        return future.value

I tried it with a simple client:

from kafka import KafkaAdminClient
a = KafkaAdminClient(bootstrap_servers='0.0.0.0:9092')
topic_test = a.describe_log_dirs("test",0)
print(topic_test)

==> DescribeLogDirsResponse_v0(throttle_time_ms=0, log_dirs=[(error_code=0, log_dir='/var/lib/kafka/data', topics=[])])

I don't understand why the test topic is empty.

In the Kafka log dir:

du -h  /var/lib/kafka/data
12K     /var/lib/kafka/data/test-0

@Courouge commented Nov 21, 2021

It finally works! I did a PR #2278 @jeffwidman

Simple example:

from kafka import KafkaAdminClient
a = KafkaAdminClient(bootstrap_servers='0.0.0.0:9092')
DescribeLogDirsResponse = a.describe_log_dirs()

DescribeLogDirsResponse

DescribeLogDirsResponse_v0(throttle_time_ms=0, log_dirs=[(error_code=0, log_dir='/var/lib/kafka/data', topics=[(name='test2', partitions=[(partition_index=0, partition_size=0, offset_lag=0, is_future_key=False)]), (name='test3', partitions=[(partition_index=0, partition_size=0, offset_lag=0, is_future_key=False)]), (name='test', partitions=[(partition_index=0, partition_size=2050711233, offset_lag=0, is_future_key=False)]), (name='test1', partitions=[(partition_index=0, partition_size=0, offset_lag=0, is_future_key=False)]), (name='__confluent.support.metrics', partitions=[(partition_index=0, partition_size=3789, offset_lag=0, is_future_key=False)])])])
du -h  /var/lib/kafka/data
2.0G    /var/lib/kafka/data/test-0
24K     /var/lib/kafka/data/__confluent.support.metrics-0
8.0K    /var/lib/kafka/data/test2-0
8.0K    /var/lib/kafka/data/test3-0
8.0K    /var/lib/kafka/data/test1-0
2.0G    /var/lib/kafka/data

Note that "empty" partitions occupy 8.0K on disk, but in the protocol response the partition_size value is 0 bytes.
For other topics that are not empty it works fine.
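For reference, a minimal sketch (assuming the patched client above) that walks this response and prints each partition's size; the tuple layouts follow the DescribeLogDirs v0 schema quoted earlier:

response = a.describe_log_dirs()

# log_dirs entries decode as (error_code, log_dir, topics) tuples,
# topics as (name, partitions), and partitions as
# (partition_index, partition_size, offset_lag, is_future_key)
for error_code, log_dir, topics in response.log_dirs:
    for name, partitions in topics:
        for partition_index, partition_size, offset_lag, is_future_key in partitions:
            print(name, partition_index, partition_size)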

@tommy04062019

> It finally works! I did a PR #2278 @jeffwidman […]
Hi,
I tried to apply your patch and it works, but somehow it doesn't show the right number of topics. E.g. the cluster has 17 topics, but when calling describe_log_dirs it only shows info for 14 topics.

@Courouge

Hi @tommy04062019

Thanks for your feedback. Can you give me more info about your configuration and how I can reproduce your issue?

@tommy04062019

Hi @Courouge, below are the steps I did:

  1. Install kafka-python
  2. Download and copy your patch files:

RUN cp -f patch/client.py /usr/local/lib/python3.7/site-packages/kafka/admin/client.py
RUN cp -f patch/admin.py /usr/local/lib/python3.7/site-packages/kafka/protocol/admin.py

  3. Call the function:

self.admin = KafkaAdminClient(bootstrap_servers=brokers,
                              security_protocol="SASL_PLAINTEXT",
                              sasl_mechanism="SCRAM-SHA-512",
                              sasl_plain_username=f"{username}",
                              sasl_plain_password=f"{password}",
                              request_timeout_ms=20000
                              )
self.admin.describe_log_dirs()

The output looks the same as what you showed above, but the total number of topics in the output doesn't match the number of topics the cluster has.

@Courouge commented Mar 9, 2022

Hi @tommy04062019,
I tried a SCRAM user with a similar config without issues.

admin = KafkaAdminClient(
  bootstrap_servers='broker1:9093',
  security_protocol="SASL_SSL",
  sasl_plain_username="scram-user",
  sasl_plain_password="scram-password",
  sasl_mechanism="SCRAM-SHA-512")

DescribeLogDirsResponse = admin.describe_log_dirs()
  • Ensure your cluster has a listener that allows SCRAM user connections,
    by adding the following property on the broker side: sasl.enabled.mechanisms=SCRAM-SHA-512

  • Create the SCRAM user with the kafka-configs command like below:

kafka-configs --zookeeper zook1:2181 --alter --add-config 'SCRAM-SHA-512=[iterations=8192,password=scram-password]' --entity-type users --entity-name scram-user

  • Add the SCRAM user to your broker & ZooKeeper JAAS config.

@vgvineet4

Is there a released fork with the changes done by @Courouge that I can include in my requirements.txt?

@lariskovski

I've tested it and it works perfectly! Thank you, @Courouge!

@hilia commented Nov 9, 2022

Thanks a lot @Courouge!

I made a small change, because if you have multiple Kafka nodes you may not see all the partitions: they are not all hosted on every broker.
For example, for a given topic you could have:

  • server1 => partition0 + partition1
  • server2 => partition1 + partition2
  • server3 => partition2 + partition3
  • ...

So I just added the possibility to specify the broker id instead of using self._client.least_loaded_node():

def describe_log_dirs(self, broker_id=None):
    """Send a DescribeLogDirsRequest to a broker.

    :param broker_id: the id of the node to query; falls back to the
        least-loaded node when not specified.
    :return: the value of the response future
    """
    version = self._matching_api_version(DescribeLogDirsRequest)
    if version <= 1:
        if broker_id is None:
            broker_id = self._client.least_loaded_node()
        request = DescribeLogDirsRequest[version]()
        future = self._send_request_to_node(broker_id, request)
        self._wait_for_futures([future])
    else:
        raise NotImplementedError(
            "Support for DescribeLogDirsRequest_v{} has not yet been added to KafkaAdminClient."
                .format(version))
    return future.value

I asked myself whether it would be better to loop over all brokers by default when no specific broker is set.

Here is the full code to use this function, in order to get a list with topic_name, partition_id, size:

    admin_client = KafkaAdminClient(
        bootstrap_servers   = config_file_value['bootstrap_servers'],
        security_protocol   = config_file_value['security_protocol'],
        sasl_plain_username = secret_file_value['sasl_plain_username'],
        sasl_plain_password = secret_file_value['sasl_plain_password'],
        sasl_mechanism      = config_file_value['sasl_mechanism']
        )

    kafka_client = KafkaClient(
        bootstrap_servers   = config_file_value['bootstrap_servers'],
        security_protocol   = config_file_value['security_protocol'],
        sasl_plain_username = secret_file_value['sasl_plain_username'],
        sasl_plain_password = secret_file_value['sasl_plain_password'],
        sasl_mechanism      = config_file_value['sasl_mechanism']
        )

    # topic details such as retention.ms, segment.ms, etc.
    #broker = admin_client.describe_configs(config_resources=[ConfigResource(ConfigResourceType.TOPIC, "my_topic_name_here")])
    #config_list = broker[0].resources[0][1]

    partition_list = []
    for broker in kafka_client.cluster.brokers():

        # get all data: topic name + partition + partition size
        describe_log_dirs_response = admin_client.describe_log_dirs(broker.nodeId)

        # number of topics per BROKER
        #print(len(describe_log_dirs_response.log_dirs[0][2]))

        # log_dirs entries decode as (error_code, log_dir, topics);
        # take the topics of the first log dir
        log_dir_topics = describe_log_dirs_response.log_dirs[0][2]

        # each topic entry is (name, partitions)
        for topic_log_name, topic_partitions in log_dir_topics:

            # each partition entry is
            # (partition_index, partition_size, offset_lag, is_future_key)
            for partition_entry in topic_partitions:

                partition_log_id   = partition_entry[0]
                partition_log_size = partition_entry[1]

                full_topic_partition = topic_log_name + '-' + str(partition_log_id)

                # skip if the record already exists
                # (each topic-partition is replicated on multiple brokers)
                if any(full_topic_partition == entry[0] for entry in partition_list):
                    continue

                # add to a list as [topic-partition, size]
                partition_list.append([full_topic_partition, partition_log_size])

Now you'll have a list with this pattern:

[
['topic_name-partition_id', size],
[...],
]

And in the real world:

[
['my_fun_topic_name-0', 25040],
['my_fun_topic_name-1', 24590],
['my_fun_topic_name-2', 26799],
['my_fun_topic_name-3', 22245],
[...],
]

Now you could use the list:

        for each_partition in partition_list:

            # split topic and partition from list
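            # rsplit('-', 1) keeps topic names that contain dashes intact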
            from_kafka_split_topic_partition   = each_partition[0].rsplit('-', 1)
            from_kafka_topic_name              = from_kafka_split_topic_partition[0]
            from_kafka_partition_id            = from_kafka_split_topic_partition[1]

Hope it helps :)

@thom-vend commented Jan 4, 2024

@hilia yes, you're right, the proposed PR doesn't give you all partitions in a multi-broker cluster.
I found that kafka_client.cluster.brokers() isn't returning all brokers, just the one I'm connecting to.
admin_client.describe_cluster() will give me the full list of brokers.

I'll update this comment if I have a better solution to offer.

Edit: I ended up using it like this for now (in a monitoring script, very close to @hilia's suggestion 👍):
https://gist.github.com/thom-vend/1563b53bf9f9af2ebe01a55d7551cbde
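For reference, a minimal sketch of that approach. It assumes the patched describe_log_dirs(broker_id) from above, and that describe_cluster() returns the brokers as dicts with a node_id key:

from kafka import KafkaAdminClient

admin_client = KafkaAdminClient(bootstrap_servers='broker1:9092')

# enumerate every broker via the cluster metadata,
# not just the bootstrap connection
cluster = admin_client.describe_cluster()

sizes = {}
for broker in cluster['brokers']:
    response = admin_client.describe_log_dirs(broker['node_id'])
    for error_code, log_dir, topics in response.log_dirs:
        for topic_name, partitions in topics:
            for partition_index, partition_size, offset_lag, is_future_key in partitions:
                # a partition is replicated on several brokers;
                # keep the largest replica size seen
                key = (topic_name, partition_index)
                sizes[key] = max(sizes.get(key, 0), partition_size)

for (topic, partition), size in sorted(sizes.items()):
    print('{}-{}: {} bytes'.format(topic, partition, size))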
