Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions openedx/core/lib/extract_archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,80 @@

log = logging.getLogger(__name__)

# Default decompression budget for course archives. These thresholds block
# zip / tar bombs during course import without rejecting any realistic edX
# course export. Operators can override each threshold via Django settings:
#
# COURSE_IMPORT_MAX_EXTRACTED_SIZE (default 2 GB)
# COURSE_IMPORT_MAX_EXTRACTED_FILES (default 50 000)
# COURSE_IMPORT_MAX_COMPRESSION_RATIO (default 200x)
#
# Rationale:
# * 2 GB uncompressed covers large video-heavy OLX exports with margin.
# * 50 000 files covers heavily fragmented OLX (real exports observed
# in the 1000-5000 file range).
# * 200x compression ratio is well above any legitimate compression
# (XML text ~10x, media ~1x) and rejects canonical 42.zip-style bombs
# whose ratio is upwards of 10^6.
_DEFAULT_MAX_EXTRACTED_SIZE = 2 * 1024 * 1024 * 1024  # bytes (2 GiB uncompressed)
_DEFAULT_MAX_EXTRACTED_FILES = 50_000  # archive member count
_DEFAULT_MAX_COMPRESSION_RATIO = 200  # uncompressed bytes / compressed bytes (integer ratio)


def _get_archive_limits():
    """
    Return the ``(max_size, max_files, max_ratio)`` archive thresholds.

    Each value comes from Django settings when present, falling back to
    the module defaults. Reading settings at call time (rather than at
    import time) lets tests and operators override the thresholds
    without patching this module.
    """
    overrides = (
        ('COURSE_IMPORT_MAX_EXTRACTED_SIZE', _DEFAULT_MAX_EXTRACTED_SIZE),
        ('COURSE_IMPORT_MAX_EXTRACTED_FILES', _DEFAULT_MAX_EXTRACTED_FILES),
        ('COURSE_IMPORT_MAX_COMPRESSION_RATIO', _DEFAULT_MAX_COMPRESSION_RATIO),
    )
    return tuple(getattr(settings, name, fallback) for name, fallback in overrides)


def _check_archive_bomb(members, compressed_size):
    """
    Reject archives whose declared uncompressed size, file count, or
    compression ratio exceed the configured budget.

    Arguments:
        members: list of ``ZipInfo`` / ``TarInfo`` archive members.
        compressed_size: on-disk size of the archive file, in bytes.

    Raises:
        SuspiciousOperation: on any threshold violation. The check runs
            *before* ``archive.extractall`` writes anything to disk, so a
            pathological archive never materializes a byte on the target
            filesystem.
    """
    max_size, max_files, max_ratio = _get_archive_limits()

    if len(members) > max_files:
        # warning, not debug: a blocked archive is a security-relevant
        # event operators should see without enabling debug logging.
        log.warning(
            "Archive blocked: %d members exceeds limit of %d",
            len(members), max_files,
        )
        raise SuspiciousOperation("Archive contains too many files")

    total_uncompressed = 0
    for finfo in members:
        if isinstance(finfo, ZipInfo):
            member_size = finfo.file_size
        elif isinstance(finfo, TarInfo):
            # Directories, symlinks, devices, etc. contribute no payload.
            member_size = finfo.size if finfo.isfile() else 0
        else:  # defensive: safe_extractall only yields the two types above
            member_size = 0
        total_uncompressed += member_size
        # Fail fast: no need to finish summing once the budget is blown.
        if total_uncompressed > max_size:
            log.warning(
                "Archive blocked: uncompressed size %d exceeds limit of %d",
                total_uncompressed, max_size,
            )
            raise SuspiciousOperation("Archive too large")

    # Compression-ratio guard: max(compressed_size, 1) avoids a
    # divide-by-zero on empty archives, which are harmless anyway.
    ratio = total_uncompressed // max(compressed_size, 1)
    if ratio > max_ratio:
        log.warning(
            "Archive blocked: compression ratio %d exceeds limit of %d",
            ratio, max_ratio,
        )
        raise SuspiciousOperation("Archive compression ratio too high")


