Memory leak using Kafka-python with PyPy #2071

Open
joein opened this issue Jun 29, 2020 · 1 comment

joein commented Jun 29, 2020

I am observing what looks like a memory leak when using the kafka-python (2.0.1) package with PyPy (Python 3.6.9 (1608da62bfc7, Dec 23 2019, 10:50:04) [PyPy 7.3.0 with GCC 7.3.1 20180303 (Red Hat 7.3.1-5)] on linux).
I ran the same script on CPython 3.6 (Python 3.6.4 (default, Mar 19 2019, 21:01:45) [GCC 4.9.2] on linux) and there was no memory problem: usage levelled off at 20.1 MiB and stayed there.
I am attaching a script that consumes data from an empty topic; its memory consumption keeps growing for a long time (I have been testing it for at least 3 hours, and all that time it was eating more and more memory).
To be precise, it starts at roughly 100 MiB and within 1-2 hours grows to ~300-400 MiB.
I tried to profile specific internal methods of KafkaConsumer, such as KafkaConsumer.poll; I saw the memory growth in KafkaConsumer._poll_once, and then in KafkaConsumer._client.poll and KafkaConsumer._fetcher.fetched_records, but I couldn't find anything useful beyond that.
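
For reference, a sketch of how those internals can be wrapped with memory_profiler (the private class and attribute names are from kafka-python 2.0.1 and may differ in other versions):

from kafka import KafkaConsumer
from kafka.client_async import KafkaClient
from kafka.consumer.fetcher import Fetcher
from memory_profiler import profile

# Monkey-patch the internal methods with memory_profiler's line-by-line
# profiler so their allocations show up alongside KafkaConsumer.poll.
KafkaConsumer._poll_once = profile(KafkaConsumer._poll_once)
KafkaClient.poll = profile(KafkaClient.poll)
Fetcher.fetched_records = profile(Fetcher.fetched_records)
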
I also tried to test it without the memory_profiler module, just observing htop, and the result remained the same.
I attached three screenshots: the first at script startup, the second after one minute of work, and the third after 12 minutes.
As you can see, some poll calls increase memory by 0.1-0.6 MiB (0.3 MiB is the most frequent value; it can be several MiB at startup, but I think that is ok).
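
The per-poll numbers can also be logged from inside the loop instead of watching htop, e.g. with psutil (which memory_profiler already depends on); a minimal sketch, meant to be called after each polling() call in the script below:

import os

import psutil

_process = psutil.Process(os.getpid())


def log_rss():
    # Resident set size of the consumer process, in MiB.
    print(f"RSS: {_process.memory_info().rss / 1024 ** 2:.1f} MiB")
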

import time

from kafka import KafkaConsumer
from kafka.structs import TopicPartition
from memory_profiler import profile


_bootstrap_servers = ["172.172.172.2:9092"]
_group_id = "memoryLeakGroup"
_auto_offset_reset = "earliest"
_enable_auto_commit = True
_timeout_ms_max = 5000
_batch_max_size = 10000
_assignment = [TopicPartition(topic="29_06_2020", partition=0)]
_origin_consumer = KafkaConsumer(
    bootstrap_servers=_bootstrap_servers,
    group_id=_group_id,
    auto_offset_reset=_auto_offset_reset,
    enable_auto_commit=_enable_auto_commit,
)
_origin_consumer.assign(_assignment)


@profile
def polling():
    data = _origin_consumer.poll(
        timeout_ms=_timeout_ms_max, max_records=_batch_max_size
    )

    if not data:
        print(f"There is no more data {_assignment}")
    else:
        print(f"There is some data {data}")


if __name__ == "__main__":
    # Poll the (empty) topic forever; memory usage keeps growing with the iterations.
    while True:
        try:
            polling()
            time.sleep(0.5)
        except Exception:
            print("wow exception")

[Screenshots of the memory_profiler output at 2020-06-29 14:58:26, 14:58:53, and 15:10:51]

I run the scripts in Docker; the PyPy and Kafka images can be built from the attached files (remove the .txt extensions from boot.sh and the Dockerfiles).
boot.sh.txt
kafka_Dockerfile.txt
pypy_requirements.txt
pypy_Dockerfile.txt

I've also created an issue on the PyPy repo.


frutik commented Oct 13, 2021

I see similar behavior. The container dies at the peak of its memory consumption.

[Screenshot taken 2021-10-13 at 17:53:41]
