diff --git a/openedx/core/lib/extract_archive.py b/openedx/core/lib/extract_archive.py
index d3de59c2527f..245a1de2db6c 100644
--- a/openedx/core/lib/extract_archive.py
+++ b/openedx/core/lib/extract_archive.py
@@ -18,5 +18,84 @@
 log = logging.getLogger(__name__)
 
 
+# Default decompression budget for course archives. These thresholds block
+# zip / tar bombs during course import without rejecting any realistic edX
+# course export. Operators can override each threshold via Django settings:
+#
+#   COURSE_IMPORT_MAX_EXTRACTED_SIZE    (default 2 GB)
+#   COURSE_IMPORT_MAX_EXTRACTED_FILES   (default 50 000)
+#   COURSE_IMPORT_MAX_COMPRESSION_RATIO (default 200x)
+#
+# Rationale:
+#   * 2 GB uncompressed covers large video-heavy OLX exports with margin.
+#   * 50 000 files covers heavily fragmented OLX (real exports observed
+#     in the 1000-5000 file range).
+#   * 200x compression ratio is well above any legitimate compression
+#     (XML text ~10x, media ~1x) and rejects canonical 42.zip-style bombs
+#     whose ratio is upwards of 10^6.
+_DEFAULT_MAX_EXTRACTED_SIZE = 2 * 1024 * 1024 * 1024
+_DEFAULT_MAX_EXTRACTED_FILES = 50_000
+_DEFAULT_MAX_COMPRESSION_RATIO = 200
+
+
+def _get_archive_limits():
+    """
+    Resolve the three archive-size thresholds from Django settings so
+    tests and operators can override them without patching this module.
+    """
+    return (
+        getattr(settings, 'COURSE_IMPORT_MAX_EXTRACTED_SIZE', _DEFAULT_MAX_EXTRACTED_SIZE),
+        getattr(settings, 'COURSE_IMPORT_MAX_EXTRACTED_FILES', _DEFAULT_MAX_EXTRACTED_FILES),
+        getattr(settings, 'COURSE_IMPORT_MAX_COMPRESSION_RATIO', _DEFAULT_MAX_COMPRESSION_RATIO),
+    )
+
+
+def _check_archive_bomb(members, compressed_size):
+    """
+    Reject archives whose declared uncompressed size, file count, or
+    compression ratio exceed the configured budget.
+
+    Raises ``SuspiciousOperation`` on any violation. The check runs
+    *before* ``archive.extractall`` writes anything to disk, so a
+    pathological archive never materializes a byte on the target
+    filesystem.
+    """
+    max_size, max_files, max_ratio = _get_archive_limits()
+
+    if len(members) > max_files:
+        log.warning(
+            "Archive blocked: %d members exceeds limit of %d",
+            len(members), max_files,
+        )
+        raise SuspiciousOperation("Archive contains too many files")
+
+    total_uncompressed = 0
+    for finfo in members:
+        if isinstance(finfo, ZipInfo):
+            member_size = finfo.file_size
+        elif isinstance(finfo, TarInfo):
+            member_size = finfo.size if finfo.isfile() else 0
+        else:  # defensive: safe_extractall only yields the two types above
+            member_size = 0
+        total_uncompressed += member_size
+        if total_uncompressed > max_size:
+            log.warning(
+                "Archive blocked: uncompressed size %d exceeds limit of %d",
+                total_uncompressed, max_size,
+            )
+            raise SuspiciousOperation("Archive too large")
+
+    # Compression-ratio guard. Compare multiplicatively rather than with
+    # floor division so a fractional overshoot (e.g. ratio 200.9 against
+    # a 200x cap) is still rejected; max(compressed_size, 1) keeps empty
+    # archives (harmless) from tripping the guard.
+    if total_uncompressed > max_ratio * max(compressed_size, 1):
+        log.warning(
+            "Archive blocked: size %d exceeds %dx of compressed size %d",
+            total_uncompressed, max_ratio, compressed_size,
+        )
+        raise SuspiciousOperation("Archive compression ratio too high")
+
+
 def resolved(rpath):
     """
@@ -89,10 +168,13 @@ def safe_extractall(file_name, output_path):
     """
     Extract Zip or Tar files
     """
+    import os  # stdlib; imported locally so this change need not touch the module import block
+
     archive = None
     if not output_path.endswith("/"):
         output_path += "/"
     try:
+        compressed_size = os.path.getsize(file_name)
         if file_name.endswith(".zip"):
             archive = ZipFile(file_name, "r")
             members = archive.infolist()
@@ -102,6 +184,7 @@
         else:
             raise ValueError("Unsupported archive format")
         _checkmembers(members, output_path)
+        _check_archive_bomb(members, compressed_size)
         archive.extractall(output_path)
     finally:
         if archive:
diff --git a/openedx/core/lib/tests/test_extract_archive.py b/openedx/core/lib/tests/test_extract_archive.py
new file mode 100644
index 000000000000..da242512b2af
--- /dev/null
+++ b/openedx/core/lib/tests/test_extract_archive.py
@@ -0,0 +1,137 @@
+"""
+Regression tests for ``openedx.core.lib.extract_archive.safe_extractall``
+zip/tar bomb guards.
+"""
+import os
+import shutil
+import tempfile
+import zipfile
+
+import pytest
+from django.conf import settings
+from django.core.exceptions import SuspiciousOperation
+from django.test import SimpleTestCase
+from django.test.utils import override_settings
+
+from openedx.core.lib.extract_archive import _check_archive_bomb, safe_extractall
+
+
+class CheckArchiveBombUnitTests(SimpleTestCase):
+    """
+    Unit tests for the ``_check_archive_bomb`` helper. These work on
+    in-memory ``ZipInfo``/``TarInfo`` lists and do not touch the
+    filesystem so they remain fast and hermetic.
+    """
+
+    def _make_zip_members(self, sizes):
+        """
+        Build a list of ``ZipInfo`` objects with the given uncompressed
+        sizes. ``compress_size`` is irrelevant to ``_check_archive_bomb``
+        (it only inspects ``file_size`` and the on-disk archive size).
+        """
+        members = []
+        for idx, size in enumerate(sizes):
+            info = zipfile.ZipInfo(filename=f'member_{idx}.xml')
+            info.file_size = size
+            members.append(info)
+        return members
+
+    @override_settings(
+        COURSE_IMPORT_MAX_EXTRACTED_SIZE=100,
+        COURSE_IMPORT_MAX_EXTRACTED_FILES=1000,
+        COURSE_IMPORT_MAX_COMPRESSION_RATIO=10,
+    )
+    def test_rejects_oversized_archive(self):
+        members = self._make_zip_members([50, 60])  # sums to 110 > 100
+        with pytest.raises(SuspiciousOperation, match='Archive too large'):
+            _check_archive_bomb(members, compressed_size=100)
+
+    @override_settings(
+        COURSE_IMPORT_MAX_EXTRACTED_SIZE=10_000_000,
+        COURSE_IMPORT_MAX_EXTRACTED_FILES=3,
+        COURSE_IMPORT_MAX_COMPRESSION_RATIO=1000,
+    )
+    def test_rejects_too_many_members(self):
+        members = self._make_zip_members([10, 10, 10, 10])  # 4 > 3
+        with pytest.raises(SuspiciousOperation, match='too many files'):
+            _check_archive_bomb(members, compressed_size=10_000)
+
+    @override_settings(
+        COURSE_IMPORT_MAX_EXTRACTED_SIZE=10_000_000,
+        COURSE_IMPORT_MAX_EXTRACTED_FILES=1000,
+        COURSE_IMPORT_MAX_COMPRESSION_RATIO=10,
+    )
+    def test_rejects_high_compression_ratio(self):
+        # 10_000 uncompressed / 50 compressed = ratio 200, way above 10
+        members = self._make_zip_members([10_000])
+        with pytest.raises(SuspiciousOperation, match='compression ratio too high'):
+            _check_archive_bomb(members, compressed_size=50)
+
+    @override_settings(
+        COURSE_IMPORT_MAX_EXTRACTED_SIZE=10_000_000,
+        COURSE_IMPORT_MAX_EXTRACTED_FILES=1000,
+        COURSE_IMPORT_MAX_COMPRESSION_RATIO=10,
+    )
+    def test_accepts_normal_archive(self):
+        # 5000 uncompressed / 1000 compressed = ratio 5, well under 10
+        members = self._make_zip_members([2_000, 3_000])
+        _check_archive_bomb(members, compressed_size=1_000)
+
+    def test_empty_archive_ok(self):
+        # An empty member list must not raise despite compressed_size=0
+        _check_archive_bomb([], compressed_size=0)
+
+
+class SafeExtractallIntegrationTests(SimpleTestCase):
+    """
+    End-to-end tests that build a real small zip on disk and confirm
+    ``safe_extractall`` honors the bomb-guard budget.
+    """
+
+    def setUp(self):
+        super().setUp()
+        # ``_checkmembers`` requires the extraction target to live under
+        # ``settings.GITHUB_REPO_ROOT``. Create a scratch subdirectory
+        # there (creating the root itself if a sparse test settings file
+        # has not materialized it) and clean it up on teardown.
+        os.makedirs(settings.GITHUB_REPO_ROOT, exist_ok=True)
+        self._scratch = tempfile.mkdtemp(
+            prefix='test_extract_archive_', dir=settings.GITHUB_REPO_ROOT,
+        )
+        self.addCleanup(shutil.rmtree, self._scratch, ignore_errors=True)
+
+    def _make_zip(self, tmpdir, payloads):
+        zip_path = os.path.join(tmpdir, 'archive.zip')
+        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_STORED) as zf:
+            for name, data in payloads.items():
+                zf.writestr(name, data)
+        return zip_path
+
+    def test_rejects_bomb_before_writing_anything(self):
+        outdir = tempfile.mkdtemp(prefix='out_', dir=self._scratch)
+        srcdir = tempfile.mkdtemp(prefix='src_', dir=self._scratch)
+        # Make a file that compresses extremely well: 1 MB of the
+        # same byte. ``ZIP_DEFLATED`` gives a very high ratio.
+        zip_path = os.path.join(srcdir, 'bomb.zip')
+        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf:
+            zf.writestr('inflated.txt', b'A' * 1_000_000)
+
+        # Lower the size cap to 10 KB so the 1 MB payload unambiguously
+        # exceeds it, independently of any compression-ratio check.
+        with override_settings(
+            COURSE_IMPORT_MAX_EXTRACTED_SIZE=10_000,
+            COURSE_IMPORT_MAX_EXTRACTED_FILES=1000,
+            COURSE_IMPORT_MAX_COMPRESSION_RATIO=1_000_000,
+        ), pytest.raises(SuspiciousOperation):
+            safe_extractall(zip_path, outdir)
+        # No files were written to the output directory
+        assert os.listdir(outdir) == []
+
+    def test_happy_path_normal_archive(self):
+        outdir = tempfile.mkdtemp(prefix='out_', dir=self._scratch)
+        srcdir = tempfile.mkdtemp(prefix='src_', dir=self._scratch)
+        zip_path = self._make_zip(
+            srcdir, {'a.xml': b'', 'b.xml': b''},
+        )
+        safe_extractall(zip_path, outdir)
+        assert sorted(os.listdir(outdir)) == ['a.xml', 'b.xml']