def resolved(rpath):
"""
Expand Down Expand Up @@ -89,10 +163,13 @@ def safe_extractall(file_name, output_path):
"""
Extract Zip or Tar files
"""
import os # local import: stdlib, not module-hot

archive = None
if not output_path.endswith("/"):
output_path += "/"
try:
compressed_size = os.path.getsize(file_name)
if file_name.endswith(".zip"):
archive = ZipFile(file_name, "r")
members = archive.infolist()
Expand All @@ -102,6 +179,7 @@ def safe_extractall(file_name, output_path):
else:
raise ValueError("Unsupported archive format")
_checkmembers(members, output_path)
_check_archive_bomb(members, compressed_size)
archive.extractall(output_path)
finally:
if archive:
Expand Down
137 changes: 137 additions & 0 deletions openedx/core/lib/tests/test_extract_archive.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
"""
Regression tests for ``openedx.core.lib.extract_archive.safe_extractall``
zip/tar bomb guards.
"""
import os
import shutil
import tempfile
import zipfile

import pytest
from django.conf import settings
from django.core.exceptions import SuspiciousOperation
from django.test import SimpleTestCase
from django.test.utils import override_settings

from openedx.core.lib.extract_archive import _check_archive_bomb, safe_extractall


class CheckArchiveBombUnitTests(SimpleTestCase):
    """
    Unit tests for the ``_check_archive_bomb`` helper.

    These operate on in-memory ``ZipInfo`` lists and never touch the
    filesystem, so they stay fast and hermetic.
    """

    def _make_zip_members(self, sizes):
        """
        Build ``ZipInfo`` objects with the given uncompressed sizes.

        Only ``file_size`` matters to ``_check_archive_bomb``; it never
        reads ``compress_size`` (it uses the on-disk archive size).
        """
        members = []
        for idx, size in enumerate(sizes):
            member = zipfile.ZipInfo(filename=f'member_{idx}.xml')
            member.file_size = size
            members.append(member)
        return members

    @override_settings(
        COURSE_IMPORT_MAX_EXTRACTED_SIZE=100,
        COURSE_IMPORT_MAX_EXTRACTED_FILES=1000,
        COURSE_IMPORT_MAX_COMPRESSION_RATIO=10,
    )
    def test_rejects_oversized_archive(self):
        # Declared sizes sum to 110, which exceeds the 100-byte cap.
        oversized = self._make_zip_members([50, 60])
        with self.assertRaisesRegex(SuspiciousOperation, 'Archive too large'):
            _check_archive_bomb(oversized, compressed_size=100)

    @override_settings(
        COURSE_IMPORT_MAX_EXTRACTED_SIZE=10_000_000,
        COURSE_IMPORT_MAX_EXTRACTED_FILES=3,
        COURSE_IMPORT_MAX_COMPRESSION_RATIO=1000,
    )
    def test_rejects_too_many_members(self):
        # Four members against a three-member limit.
        crowded = self._make_zip_members([10, 10, 10, 10])
        with self.assertRaisesRegex(SuspiciousOperation, 'too many files'):
            _check_archive_bomb(crowded, compressed_size=10_000)

    @override_settings(
        COURSE_IMPORT_MAX_EXTRACTED_SIZE=10_000_000,
        COURSE_IMPORT_MAX_EXTRACTED_FILES=1000,
        COURSE_IMPORT_MAX_COMPRESSION_RATIO=10,
    )
    def test_rejects_high_compression_ratio(self):
        # 10_000 uncompressed / 50 compressed = ratio 200, way above 10.
        inflated = self._make_zip_members([10_000])
        with self.assertRaisesRegex(SuspiciousOperation, 'compression ratio too high'):
            _check_archive_bomb(inflated, compressed_size=50)

    @override_settings(
        COURSE_IMPORT_MAX_EXTRACTED_SIZE=10_000_000,
        COURSE_IMPORT_MAX_EXTRACTED_FILES=1000,
        COURSE_IMPORT_MAX_COMPRESSION_RATIO=10,
    )
    def test_accepts_normal_archive(self):
        # 5000 uncompressed / 1000 compressed = ratio 5, well under 10.
        _check_archive_bomb(
            self._make_zip_members([2_000, 3_000]), compressed_size=1_000,
        )

    def test_empty_archive_ok(self):
        # An empty member list must not raise despite compressed_size=0.
        _check_archive_bomb([], compressed_size=0)


class SafeExtractallIntegrationTests(SimpleTestCase):
    """
    End-to-end tests that build a real small zip on disk and confirm
    ``safe_extractall`` honors the bomb-guard budget.
    """

    def setUp(self):
        super().setUp()
        # ``_checkmembers`` requires the extraction target to live under
        # ``settings.GITHUB_REPO_ROOT``. Create a scratch subdirectory
        # there (creating the root itself if a sparse test settings file
        # has not materialized it) and clean it up on teardown.
        os.makedirs(settings.GITHUB_REPO_ROOT, exist_ok=True)
        self._scratch = tempfile.mkdtemp(
            prefix='test_extract_archive_', dir=settings.GITHUB_REPO_ROOT,
        )
        # ignore_errors=True covers the "already gone" case, replacing
        # the lambda / __import__('shutil') short-circuit hack.
        self.addCleanup(shutil.rmtree, self._scratch, ignore_errors=True)

    def _make_zip(self, tmpdir, payloads):
        """Write an uncompressed (STORED) zip of ``payloads`` (name -> bytes) under ``tmpdir``."""
        zip_path = os.path.join(tmpdir, 'archive.zip')
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_STORED) as zf:
            for name, data in payloads.items():
                zf.writestr(name, data)
        return zip_path

    def test_rejects_bomb_before_writing_anything(self):
        """A highly compressible payload over the size cap is rejected before extraction."""
        outdir = tempfile.mkdtemp(prefix='out_', dir=self._scratch)
        srcdir = tempfile.mkdtemp(prefix='src_', dir=self._scratch)
        # Make a file that compresses extremely well: 1 MB of the
        # same byte. ``ZIP_DEFLATED`` gives a very high ratio.
        zip_path = os.path.join(srcdir, 'bomb.zip')
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf:
            zf.writestr('inflated.txt', b'A' * 1_000_000)

        # Lower the size cap to 10 KB so the 1 MB payload unambiguously
        # exceeds it, independently of any compression-ratio check.
        with override_settings(
            COURSE_IMPORT_MAX_EXTRACTED_SIZE=10_000,
            COURSE_IMPORT_MAX_EXTRACTED_FILES=1000,
            COURSE_IMPORT_MAX_COMPRESSION_RATIO=1_000_000,
        ), pytest.raises(SuspiciousOperation):
            safe_extractall(zip_path, outdir)
        # No files were written to the output directory.
        assert os.listdir(outdir) == []

    def test_happy_path_normal_archive(self):
        """A small, ordinary archive extracts normally under default limits."""
        outdir = tempfile.mkdtemp(prefix='out_', dir=self._scratch)
        srcdir = tempfile.mkdtemp(prefix='src_', dir=self._scratch)
        zip_path = self._make_zip(
            srcdir, {'a.xml': b'<xml/>', 'b.xml': b'<xml/>'},
        )
        safe_extractall(zip_path, outdir)
        assert sorted(os.listdir(outdir)) == ['a.xml', 'b.xml']