diff --git a/filer/management/commands/filer_check.py b/filer/management/commands/filer_check.py index efe39027a..bdeeec50b 100644 --- a/filer/management/commands/filer_check.py +++ b/filer/management/commands/filer_check.py @@ -1,19 +1,17 @@ import os -from django.core.files.storage import DefaultStorage from django.core.management.base import BaseCommand from django.utils.module_loading import import_string -from PIL import UnidentifiedImageError - from filer import settings as filer_settings +from filer.models.filemodels import File from filer.utils.loader import load_model +from PIL import UnidentifiedImageError + class Command(BaseCommand): - help = "Look for orphaned files in media folders." - storage = DefaultStorage() - prefix = filer_settings.FILER_STORAGES['public']['main']['UPLOAD_TO_PREFIX'] + help = "Check for orphaned files, missing file references, and set image dimensions." def add_arguments(self, parser): parser.add_argument( @@ -21,35 +19,35 @@ def add_arguments(self, parser): action='store_true', dest='orphans', default=False, - help="Walk through the media folders and look for orphaned files.", + help="Scan media folders for orphaned files.", ) parser.add_argument( '--delete-orphans', action='store_true', dest='delete_orphans', default=False, - help="Delete orphaned files from their media folders.", + help="Delete orphaned files from storage.", ) parser.add_argument( '--missing', action='store_true', dest='missing', default=False, - help="Verify media folders and report about missing files.", + help="Check file references and report missing files.", ) parser.add_argument( '--delete-missing', action='store_true', dest='delete_missing', default=False, - help="Delete references in database if files are missing in media folder.", + help="Delete database entries if files are missing in the media folder.", ) parser.add_argument( '--image-dimensions', action='store_true', dest='image_dimensions', default=False, - help="Look for images without dimensions set, set them accordingly.", + help="Set image dimensions if they are not set.", ) parser.add_argument( '--noinput', @@ -57,7 +55,7 @@ def add_arguments(self, parser): action='store_false', dest='interactive', default=True, - help="Do NOT prompt the user for input of any kind." + help="Do not prompt the user for any interactive input.", ) def handle(self, *args, **options): @@ -65,99 +63,119 @@ def handle(self, *args, **options): self.verify_references(options) if options['delete_missing']: if options['interactive']: - msg = "\nThis will delete entries from your database. Are you sure you want to do this?\n\n" \ - "Type 'yes' to continue, or 'no' to cancel: " - if input(msg) != 'yes': - self.stdout.write("Aborted: Delete missing file entries from database.") + if input( + "\nThis will delete missing file references from the database.\n" + "Type 'yes' to continue, or 'no' to cancel: " + ) != 'yes': + self.stdout.write("Aborted: Missing file references were not deleted.\n") + self.stdout.flush() return self.verify_references(options) - if options['orphans']: - self.verify_storages(options) - if options['delete_orphans']: - if options['interactive']: - msg = "\nThis will delete orphaned files from your storage. Are you sure you want to do this?\n\n" \ - "Type 'yes' to continue, or 'no' to cancel: " - if input(msg) != 'yes': - self.stdout.write("Aborted: Delete orphaned files from storage.") + if options['orphans'] or options['delete_orphans']: + if options['delete_orphans'] and options['interactive']: + if input( + "\nThis will delete orphaned files from storage.\n" + "Type 'yes' to continue, or 'no' to cancel: " + ) != 'yes': + self.stdout.write("Aborted: Orphaned files were not deleted.\n") + self.stdout.flush() return self.verify_storages(options) + if options['image_dimensions']: self.image_dimensions(options) def verify_references(self, options): - from filer.models.filemodels import File - + """ + Checks that every file reference in the database exists in storage. + If a file is missing, either report it or delete the reference based on the provided options. + """ for file in File.objects.all(): if not file.file.storage.exists(file.file.name): if options['delete_missing']: file.delete() - msg = "Delete missing file reference '{}/{}' from database." + verbose_msg = f"Deleted missing file reference '{file.folder}/{file}' from the database." else: - msg = "Referenced file '{}/{}' is missing in media folder." - if options['verbosity'] > 2: - self.stdout.write(msg.format(str(file.folder), str(file))) - elif options['verbosity']: - self.stdout.write(os.path.join(str(file.folder), str(file))) + verbose_msg = f"File reference '{file.folder}/{file}' is missing in storage." + if options.get('verbosity', 1) > 2: + self.stdout.write(verbose_msg + "\n") + self.stdout.flush() + elif options.get('verbosity'): + self.stdout.write(os.path.join(str(file.folder), str(file)) + "\n") + self.stdout.flush() def verify_storages(self, options): - from filer.models.filemodels import File - - def walk(prefix): + """ + Scans all storages defined in FILER_STORAGES (e.g., public and private) + for orphaned files, then reports or deletes them based on the options. + """ + + def walk(storage, prefix, label_prefix): + # If the directory does not exist, there is nothing to scan + if not storage.exists(prefix): + return child_dirs, files = storage.listdir(prefix) for filename in files: - relfilename = os.path.join(prefix, filename) - if not File.objects.filter(file=relfilename).exists(): + actual_path = os.path.join(prefix, filename) + relfilename = os.path.join(label_prefix, filename) + if not File.objects.filter(file=actual_path).exists(): if options['delete_orphans']: - storage.delete(relfilename) - msg = "Deleted orphaned file '{}'" + storage.delete(actual_path) + message = f"Deleted orphaned file '{relfilename}'" else: - msg = "Found orphaned file '{}'" - if options['verbosity'] > 2: - self.stdout.write(msg.format(relfilename)) - elif options['verbosity']: - self.stdout.write(relfilename) - + message = f"Found orphaned file '{relfilename}'" + if options.get('verbosity', 1) > 2: + self.stdout.write(message + "\n") + self.stdout.flush() + elif options.get('verbosity'): + self.stdout.write(relfilename + "\n") + self.stdout.flush() for child in child_dirs: - walk(os.path.join(prefix, child)) - - filer_public = filer_settings.FILER_STORAGES['public']['main'] - storage = import_string(filer_public['ENGINE'])() - walk(filer_public['UPLOAD_TO_PREFIX']) + walk(storage, os.path.join(prefix, child), os.path.join(label_prefix, child)) + + # Loop through each storage configuration (e.g., public, private, etc.) + for storage_name, storage_config in filer_settings.FILER_STORAGES.items(): + storage_settings = storage_config.get('main') + if not storage_settings: + continue + storage = import_string(storage_settings['ENGINE'])() + if storage_settings.get('OPTIONS', {}).get('location'): + storage.location = storage_settings['OPTIONS']['location'] + # Set label_prefix: for public and private storages, use their names. + label_prefix = storage_name if storage_name in ['public', 'private'] else storage_settings.get('UPLOAD_TO_PREFIX', '') + walk(storage, storage_settings.get('UPLOAD_TO_PREFIX', ''), label_prefix) def image_dimensions(self, options): + """ + For images without set dimensions (_width == 0 or None), try to read their dimensions + and save them, handling SVG files and possible image errors. + """ from django.db.models import Q - import easy_thumbnails from easy_thumbnails.VIL import Image as VILImage - from filer.utils.compatibility import PILImage - no_dimensions = load_model(filer_settings.FILER_IMAGE_MODEL).objects.filter( - Q(_width=0) | Q(_width__isnull=True) - ) - self.stdout.write(f"trying to set dimensions on {no_dimensions.count()} files") - for image in no_dimensions: - if image.file_ptr: - file_holder = image.file_ptr - else: - file_holder = image + ImageModel = load_model(filer_settings.FILER_IMAGE_MODEL) + images_without_dimensions = ImageModel.objects.filter(Q(_width=0) | Q(_width__isnull=True)) + self.stdout.write(f"Setting dimensions for {images_without_dimensions.count()} images" + "\n") + self.stdout.flush() + for image in images_without_dimensions: + file_holder = image.file_ptr if getattr(image, 'file_ptr', None) else image try: imgfile = file_holder.file imgfile.seek(0) - except (FileNotFoundError): - pass + except FileNotFoundError: + continue + if image.file.name.lower().endswith('.svg'): + # For SVG files, use VILImage (invalid SVGs do not throw errors) + with VILImage.load(imgfile) as vil_image: + image._width, image._height = vil_image.size else: - if image.file.name.lower().endswith('.svg'): - with VILImage.load(imgfile) as vil_image: - # invalid svg doesnt throw errors - image._width, image._height = vil_image.size - else: - try: - with PILImage.open(imgfile) as pil_image: - image._width, image._height = pil_image.size - image._transparent = easy_thumbnails.utils.is_transparent(pil_image) - except UnidentifiedImageError: - continue - image.save() - return + try: + with PILImage.open(imgfile) as pil_image: + image._width, image._height = pil_image.size + image._transparent = easy_thumbnails.utils.is_transparent(pil_image) + except UnidentifiedImageError: + continue + image.save() diff --git a/tests/test_filer_check.py b/tests/test_filer_check.py index 98592df1d..6fe5aca31 100644 --- a/tests/test_filer_check.py +++ b/tests/test_filer_check.py @@ -1,11 +1,13 @@ import os import shutil +import tempfile from io import BytesIO, StringIO from django.core.files.uploadedfile import SimpleUploadedFile from django.core.management import call_command from django.test import TestCase from django.utils.module_loading import import_string +from django.core.files.base import ContentFile from filer import settings as filer_settings from filer.models.filemodels import File @@ -13,12 +15,10 @@ from filer.utils.loader import load_model from tests.helpers import create_image - Image = load_model(FILER_IMAGE_MODEL) class FilerCheckTestCase(TestCase): - svg_file_string = """ @@ -29,20 +29,28 @@ class FilerCheckTestCase(TestCase): """ def setUp(self): - # ensure that filer_public directory is empty from previous tests - storage = import_string(filer_settings.FILER_STORAGES['public']['main']['ENGINE'])() - upload_to_prefix = filer_settings.FILER_STORAGES['public']['main']['UPLOAD_TO_PREFIX'] - if storage.exists(upload_to_prefix): - shutil.rmtree(storage.path(upload_to_prefix)) + # Clear all configured storages to ensure a clean state for each test. + # This prevents interference from files left in any storage. + for storage_alias, storage_configs in filer_settings.FILER_STORAGES.items(): + config = storage_configs.get('main') + if not config: + continue + storage = import_string(config['ENGINE'])() + upload_prefix = config.get('UPLOAD_TO_PREFIX', '') + if storage.exists(upload_prefix): + shutil.rmtree(storage.path(upload_prefix)) + # Create a sample file for testing in the public storage. original_filename = 'testimage.jpg' file_obj = SimpleUploadedFile( name=original_filename, content=create_image().tobytes(), - content_type='image/jpeg') + content_type='image/jpeg' + ) self.filer_file = File.objects.create( file=file_obj, - original_filename=original_filename) + original_filename=original_filename + ) def tearDown(self): self.filer_file.delete() @@ -54,8 +62,10 @@ def test_delete_missing(self): call_command('filer_check', stdout=out, missing=True) self.assertEqual('', out.getvalue()) + # Remove the file to simulate a missing file. os.remove(self.filer_file.file.path) call_command('filer_check', stdout=out, missing=True) + # When verbosity is low, a simple relative file path is output. self.assertEqual("None/testimage.jpg\n", out.getvalue()) self.assertIsInstance(File.objects.get(id=file_pk), File) @@ -63,37 +73,148 @@ def test_delete_missing(self): with self.assertRaises(File.DoesNotExist): File.objects.get(id=file_pk) - def test_delete_orphans(self): + def test_delete_orphans_public(self): + # First check - should be empty initially out = StringIO() - self.assertTrue(os.path.exists(self.filer_file.file.path)) - call_command('filer_check', stdout=out, orphans=True) - # folder must be clean, free of orphans + call_command('filer_check', stdout=out, orphans=True, verbosity=1) self.assertEqual('', out.getvalue()) - # add an orphan file to our storage - storage = import_string(filer_settings.FILER_STORAGES['public']['main']['ENGINE'])() - filer_public = storage.path(filer_settings.FILER_STORAGES['public']['main']['UPLOAD_TO_PREFIX']) - orphan_file = os.path.join(filer_public, 'hello.txt') + # Add an orphan file using the storage API directly + public_settings = filer_settings.FILER_STORAGES['public']['main'] + storage = import_string(public_settings['ENGINE'])() + + # Configure storage location if specified in settings + if public_settings.get('OPTIONS', {}).get('location'): + storage.location = public_settings['OPTIONS']['location'] + + # Get upload prefix and create file path + prefix = public_settings.get('UPLOAD_TO_PREFIX', '') + file_path = 'hello.txt' + rel_path = os.path.join(prefix, file_path) if prefix else file_path + + # Save file through storage API + storage.save(rel_path, ContentFile(b"I don't belong here!")) + self.assertTrue(storage.exists(rel_path)) + + # Check if orphan is detected + out = StringIO() + call_command('filer_check', stdout=out, orphans=True, verbosity=1) + self.assertEqual("public/hello.txt\n", out.getvalue()) + + # Delete orphans + call_command('filer_check', delete_orphans=True, interactive=False, verbosity=0) + self.assertFalse(storage.exists(rel_path)) + + def test_delete_orphans_private(self): + # Skip test if private storage is not configured. + if 'private' not in filer_settings.FILER_STORAGES: + self.skipTest("Private storage not configured in FILER_STORAGES.") + + out = StringIO() + private_settings = filer_settings.FILER_STORAGES['private']['main'] + storage = import_string(private_settings['ENGINE'])() + # Set storage location if defined in OPTIONS. + if private_settings.get('OPTIONS', {}).get('location'): + storage.location = private_settings['OPTIONS']['location'] + private_path = storage.path(private_settings.get('UPLOAD_TO_PREFIX', '')) + os.makedirs(private_path, exist_ok=True) + + orphan_file = os.path.join(private_path, 'private_orphan.txt') with open(orphan_file, 'w') as fh: fh.write("I don't belong here!") + # Run the command and check that it detects the private orphan file. call_command('filer_check', stdout=out, orphans=True) - self.assertEqual("filer_public/hello.txt\n", out.getvalue()) + self.assertIn("private_orphan.txt", out.getvalue()) self.assertTrue(os.path.exists(orphan_file)) + # Delete the orphan file. call_command('filer_check', delete_orphans=True, interactive=False, verbosity=0) self.assertFalse(os.path.exists(orphan_file)) + def test_delete_orphans_multiple_storages(self): + """ + Test that the filer_check command correctly handles orphaned files in multiple storages + without permanently modifying the settings. We use monkey-patching to assign temporary + directories to the storage configurations. + """ + out = StringIO() + + # --- Monkey-patch public storage location --- + public_config = filer_settings.FILER_STORAGES['public']['main'] + temp_public_dir = tempfile.mkdtemp() + if 'OPTIONS' in public_config: + public_config['OPTIONS']['location'] = temp_public_dir + else: + public_config['OPTIONS'] = {'location': temp_public_dir} + # Determine the upload prefix (if any) and ensure the corresponding directory exists. + public_upload_prefix = public_config.get('UPLOAD_TO_PREFIX', '') + if public_upload_prefix: + public_full_dir = os.path.join(temp_public_dir, public_upload_prefix) + else: + public_full_dir = temp_public_dir + os.makedirs(public_full_dir, exist_ok=True) + + # --- Monkey-patch private storage location --- + private_config = filer_settings.FILER_STORAGES.get('private', {}).get('main') + if private_config: + temp_private_dir = tempfile.mkdtemp() + if 'OPTIONS' in private_config: + private_config['OPTIONS']['location'] = temp_private_dir + else: + private_config['OPTIONS'] = {'location': temp_private_dir} + private_upload_prefix = private_config.get('UPLOAD_TO_PREFIX', '') + if private_upload_prefix: + private_full_dir = os.path.join(temp_private_dir, private_upload_prefix) + else: + private_full_dir = temp_private_dir + os.makedirs(private_full_dir, exist_ok=True) + else: + self.skipTest("Private storage not configured in FILER_STORAGES.") + + # --- Initialize storages using the patched locations --- + from django.core.files.storage import FileSystemStorage + storage_public = FileSystemStorage(location=temp_public_dir) + storage_private = FileSystemStorage(location=private_config['OPTIONS']['location']) + + # --- Save dummy orphan files in both storages --- + # For public storage, include the upload prefix in the filename if needed. + if public_upload_prefix: + filename_public = os.path.join(public_upload_prefix, 'orphan_public.txt') + else: + filename_public = 'orphan_public.txt' + if private_config.get('UPLOAD_TO_PREFIX', ''): + filename_private = os.path.join(private_config['UPLOAD_TO_PREFIX'], 'orphan_private.txt') + else: + filename_private = 'orphan_private.txt' + + storage_public.save(filename_public, ContentFile(b"dummy content")) + storage_private.save(filename_private, ContentFile(b"dummy content")) + + # --- Run the filer_check command --- + call_command('filer_check', stdout=out, orphans=True) + output = out.getvalue() + + # Verify that the output contains indicators for both storages. + self.assertIn('public', output) + self.assertIn('private', output) + + # --- Clean up --- + storage_public.delete(filename_public) + storage_private.delete(filename_private) + shutil.rmtree(temp_public_dir) + shutil.rmtree(private_config['OPTIONS']['location']) + def test_image_dimensions_corrupted_file(self): original_filename = 'testimage.jpg' file_obj = SimpleUploadedFile( name=original_filename, - # corrupted! - content=create_image().tobytes(), - content_type='image/jpeg') + content=create_image().tobytes(), # Simulate a corrupted file. + content_type='image/jpeg' + ) self.filer_image = Image.objects.create( file=file_obj, - original_filename=original_filename) - + original_filename=original_filename + ) self.filer_image._width = 0 self.filer_image.save() call_command('filer_check', image_dimensions=True) @@ -101,12 +222,12 @@ def test_image_dimensions_corrupted_file(self): def test_image_dimensions_file_not_found(self): self.filer_image = Image.objects.create( file="123.jpg", - original_filename="123.jpg") + original_filename="123.jpg" + ) call_command('filer_check', image_dimensions=True) self.filer_image.refresh_from_db() def test_image_dimensions(self): - original_filename = 'testimage.jpg' with BytesIO() as jpg: create_image().save(jpg, format='JPEG') @@ -114,11 +235,12 @@ def test_image_dimensions(self): file_obj = SimpleUploadedFile( name=original_filename, content=jpg.read(), - content_type='image/jpeg') + content_type='image/jpeg' + ) self.filer_image = Image.objects.create( file=file_obj, - original_filename=original_filename) - + original_filename=original_filename + ) self.filer_image._width = 0 self.filer_image.save() @@ -127,33 +249,33 @@ def test_image_dimensions(self): self.assertGreater(self.filer_image._width, 0) def test_image_dimensions_invalid_svg(self): - original_filename = 'test.svg' svg_file = bytes("" + self.svg_file_string, "utf-8") file_obj = SimpleUploadedFile( name=original_filename, content=svg_file, - content_type='image/svg+xml') + content_type='image/svg+xml' + ) self.filer_image = Image.objects.create( file=file_obj, - original_filename=original_filename) - + original_filename=original_filename + ) self.filer_image._width = 0 self.filer_image.save() call_command('filer_check', image_dimensions=True) def test_image_dimensions_svg(self): - original_filename = 'test.svg' svg_file = bytes(self.svg_file_string, "utf-8") file_obj = SimpleUploadedFile( name=original_filename, content=svg_file, - content_type='image/svg+xml') + content_type='image/svg+xml' + ) self.filer_image = Image.objects.create( file=file_obj, - original_filename=original_filename) - + original_filename=original_filename + ) self.filer_image._width = 0 self.filer_image.save()