Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
matrix:
os: [ubuntu-latest]
python-version: ["3.12"]
toxenv: [quality, docs, pii_check, django42, django52]
toxenv: [quality, docs, pii_check, django52]
steps:
- uses: actions/checkout@v6
- name: setup python
Expand All @@ -35,7 +35,7 @@ jobs:
run: tox

- name: Run coverage
if: matrix.python-version == '3.12' && matrix.toxenv == 'django42'
if: matrix.python-version == '3.12' && matrix.toxenv == 'django52'
uses: codecov/codecov-action@v6
with:
token: ${{ secrets.CODECOV_TOKEN }}
Expand Down
24 changes: 24 additions & 0 deletions openedx_authz/api/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,30 @@ class OrgCourseOverviewGlobData(OrgGlobData):
ID_SEPARATOR: ClassVar[str] = "+"


class CCXCourseOverviewData(CourseOverviewData):
"""CCX course scope for authorization in the Open edX platform.

Inherits from CourseOverviewData as CCXs are coursees, just in a different namespace.

Attributes:
NAMESPACE: 'ccx-v1' for course scopes.
external_key: The course identifier (e.g., 'ccx-v1:OpenedX+DemoX+DemoCourse+ccx@1').
Must be a valid CourseKey format.
namespaced_key: The course identifier with namespace (e.g., 'ccx-v1^ccx-v1:OpenedX+DemoX+DemoCourse+ccx@1').
course_id: Property alias for external_key.

Examples:
>>> course = CourseOverviewData(external_key='ccx-v1:OpenedX+DemoX+DemoCourse+ccx@1')
>>> course.namespaced_key
'ccx-v1^ccx-v1:OpenedX+DemoX+DemoCourse+ccx@1'
>>> course.course_id
'ccx-v1:OpenedX+DemoX+DemoCourse+ccx@1'

"""

NAMESPACE: ClassVar[str] = "ccx-v1"


class SubjectMeta(type):
"""Metaclass for SubjectData to handle dynamic subclass instantiation based on namespace."""

Expand Down
8 changes: 8 additions & 0 deletions openedx_authz/constants/roles.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,13 @@

COURSE_BETA_TESTER = RoleData(external_key="course_beta_tester", permissions=COURSE_BETA_TESTER_PERMISSIONS)

# This is a known LMS-only permission, but doesn't actually grant anything yet.
#
# It is intended to be handled in the Willow time frame.
CCX_COACH_PERMISSIONS = []
CCX_COACH = RoleData(external_key="ccx_coach", permissions=CCX_COACH_PERMISSIONS)


# Map of legacy course role names to their equivalent new roles
# This mapping must be unique in both directions, since it may be used as a reverse lookup (value → key).
# If multiple keys share the same value, it will lead to collisions.
Expand All @@ -189,4 +196,5 @@
"limited_staff": COURSE_LIMITED_STAFF.external_key,
"data_researcher": COURSE_DATA_RESEARCHER.external_key,
"beta_testers": COURSE_BETA_TESTER.external_key,
"ccx_coach": CCX_COACH.external_key,
}
30 changes: 21 additions & 9 deletions openedx_authz/engine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,22 @@ def migrate_legacy_permissions(ContentLibraryPermission):
return permissions_with_errors


def _validate_migration_input(course_id_list, org_id):
"""
Validate the common inputs for the migration functions.
"""
if not course_id_list and not org_id:
raise ValueError(
"At least one of course_id_list or org_id must be provided to limit the scope of the migration."
)

if course_id_list and any(not course_key.startswith("course-v1:") for course_key in course_id_list):
raise ValueError(
"Only full course keys (e.g., 'course-v1:org+course+run') are supported in the course_id_list."
" Other course types such as CCX are not supported."
)


def migrate_legacy_course_roles_to_authz(course_access_role_model, course_id_list, org_id, delete_after_migration):
"""
Migrate legacy course role data to the new Casbin-based authorization model.
Expand All @@ -194,10 +210,8 @@ def migrate_legacy_course_roles_to_authz(course_access_role_model, course_id_lis
param org_id: Optional organization ID to filter the migration.
param delete_after_migration: Whether to delete successfully migrated legacy permissions after migration.
"""
if not course_id_list and not org_id:
raise ValueError(
"At least one of course_id_list or org_id must be provided to limit the scope of the migration."
)
_validate_migration_input(course_id_list, org_id)

