diff --git a/cms/djangoapps/contentstore/rest_api/v0/tests/test_tabs_permissions.py b/cms/djangoapps/contentstore/rest_api/v0/tests/test_tabs_permissions.py new file mode 100644 index 000000000000..528c17d4ac08 --- /dev/null +++ b/cms/djangoapps/contentstore/rest_api/v0/tests/test_tabs_permissions.py @@ -0,0 +1,92 @@ +""" +Tests verifying that v0 tabs REST API views enforce the correct authz permissions. +""" +from unittest.mock import patch +from urllib.parse import urlencode + +from django.urls import reverse + +from cms.djangoapps.contentstore.tests.utils import CourseTestCase +from openedx.core import toggles as core_toggles + + +@patch( + 'cms.djangoapps.contentstore.rest_api.v0.views.tabs.user_has_course_permission', + wraps=None, +) +@patch.object(core_toggles.AUTHZ_COURSE_AUTHORING_FLAG, 'is_enabled', return_value=True) +class TabsV0AuthzPermissionsTest(CourseTestCase): + """ + Tests that v0 tabs API views check the correct authz permissions + when AUTHZ_COURSE_AUTHORING_FLAG is enabled. + """ + + def setUp(self): + super().setUp() + self.list_url = reverse( + 'cms.djangoapps.contentstore:v0:course_tab_list', + kwargs={'course_id': self.course.id}, + ) + self.settings_url = reverse( + 'cms.djangoapps.contentstore:v0:course_tab_settings', + kwargs={'course_id': self.course.id}, + ) + self.reorder_url = reverse( + 'cms.djangoapps.contentstore:v0:course_tab_reorder', + kwargs={'course_id': self.course.id}, + ) + + # --- CourseTabListView --- + + def test_list_checks_view_pages_and_resources(self, _mock_flag, mock_perm): + mock_perm.return_value = True + self.client.get(self.list_url) + mock_perm.assert_called_once() + self.assertEqual(mock_perm.call_args[0][1], 'courses.view_pages_and_resources') + + def test_list_denied_returns_403(self, _mock_flag, mock_perm): + mock_perm.return_value = False + resp = self.client.get(self.list_url) + self.assertEqual(resp.status_code, 403) + + # --- CourseTabSettingsView --- + + def test_settings_checks_manage_pages_and_resources(self, _mock_flag, mock_perm): + mock_perm.return_value = True + self.client.post( + f'{self.settings_url}?{urlencode({"tab_id": "wiki"})}', + data={'is_hidden': True}, + content_type='application/json', + ) + mock_perm.assert_called_once() + self.assertEqual(mock_perm.call_args[0][1], 'courses.manage_pages_and_resources') + + def test_settings_denied_returns_403(self, _mock_flag, mock_perm): + mock_perm.return_value = False + resp = self.client.post( + f'{self.settings_url}?{urlencode({"tab_id": "wiki"})}', + data={'is_hidden': True}, + content_type='application/json', + ) + self.assertEqual(resp.status_code, 403) + + # --- CourseTabReorderView --- + + def test_reorder_checks_manage_pages_and_resources(self, _mock_flag, mock_perm): + mock_perm.return_value = True + self.client.post( + self.reorder_url, + data=[], + content_type='application/json', + ) + mock_perm.assert_called_once() + self.assertEqual(mock_perm.call_args[0][1], 'courses.manage_pages_and_resources') + + def test_reorder_denied_returns_403(self, _mock_flag, mock_perm): + mock_perm.return_value = False + resp = self.client.post( + self.reorder_url, + data=[], + content_type='application/json', + ) + self.assertEqual(resp.status_code, 403) diff --git a/cms/djangoapps/contentstore/rest_api/v0/views/tabs.py b/cms/djangoapps/contentstore/rest_api/v0/views/tabs.py index 968af2246aa3..d89d5d1a7e10 100644 --- a/cms/djangoapps/contentstore/rest_api/v0/views/tabs.py +++ b/cms/djangoapps/contentstore/rest_api/v0/views/tabs.py @@ -10,7 +10,12 @@ from xmodule.modulestore.django import modulestore from xmodule.modulestore.exceptions import ItemNotFoundError -from common.djangoapps.student.auth import has_studio_read_access, has_studio_write_access +from openedx.core.djangoapps.authz.constants import LegacyAuthoringPermission +from openedx.core.djangoapps.authz.decorators import user_has_course_permission +from openedx_authz.constants.permissions import ( + COURSES_MANAGE_PAGES_AND_RESOURCES, + COURSES_VIEW_PAGES_AND_RESOURCES, +) from openedx.core.lib.api.view_utils import DeveloperErrorViewMixin, verify_course_exists, view_auth_classes from ..serializers import CourseTabSerializer, CourseTabUpdateSerializer, TabIDLocatorSerializer from ....views.tabs import edit_tab_handler, get_course_tabs, reorder_tabs_handler @@ -78,7 +83,12 @@ def get(self, request: Request, course_id: str) -> Response: ``` """ course_key = CourseKey.from_string(course_id) - if not has_studio_read_access(request.user, course_key): + if not user_has_course_permission( + request.user, + COURSES_VIEW_PAGES_AND_RESOURCES.identifier, + course_key, + LegacyAuthoringPermission.READ, + ): self.permission_denied(request) course_block = modulestore().get_course(course_key) @@ -149,7 +159,12 @@ def post(self, request: Request, course_id: str) -> Response: without any content. """ course_key = CourseKey.from_string(course_id) - if not has_studio_write_access(request.user, course_key): + if not user_has_course_permission( + request.user, + COURSES_MANAGE_PAGES_AND_RESOURCES.identifier, + course_key, + LegacyAuthoringPermission.WRITE, + ): self.permission_denied(request) tab_id_locator = TabIDLocatorSerializer(data=request.query_params) @@ -221,7 +236,12 @@ def post(self, request: Request, course_id: str) -> Response: without any content. """ course_key = CourseKey.from_string(course_id) - if not has_studio_write_access(request.user, course_key): + if not user_has_course_permission( + request.user, + COURSES_MANAGE_PAGES_AND_RESOURCES.identifier, + course_key, + LegacyAuthoringPermission.WRITE, + ): self.permission_denied(request) course_block = modulestore().get_course(course_key) diff --git a/cms/djangoapps/contentstore/tests/test_tabs_permissions.py b/cms/djangoapps/contentstore/tests/test_tabs_permissions.py new file mode 100644 index 000000000000..81b84ed23dfc --- /dev/null +++ b/cms/djangoapps/contentstore/tests/test_tabs_permissions.py @@ -0,0 +1,100 @@ +""" +Tests verifying that tabs views enforce the correct authz permissions. +""" +from unittest.mock import patch + +from django.urls import reverse + +from cms.djangoapps.contentstore.tests.utils import CourseTestCase +from openedx.core import toggles as core_toggles + + +class TabsHandlerPermissionsTest(CourseTestCase): + """ + Tests for legacy permission enforcement on the tabs_handler view. + """ + + def setUp(self): + super().setUp() + self.url = reverse('tabs_handler', kwargs={'course_key_string': str(self.course.id)}) + self.non_staff_client, _ = self.create_non_staff_authed_user_client() + + def test_get_staff_allowed(self): + resp = self.client.get(self.url, HTTP_ACCEPT='text/html') + # GET html redirects to custom pages URL + self.assertIn(resp.status_code, [200, 301, 302]) + + def test_get_non_staff_forbidden(self): + resp = self.non_staff_client.get(self.url, HTTP_ACCEPT='text/html') + self.assertEqual(resp.status_code, 403) + + def test_put_staff_allowed(self): + resp = self.client.put( + self.url, + data={'tabs': []}, + content_type='application/json', + HTTP_ACCEPT='application/json', + ) + # Empty tabs list causes validation error, but not 403 + self.assertNotEqual(resp.status_code, 403) + + def test_put_non_staff_forbidden(self): + resp = self.non_staff_client.put( + self.url, + data={'tabs': []}, + content_type='application/json', + HTTP_ACCEPT='application/json', + ) + self.assertEqual(resp.status_code, 403) + + +@patch( + 'cms.djangoapps.contentstore.views.tabs.user_has_course_permission', + wraps=None, +) +@patch.object(core_toggles.AUTHZ_COURSE_AUTHORING_FLAG, 'is_enabled', return_value=True) +class TabsHandlerAuthzPermissionsTest(CourseTestCase): + """ + Tests that tabs_handler checks the correct authz permissions + when AUTHZ_COURSE_AUTHORING_FLAG is enabled. + """ + + def setUp(self): + super().setUp() + self.url = reverse('tabs_handler', kwargs={'course_key_string': str(self.course.id)}) + + def test_get_checks_view_pages_and_resources(self, _mock_flag, mock_perm): + mock_perm.return_value = True + self.client.get(self.url, HTTP_ACCEPT='text/html') + mock_perm.assert_called_once() + args = mock_perm.call_args + self.assertEqual(args[0][1], 'courses.view_pages_and_resources') + + def test_put_checks_manage_pages_and_resources(self, _mock_flag, mock_perm): + mock_perm.return_value = True + self.client.put( + self.url, + data={'tabs': []}, + content_type='application/json', + HTTP_ACCEPT='application/json', + ) + mock_perm.assert_called_once() + args = mock_perm.call_args + self.assertEqual(args[0][1], 'courses.manage_pages_and_resources') + + def test_post_checks_manage_pages_and_resources(self, _mock_flag, mock_perm): + mock_perm.return_value = True + self.client.post( + self.url, + data={'tabs': []}, + content_type='application/json', + HTTP_ACCEPT='application/json', + ) + mock_perm.assert_called_once() + args = mock_perm.call_args + self.assertEqual(args[0][1], 'courses.manage_pages_and_resources') + + def test_denied_returns_403(self, _mock_flag, mock_perm): + mock_perm.return_value = False + resp = self.client.get(self.url, HTTP_ACCEPT='text/html') + self.assertEqual(resp.status_code, 403) diff --git a/cms/djangoapps/contentstore/views/tabs.py b/cms/djangoapps/contentstore/views/tabs.py index 8fa9d024458d..e2f114519a1d 100644 --- a/cms/djangoapps/contentstore/views/tabs.py +++ b/cms/djangoapps/contentstore/views/tabs.py @@ -17,8 +17,13 @@ from xmodule.modulestore.django import modulestore from xmodule.tabs import CourseTab, CourseTabList, InvalidTabsException, StaticTab -from common.djangoapps.student.auth import has_course_author_access from common.djangoapps.util.json_request import JsonResponse, JsonResponseBadRequest, expect_json +from openedx.core.djangoapps.authz.constants import LegacyAuthoringPermission +from openedx.core.djangoapps.authz.decorators import user_has_course_permission +from openedx_authz.constants.permissions import ( + COURSES_MANAGE_PAGES_AND_RESOURCES, + COURSES_VIEW_PAGES_AND_RESOURCES, +) from ..utils import get_pages_and_resources_url, get_custom_pages_url __all__ = ["tabs_handler", "update_tabs_handler"] @@ -45,7 +50,14 @@ def tabs_handler(request, course_key_string): Instead use the general xblock URL (see item.xblock_handler). """ course_key = CourseKey.from_string(course_key_string) - if not has_course_author_access(request.user, course_key): + if request.method == "GET": + authz_perm = COURSES_VIEW_PAGES_AND_RESOURCES.identifier + legacy_perm = LegacyAuthoringPermission.READ + else: + authz_perm = COURSES_MANAGE_PAGES_AND_RESOURCES.identifier + legacy_perm = LegacyAuthoringPermission.WRITE + + if not user_has_course_permission(request.user, authz_perm, course_key, legacy_perm): raise PermissionDenied() course_item = modulestore().get_course(course_key) diff --git a/openedx/core/djangoapps/course_apps/rest_api/tests/test_views_permissions.py b/openedx/core/djangoapps/course_apps/rest_api/tests/test_views_permissions.py new file mode 100644 index 000000000000..03b49de1f63e --- /dev/null +++ b/openedx/core/djangoapps/course_apps/rest_api/tests/test_views_permissions.py @@ -0,0 +1,83 @@ +""" +Tests verifying that CourseAppsView enforces the correct authz permissions. +""" +import contextlib +from unittest import mock +from unittest.mock import patch + +from django.test import Client +from django.urls import reverse +from xmodule.modulestore import ModuleStoreEnum +from xmodule.modulestore.tests.django_utils import SharedModuleStoreTestCase +from xmodule.modulestore.tests.factories import CourseFactory + +from common.djangoapps.student.roles import CourseStaffRole +from common.djangoapps.student.tests.factories import UserFactory +from openedx.core import toggles as core_toggles +from openedx.core.djangolib.testing.utils import skip_unless_cms +from ...tests.utils import make_test_course_app + + +@skip_unless_cms +@patch( + 'openedx.core.djangoapps.course_apps.rest_api.v1.views.user_has_course_permission', + wraps=None, +) +@patch.object(core_toggles.AUTHZ_COURSE_AUTHORING_FLAG, 'is_enabled', return_value=True) +class CourseAppsAuthzPermissionsTest(SharedModuleStoreTestCase): + """ + Tests that CourseAppsView checks the correct authz permissions + when AUTHZ_COURSE_AUTHORING_FLAG is enabled. + """ + + def setUp(self): + super().setUp() + self.course = CourseFactory.create(default_store=ModuleStoreEnum.Type.split) + self.user = UserFactory(password=self.TEST_PASSWORD) + CourseStaffRole(self.course.id).add_users(self.user) + self.client = Client() + self.client.login(username=self.user.username, password=self.TEST_PASSWORD) + self.url = reverse('course_apps_api:v1:course_apps', kwargs={'course_id': self.course.id}) + + @contextlib.contextmanager + def _setup_plugin_mock(self): + """Patch get_available_plugins to return a test plugin.""" + patcher = mock.patch('openedx.core.djangoapps.course_apps.plugins.PluginManager.get_available_plugins') + mock_plugins = patcher.start() + mock_plugins.return_value = { + 'app1': make_test_course_app(app_id='app1', name='App One', is_available=True), + } + yield + patcher.stop() + + def test_get_checks_view_pages_and_resources(self, _mock_flag, mock_perm): + mock_perm.return_value = True + with self._setup_plugin_mock(): + self.client.get(self.url) + mock_perm.assert_called_once() + self.assertEqual(mock_perm.call_args[0][1], 'courses.view_pages_and_resources') + + def test_get_denied_returns_403(self, _mock_flag, mock_perm): + mock_perm.return_value = False + resp = self.client.get(self.url) + self.assertEqual(resp.status_code, 403) + + def test_patch_checks_manage_pages_and_resources(self, _mock_flag, mock_perm): + mock_perm.return_value = True + with self._setup_plugin_mock(): + self.client.patch( + self.url, + data={'id': 'app1', 'enabled': True}, + content_type='application/json', + ) + mock_perm.assert_called_once() + self.assertEqual(mock_perm.call_args[0][1], 'courses.manage_pages_and_resources') + + def test_patch_denied_returns_403(self, _mock_flag, mock_perm): + mock_perm.return_value = False + resp = self.client.patch( + self.url, + data={'id': 'app1', 'enabled': True}, + content_type='application/json', + ) + self.assertEqual(resp.status_code, 403) diff --git a/openedx/core/djangoapps/course_apps/rest_api/v1/views.py b/openedx/core/djangoapps/course_apps/rest_api/v1/views.py index 511dd4d956b4..9557ede08f1a 100644 --- a/openedx/core/djangoapps/course_apps/rest_api/v1/views.py +++ b/openedx/core/djangoapps/course_apps/rest_api/v1/views.py @@ -14,10 +14,15 @@ from rest_framework.request import Request from rest_framework.response import Response -from common.djangoapps.student.auth import has_studio_write_access +from openedx.core.djangoapps.authz.constants import LegacyAuthoringPermission +from openedx.core.djangoapps.authz.decorators import user_has_course_permission from openedx.core.djangoapps.course_apps.models import CourseAppStatus from openedx.core.lib.api.authentication import BearerAuthenticationAllowInactiveUser from openedx.core.lib.api.view_utils import DeveloperErrorViewMixin, validate_course_key, verify_course_exists +from openedx_authz.constants.permissions import ( + COURSES_MANAGE_PAGES_AND_RESOURCES, + COURSES_VIEW_PAGES_AND_RESOURCES, +) from ...api import is_course_app_enabled, set_course_app_enabled from ...plugins import CourseApp, CourseAppsPluginManager @@ -33,11 +38,21 @@ class HasStudioWriteAccess(BasePermission): def has_permission(self, request, view): """ Check if the user has write access to studio. + + Uses authz permissions when the feature flag is enabled, + falling back to legacy studio write access. + GET requests check view permission, all others check manage permission. """ user = request.user course_key_string = view.kwargs.get("course_id") course_key = validate_course_key(course_key_string) - return has_studio_write_access(user, course_key) + if request.method == 'GET': + authz_perm = COURSES_VIEW_PAGES_AND_RESOURCES.identifier + legacy_perm = LegacyAuthoringPermission.READ + else: + authz_perm = COURSES_MANAGE_PAGES_AND_RESOURCES.identifier + legacy_perm = LegacyAuthoringPermission.WRITE + return user_has_course_permission(user, authz_perm, course_key, legacy_perm) class CourseAppSerializer(serializers.Serializer): # pylint: disable=abstract-method diff --git a/openedx/core/djangoapps/discussions/permissions.py b/openedx/core/djangoapps/discussions/permissions.py index 7028defc3e00..99c9492ae638 100644 --- a/openedx/core/djangoapps/discussions/permissions.py +++ b/openedx/core/djangoapps/discussions/permissions.py @@ -6,7 +6,14 @@ from common.djangoapps.student.roles import CourseStaffRole, GlobalStaff, CourseInstructorRole from lms.djangoapps.discussion.django_comment_client.utils import has_discussion_privileges +from openedx.core.djangoapps.authz.decorators import user_has_course_permission from openedx.core.lib.api.view_utils import validate_course_key +from openedx_authz.constants.permissions import ( + COURSES_MANAGE_PAGES_AND_RESOURCES, + COURSES_VIEW_PAGES_AND_RESOURCES, +) + +from openedx.core import toggles as core_toggles DEFAULT_MESSAGE = "You're not authorized to perform this operation." PERMISSION_MESSAGES = { @@ -14,6 +21,17 @@ } +def _legacy_is_staff_or_course_team(user, course_key): + """Legacy permission check preserving original IsStaffOrCourseTeam behavior.""" + if GlobalStaff().has_user(user): + return True + return ( + CourseInstructorRole(course_key).has_user(user) + or CourseStaffRole(course_key).has_user(user) + or has_discussion_privileges(user, course_key) + ) + + class IsStaffOrCourseTeam(BasePermission): """ Check if user is global or course staff @@ -21,20 +39,23 @@ class IsStaffOrCourseTeam(BasePermission): Permission that checks to see if the user is global staff, course staff, course admin, or has discussion privileges. If none of those conditions are met, HTTP403 is returned. + + When the authz feature flag is enabled, uses authz permissions instead. + GET requests check view permission, all others check manage permission. """ def has_permission(self, request, view): course_key_string = view.kwargs.get('course_key_string') course_key = validate_course_key(course_key_string) - if GlobalStaff().has_user(request.user): - return True + if core_toggles.enable_authz_course_authoring(course_key): + if request.method == 'GET': + authz_perm = COURSES_VIEW_PAGES_AND_RESOURCES.identifier + else: + authz_perm = COURSES_MANAGE_PAGES_AND_RESOURCES.identifier + return user_has_course_permission(request.user, authz_perm, course_key) - return ( - CourseInstructorRole(course_key).has_user(request.user) or - CourseStaffRole(course_key).has_user(request.user) or - has_discussion_privileges(request.user, course_key) - ) + return _legacy_is_staff_or_course_team(request.user, course_key) def user_permissions_for_course(course, user): diff --git a/openedx/core/djangoapps/discussions/tests/test_views_permissions.py b/openedx/core/djangoapps/discussions/tests/test_views_permissions.py new file mode 100644 index 000000000000..80b76becc529 --- /dev/null +++ b/openedx/core/djangoapps/discussions/tests/test_views_permissions.py @@ -0,0 +1,104 @@ +""" +Tests verifying that discussions views enforce the correct authz permissions. +""" +from unittest.mock import patch + +from django.urls import reverse +from rest_framework import status +from rest_framework.test import APITestCase +from xmodule.modulestore import ModuleStoreEnum +from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase +from xmodule.modulestore.tests.factories import CourseFactory + +from common.djangoapps.student.tests.factories import UserFactory +from common.djangoapps.student.roles import CourseStaffRole +from openedx.core import toggles as core_toggles + + +@patch( + 'openedx.core.djangoapps.discussions.permissions.user_has_course_permission', + wraps=None, +) +@patch.object(core_toggles.AUTHZ_COURSE_AUTHORING_FLAG, 'is_enabled', return_value=True) +class DiscussionsAuthzPermissionsTest(ModuleStoreTestCase, APITestCase): + """ + Tests that discussions views check the correct authz permissions + when AUTHZ_COURSE_AUTHORING_FLAG is enabled. + """ + + def setUp(self): + super().setUp() + self.course = CourseFactory.create(default_store=ModuleStoreEnum.Type.split) + self.user = UserFactory(password=self.TEST_PASSWORD) + CourseStaffRole(self.course.id).add_users(self.user) + self.client.login(username=self.user.username, password=self.TEST_PASSWORD) + + self.settings_url = reverse( + 'discussions-settings', + kwargs={'course_key_string': str(self.course.id)}, + ) + self.providers_url = reverse( + 'discussions-providers', + kwargs={'course_key_string': str(self.course.id)}, + ) + + # --- DiscussionsConfigurationSettingsView --- + + def test_settings_get_checks_view(self, _mock_flag, mock_perm): + mock_perm.return_value = True + self.client.get(self.settings_url) + mock_perm.assert_called_once() + self.assertEqual(mock_perm.call_args[0][1], 'courses.view_pages_and_resources') + + def test_settings_post_checks_manage(self, _mock_flag, mock_perm): + mock_perm.return_value = True + self.client.post(self.settings_url, data={}, format='json') + mock_perm.assert_called_once() + self.assertEqual(mock_perm.call_args[0][1], 'courses.manage_pages_and_resources') + + def test_settings_denied_returns_403(self, _mock_flag, mock_perm): + mock_perm.return_value = False + resp = self.client.get(self.settings_url) + self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN) + + # --- DiscussionsProvidersView --- + + def test_providers_get_checks_view(self, _mock_flag, mock_perm): + mock_perm.return_value = True + self.client.get(self.providers_url) + mock_perm.assert_called_once() + self.assertEqual(mock_perm.call_args[0][1], 'courses.view_pages_and_resources') + + def test_providers_denied_returns_403(self, _mock_flag, mock_perm): + mock_perm.return_value = False + resp = self.client.get(self.providers_url) + self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN) + + +class DiscussionsLegacyFallbackTest(ModuleStoreTestCase, APITestCase): + """ + Tests that discussions views fall back to legacy IsStaffOrCourseTeam + behavior when the authz flag is OFF. + """ + + def setUp(self): + super().setUp() + self.course = CourseFactory.create(default_store=ModuleStoreEnum.Type.split) + self.staff_user = UserFactory(password=self.TEST_PASSWORD) + CourseStaffRole(self.course.id).add_users(self.staff_user) + self.regular_user = UserFactory(password=self.TEST_PASSWORD) + + self.settings_url = reverse( + 'discussions-settings', + kwargs={'course_key_string': str(self.course.id)}, + ) + + def test_staff_allowed_when_flag_off(self): + self.client.login(username=self.staff_user.username, password=self.TEST_PASSWORD) + resp = self.client.get(self.settings_url) + self.assertEqual(resp.status_code, status.HTTP_200_OK) + + def test_non_staff_denied_when_flag_off(self): + self.client.login(username=self.regular_user.username, password=self.TEST_PASSWORD) + resp = self.client.get(self.settings_url) + self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)