course_access_role_filter = {
"course_id__startswith": "course-v1:",
}
Expand Down Expand Up @@ -244,7 +258,8 @@ def migrate_legacy_course_roles_to_authz(course_access_role_model, course_id_lis
if not is_user_added:
logger.error(
f"Failed to migrate permission for User: {permission.user.username} "
f"to Role: {role} in Scope: {permission.course_id}"
f"to Role: {role} in Scope: {permission.course_id} "
"user may already have this permission assigned"
)
permissions_with_errors.append(permission)
continue
Expand Down Expand Up @@ -280,10 +295,7 @@ def migrate_authz_to_legacy_course_roles(
param delete_after_migration: Whether to unassign successfully migrated permissions
from the new model after migration.
"""
if not course_id_list and not org_id:
raise ValueError(
"At least one of course_id_list or org_id must be provided to limit the scope of the rollback migration."
)
_validate_migration_input(course_id_list, org_id)

# 1. Get all users with course-related permissions in the new model by filtering
# UserSubjects that are linked to CourseScopes with a valid course overview.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,25 @@ def handle(self, *args, **options):
delete_after_migration=delete_after_migration,
)

if errors:
if errors and success:
self.stdout.write(
self.style.WARNING(
f"Migration completed with {len(errors)} errors and {len(success)} roles migrated."
)
)
elif errors:
self.stdout.write(self.style.ERROR(f"Migration completed with {len(errors)} errors."))
else:
elif success:
self.stdout.write(
self.style.SUCCESS(f"Migration completed successfully with {len(success)} roles migrated.")
)
else:
self.stdout.write(
self.style.ERROR(
"No legacy roles found for the given scope, course could already be migrated, "
"or there coule be a an error in the course_id_list / org_id."
)
)

if delete_after_migration:
self.stdout.write(self.style.SUCCESS(f"{len(success)} Legacy roles deleted successfully."))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,25 @@ def handle(self, *args, **options):
delete_after_migration=delete_after_migration, # control deletion here
)

if errors:
if errors and success:
self.stdout.write(
self.style.WARNING(
f"Rollback completed with {len(errors)} errors and {len(success)} roles migrated."
)
)
elif errors:
self.stdout.write(self.style.ERROR(f"Rollback completed with {len(errors)} errors."))
else:
elif success:
self.stdout.write(
self.style.SUCCESS(f"Rollback completed successfully with {len(success)} roles rolled back.")
)
else:
self.stdout.write(
self.style.ERROR(
"No roles found for the given scope, course could already be rolled back, "
"or there coule be a an error in the course_id_list / org_id."
)
)

if delete_after_migration:
self.stdout.write(
Expand Down
13 changes: 13 additions & 0 deletions openedx_authz/tests/api/test_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from openedx_authz.api.data import (
ActionData,
CCXCourseOverviewData,
ContentLibraryData,
CourseOverviewData,
OrgContentLibraryGlobData,
Expand Down Expand Up @@ -257,6 +258,8 @@ def test_scope_data_registration(self):
self.assertIs(ScopeData.scope_registry["lib"], ContentLibraryData)
self.assertIn("course-v1", ScopeData.scope_registry)
self.assertIs(ScopeData.scope_registry["course-v1"], CourseOverviewData)
self.assertIn("ccx-v1", ScopeData.scope_registry)
self.assertIs(ScopeData.scope_registry["ccx-v1"], CCXCourseOverviewData)

# Glob registries for organization-level scopes
self.assertIn("lib", ScopeMeta.glob_registry)
Expand All @@ -265,6 +268,7 @@ def test_scope_data_registration(self):
self.assertIs(ScopeMeta.glob_registry["course-v1"], OrgCourseOverviewGlobData)

@data(
("ccx-v1^ccx-v1:OpenedX+DemoX+DemoCourse+ccx@1", CCXCourseOverviewData),
("course-v1^course-v1:WGU+CS002+2025_T1", CourseOverviewData),
("lib^lib:DemoX:CSPROB", ContentLibraryData),
("lib^lib:DemoX*", OrgContentLibraryGlobData),
Expand All @@ -285,6 +289,7 @@ def test_dynamic_instantiation_via_namespaced_key(self, namespaced_key, expected
self.assertEqual(instance.namespaced_key, namespaced_key)

@data(
("ccx-v1^ccx-v1:OpenedX+DemoX+DemoCourse+ccx@1", CCXCourseOverviewData),
("course-v1^course-v1:WGU+CS002+2025_T1", CourseOverviewData),
("lib^lib:DemoX:CSPROB", ContentLibraryData),
("lib^lib:DemoX:*", OrgContentLibraryGlobData),
Expand All @@ -297,6 +302,8 @@ def test_get_subclass_by_namespaced_key(self, namespaced_key, expected_class):
"""Test get_subclass_by_namespaced_key returns correct subclass.

Expected Result:
- 'ccx-v1^...' returns CCXCourseOverviewData
- 'course-v1^...' returns CourseOverviewData
- 'lib^...' returns ContentLibraryData
- 'global^...' returns ScopeData
- 'unknown^...' returns ScopeData (fallback)
Expand All @@ -306,6 +313,7 @@ def test_get_subclass_by_namespaced_key(self, namespaced_key, expected_class):
self.assertIs(subclass, expected_class)

@data(
("ccx-v1:OpenedX+DemoX+DemoCourse+ccx@1", CCXCourseOverviewData),
("course-v1:WGU+CS002+2025_T1", CourseOverviewData),
("lib:DemoX:CSPROB", ContentLibraryData),
("lib:DemoX:*", OrgContentLibraryGlobData),
Expand All @@ -326,6 +334,11 @@ def test_get_subclass_by_external_key(self, external_key, expected_class):
self.assertIs(subclass, expected_class)

@data(
("ccx-v1:OpenedX+DemoX+DemoCourse+ccx@1", True, CCXCourseOverviewData),
("ccx:OpenedX+DemoX+DemoCourse+ccx@1", False, CCXCourseOverviewData),
("ccx-v2:OpenedX+DemoX+DemoCourse+ccx@1", False, CCXCourseOverviewData),
("ccx-v1-OpenedX+DemoX+DemoCourse+ccx@1", False, CCXCourseOverviewData),
("ccx-v1-OpenedX+DemoX+DemoCourse+ccx", False, CCXCourseOverviewData),
("course-v1:WGU+CS002+2025_T1", True, CourseOverviewData),
("course:WGU+CS002+2025_T1", False, CourseOverviewData),
("course-v2:WGU+CS002+2025_T1", False, CourseOverviewData),
Expand Down
113 changes: 113 additions & 0 deletions openedx_authz/tests/test_migrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ def setUp(self):
"org": self.org,
"course_id": self.course_id,
}
self.invalid_course = f"ccx-v1:{self.org}+{OBJECT_PREFIX}+2026_01+ccx@2"
self.course_overview = CourseOverview.objects.create(
id=self.course_id, org=self.org, display_name=f"{OBJECT_PREFIX} Course"
)
Expand Down Expand Up @@ -883,6 +884,17 @@ def test_migrate_authz_to_legacy_course_roles_with_no_org_and_courses(self):
CourseAccessRole, UserSubject, course_id_list=None, org_id=None, delete_after_migration=True
)

@patch("openedx_authz.api.data.CourseOverview", CourseOverview)
def test_migrate_authz_to_legacy_course_roles_with_invalid_courses(self):
with self.assertRaises(ValueError):
migrate_authz_to_legacy_course_roles(
CourseAccessRole,
UserSubject,
course_id_list=[self.invalid_course],
org_id=None,
delete_after_migration=True,
)

@patch("openedx_authz.api.data.CourseOverview", CourseOverview)
def test_migrate_legacy_course_roles_to_authz_with_no_org_and_courses(self):
# Migrate from legacy CourseAccessRole to new Casbin-based model
Expand All @@ -891,6 +903,16 @@ def test_migrate_legacy_course_roles_to_authz_with_no_org_and_courses(self):
CourseAccessRole, course_id_list=None, org_id=None, delete_after_migration=True
)

@patch("openedx_authz.api.data.CourseOverview", CourseOverview)
def test_migrate_legacy_course_roles_to_authz_with_invalid_courses(self):
with self.assertRaises(ValueError):
migrate_legacy_course_roles_to_authz(
CourseAccessRole,
course_id_list=[self.invalid_course],
org_id=None,
delete_after_migration=True,
)

@patch("openedx_authz.management.commands.authz_migrate_course_authoring.CourseAccessRole", CourseAccessRole)
@patch("openedx_authz.management.commands.authz_migrate_course_authoring.migrate_legacy_course_roles_to_authz")
def test_authz_migrate_course_authoring_command(self, mock_migrate):
Expand Down Expand Up @@ -920,6 +942,51 @@ def test_authz_migrate_course_authoring_command(self, mock_migrate):

self.assertEqual(kwargs["delete_after_migration"], True)

@patch("openedx_authz.management.commands.authz_migrate_course_authoring.CourseAccessRole", CourseAccessRole)
@patch("openedx_authz.management.commands.authz_migrate_course_authoring.migrate_legacy_course_roles_to_authz")
def test_authz_migrate_course_authoring_command_mixed_success(self, mock_migrate):
"""
Verify that the authz_migrate_course_authoring command outputs without errors
for mixed success operations.
"""

mock_migrate.return_value = (
["course-v1:fail"],
[self.course_id],
) # Return one success and one failure

call_command("authz_migrate_course_authoring", "--course-id-list", self.course_id)
mock_migrate.assert_called_once()

# Return only one success
mock_migrate.reset_mock()
mock_migrate.return_value = (
[],
[self.course_id],
)

call_command("authz_migrate_course_authoring", "--course-id-list", self.course_id)
mock_migrate.assert_called_once()

# Return only one failure
mock_migrate.reset_mock()
mock_migrate.return_value = (
[self.course_id],
[],
)

call_command("authz_migrate_course_authoring", "--course-id-list", self.course_id)
mock_migrate.assert_called_once()

# Return only no successes or failures
mock_migrate.reset_mock()
mock_migrate.return_value = (
[],
[],
)
call_command("authz_migrate_course_authoring", "--course-id-list", self.course_id)
mock_migrate.assert_called_once()

@patch("openedx_authz.management.commands.authz_rollback_course_authoring.CourseAccessRole", CourseAccessRole)
@patch("openedx_authz.management.commands.authz_rollback_course_authoring.migrate_authz_to_legacy_course_roles")
def test_authz_rollback_course_authoring_command(self, mock_rollback):
Expand Down Expand Up @@ -950,6 +1017,52 @@ def test_authz_rollback_course_authoring_command(self, mock_rollback):

self.assertEqual(call_kwargs["delete_after_migration"], True)

@patch("openedx_authz.management.commands.authz_rollback_course_authoring.CourseAccessRole", CourseAccessRole)
@patch("openedx_authz.management.commands.authz_rollback_course_authoring.migrate_authz_to_legacy_course_roles")
def test_authz_rollback_course_authoring_command_mixed_success(self, mock_rollback):
"""
Verify that the authz_rollback_course_authoring command does not error in
mixed success operations.
"""

# Return one success and one failure
mock_rollback.return_value = (
["course-v1:fail"],
[self.course_id],
)
call_command("authz_rollback_course_authoring", "--course-id-list", self.course_id)
mock_rollback.assert_called_once()

# Return only one success
mock_rollback.reset_mock()
mock_rollback.return_value = (
[],
[self.course_id],
)

call_command("authz_rollback_course_authoring", "--course-id-list", self.course_id)
mock_rollback.assert_called_once()

# Return only one failure
mock_rollback.reset_mock()
mock_rollback.return_value = (
[self.course_id],
[],
)

call_command("authz_rollback_course_authoring", "--course-id-list", self.course_id)
mock_rollback.assert_called_once()

# Return only no successes or failures
mock_rollback.reset_mock()
mock_rollback.return_value = (
[],
[],
)

call_command("authz_rollback_course_authoring", "--course-id-list", self.course_id)
mock_rollback.assert_called_once()

@patch("openedx_authz.management.commands.authz_migrate_course_authoring.CourseAccessRole", CourseAccessRole)
@patch("openedx_authz.management.commands.authz_migrate_course_authoring.migrate_legacy_course_roles_to_authz")
def test_authz_migrate_course_authoring_command_delete_confirmation_no(self, mock_migrate):
Expand Down
Loading