Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions lms/djangoapps/discussion/rest_api/forum_mute_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from lms.djangoapps.discussion.rest_api.permissions import (
CanMuteUsers
)
from common.djangoapps.student.roles import CourseStaffRole, CourseInstructorRole, GlobalStaff
from lms.djangoapps.discussion.rest_api.serializers import (
MuteRequestSerializer,
UnmuteRequestSerializer,
Expand Down Expand Up @@ -79,12 +78,10 @@ def _get_target_user_and_data(request_data, course_id):


def _is_privileged_user(user, course_key):
"""Check if user has privileged permissions."""
"""Check if user has discussion moderation privileges."""
return (
has_discussion_privileges(user, course_key) or
GlobalStaff().has_user(user) or
CourseStaffRole(course_key).has_user(user) or
CourseInstructorRole(course_key).has_user(user)
user.is_staff
)


Expand Down
71 changes: 10 additions & 61 deletions lms/djangoapps/discussion/rest_api/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,15 @@ def get_editable_fields(cc_content: Union[Thread, Comment], context: Dict) -> Se
# Flagging/un-flagging is always available.
return {"abuse_flagged"}

is_author = _is_author(cc_content, context)

# Map each field to the condition in which it's editable.
editable_fields = {
"abuse_flagged": True,
"closed": is_thread and has_moderation_privilege,
"close_reason_code": is_thread and has_moderation_privilege,
"pinned": is_thread and (has_moderation_privilege or is_staff_or_admin),
"muted": is_thread and (has_moderation_privilege or is_staff_or_admin),
"muted": is_thread and has_moderation_privilege and not is_author,
"read": is_thread,
}
if is_thread:
Expand All @@ -121,7 +123,6 @@ def get_editable_fields(cc_content: Union[Thread, Comment], context: Dict) -> Se
# Return only editable fields
return _filter_fields(editable_fields)

is_author = _is_author(cc_content, context)
editable_fields.update({
"voted": has_moderation_privilege or not is_author or is_staff_or_admin,
"raw_body": has_moderation_privilege or is_author,
Expand Down Expand Up @@ -193,13 +194,6 @@ def can_take_action_on_spam(user, course_id):
"""
Returns if the user has access to take action against forum spam posts.

Grants access to:
- Global Staff (user.is_staff or GlobalStaff role)
- Course Staff for the specific course
- Course Instructors for the specific course
- Forum Moderators for the specific course
- Forum Administrators for the specific course

Parameters:
user: User object
course_id: CourseKey or string of course_id
Expand All @@ -214,14 +208,6 @@ def can_take_action_on_spam(user, course_id):
if isinstance(course_id, str):
course_id = CourseKey.from_string(course_id)

# Check if user is Course Staff or Instructor for this specific course
if CourseStaffRole(course_id).has_user(user):
return True

if CourseInstructorRole(course_id).has_user(user):
return True

# Check forum moderator/administrator roles for this specific course
user_roles = set(
Role.objects.filter(
users=user,
Expand All @@ -236,18 +222,6 @@ def can_take_action_on_spam(user, course_id):
class IsAllowedToBulkDelete(permissions.BasePermission):
"""
Permission that checks if the user is allowed to perform bulk delete and ban operations.

Grants access to:
- Global Staff (superusers)
- Course Staff
- Course Instructors
- Forum Moderators
- Forum Administrators

Denies access to:
- Unauthenticated users
- Regular students
- Community TAs (they can moderate individual posts but not bulk delete)
"""

def has_permission(self, request, view):
Expand Down Expand Up @@ -279,45 +253,39 @@ def has_permission(self, request, view):
class CanMuteUsers(permissions.BasePermission):
"""
Permission class for all mute/unmute operations.
Handles muting, unmuting, and basic course access permissions.
"""

@staticmethod
def _is_privileged_user(user, course_id):
"""
Check if user has discussion privileges.
Check if user has discussion moderation privileges.
"""
return (
has_discussion_privileges(user, course_id) or
GlobalStaff().has_user(user) or
CourseStaffRole(course_id).has_user(user) or
CourseInstructorRole(course_id).has_user(user)
GlobalStaff().has_user(user)
)

def has_permission(self, request, view):
"""Check basic mute permissions - same logic as IsStaffOrCourseTeamOrEnrolled"""
"""
Check if user has permission to access mute/unmute endpoints.
"""
if not request.user.is_authenticated:
return False

# Get course_id from URL kwargs first (where it's actually passed)
course_id = view.kwargs.get("course_id")
if not course_id:
return False

# Convert course_id to CourseKey if it's a string
if isinstance(course_id, str):
try:
course_id = CourseKey.from_string(course_id)
except InvalidKeyError:
return False

# Use same permission logic as IsStaffOrCourseTeamOrEnrolled
return (
GlobalStaff().has_user(request.user) or
CourseStaffRole(course_id).has_user(request.user) or
CourseInstructorRole(course_id).has_user(request.user) or
CourseEnrollment.is_enrolled(request.user, course_id) or
has_discussion_privileges(request.user, course_id)
has_discussion_privileges(request.user, course_id) or
CourseEnrollment.is_enrolled(request.user, course_id)
)

@staticmethod
Expand Down Expand Up @@ -415,19 +383,6 @@ def can_unmute(requesting_user, target_user, course_id, scope='personal'):
class IsAllowedToRestore(permissions.BasePermission):
"""
Permission that checks if the user has privileges to restore individual deleted content.

This permission is intentionally more permissive than IsAllowedToBulkDelete because:
- Restoring individual content is a less risky operation than bulk deletion
- Users who can see deleted content should be able to restore it
- Course-level moderation staff need this capability for day-to-day moderation

Allowed users (course-level permissions):
- Global staff (platform-wide)
- Course instructors
- Course staff
- Discussion moderators (course-specific)
- Discussion community TAs (course-specific)
- Discussion administrators (course-specific)
"""

def has_permission(self, request, view):
Expand All @@ -449,12 +404,6 @@ def has_permission(self, request, view):
except InvalidKeyError:
return False

# Check if user is course staff or instructor
if CourseStaffRole(course_key).has_user(request.user) or \
CourseInstructorRole(course_key).has_user(request.user):
return True

# Check if user has discussion privileges (moderator, community TA, administrator)
if has_discussion_privileges(request.user, course_key):
return True

Expand Down
30 changes: 22 additions & 8 deletions lms/djangoapps/discussion/rest_api/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def get_context(course, request, thread=None):
cc_requester["course_id"] = course.id
course_discussion_settings = CourseDiscussionSettings.get(course.id)
is_global_staff = GlobalStaff().has_user(requester)
all_privileged_ids = set(moderator_user_ids) | set(ta_user_ids) | set(course_staff_user_ids)
all_privileged_ids = set(moderator_user_ids) | set(ta_user_ids)
has_moderation_privilege = requester.id in all_privileged_ids or is_global_staff
return {
"course": course,
Expand Down Expand Up @@ -242,14 +242,22 @@ def _validate_non_updatable(self, value):

def _is_user_privileged(self, user_id):
"""
Returns a boolean indicating whether the given user_id identifies a
privileged user.
Returns a boolean indicating whether the given user_id identifies a privileged user.
"""
return (
is_privileged = (
user_id in self.context["moderator_user_ids"]
or user_id in self.context["ta_user_ids"]
)

if not is_privileged:
try:
user = User.objects.get(id=user_id)
is_privileged = GlobalStaff().has_user(user)
except User.DoesNotExist:
pass

return is_privileged

def _is_anonymous(self, obj):
"""
Returns a boolean indicating whether the content should be anonymous to
Expand All @@ -271,16 +279,22 @@ def get_author(self, obj):

def _get_user_label(self, user_id):
"""
Returns the role label (i.e. "Staff", "Moderator" or "Community TA") for the user
with the given id.
Returns the role label for the user with the given id.
"""
is_staff = user_id in self.context["course_staff_user_ids"]
is_moderator = user_id in self.context["moderator_user_ids"]
is_ta = user_id in self.context["ta_user_ids"]

is_global_staff = False
if not (is_moderator or is_ta):
try:
user = User.objects.get(id=user_id)
is_global_staff = GlobalStaff().has_user(user)
except User.DoesNotExist:
pass

return (
"Staff"
if is_staff
if is_global_staff
else "Moderator" if is_moderator else "Community TA" if is_ta else None
)

Expand Down
1 change: 0 additions & 1 deletion lms/djangoapps/discussion/rest_api/tests/test_api_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,6 @@ def test_basic_in_blackout_period_with_user_access(self, mock_emit):
"closed",
"copy_link",
"following",
"muted",
"pinned",
"raw_body",
"read",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,17 @@ def test_global_staff_role_has_permission(self):
GlobalStaff().add_users(user)
self.assertTrue(can_take_action_on_spam(user, self.course_key))

def test_course_staff_has_permission(self):
"""Course staff should have permission for their course."""
def test_course_staff_no_permission(self):
"""Course staff should NOT have permission (authoring role only)."""
user = UserFactory.create()
CourseStaffRole(self.course_key).add_users(user)
self.assertTrue(can_take_action_on_spam(user, self.course_key))
self.assertFalse(can_take_action_on_spam(user, self.course_key))

def test_course_instructor_has_permission(self):
"""Course instructors should have permission for their course."""
def test_course_instructor_no_permission(self):
"""Course instructors should NOT have permission (authoring role only)."""
user = UserFactory.create()
CourseInstructorRole(self.course_key).add_users(user)
self.assertTrue(can_take_action_on_spam(user, self.course_key))
self.assertFalse(can_take_action_on_spam(user, self.course_key))

def test_forum_moderator_has_permission(self):
"""Forum moderators should have permission for their course."""
Expand Down Expand Up @@ -74,16 +74,18 @@ def test_community_ta_no_permission(self):
self.assertFalse(can_take_action_on_spam(user, self.course_key))

def test_staff_different_course_no_permission(self):
"""Staff from a different course should not have permission."""
"""Discussion moderators from a different course should not have permission."""
other_course = CourseFactory.create(org='OtherX', number='CS201', run='2024')
user = UserFactory.create()
CourseStaffRole(other_course.id).add_users(user)
role = Role.objects.create(name='Moderator', course_id=other_course.id)
role.users.add(user)
self.assertFalse(can_take_action_on_spam(user, self.course_key))

def test_accepts_string_course_id(self):
"""Function should accept string course_id and convert it."""
user = UserFactory.create()
CourseStaffRole(self.course_key).add_users(user)
role = Role.objects.create(name='Moderator', course_id=self.course_key)
role.users.add(user)
self.assertTrue(can_take_action_on_spam(user, str(self.course_key)))


Expand Down Expand Up @@ -130,23 +132,23 @@ def test_global_staff_with_course_id_in_data(self):

self.assertTrue(self.permission.has_permission(request, view))

def test_course_staff_with_course_id_in_data(self):
"""Course staff should have permission when course_id is in request data."""
def test_course_staff_denied(self):
"""Course staff should NOT have permission (authoring role only)."""
user = UserFactory.create()
CourseStaffRole(self.course.id).add_users(user)
request = self._create_request_with_data(user, self.course_key)
view = self._create_view_with_kwargs()

self.assertTrue(self.permission.has_permission(request, view))
self.assertFalse(self.permission.has_permission(request, view))

def test_course_instructor_with_course_id_in_data(self):
"""Course instructors should have permission when course_id is in request data."""
def test_course_instructor_denied(self):
"""Course instructors should NOT have permission (authoring role only)."""
user = UserFactory.create()
CourseInstructorRole(self.course.id).add_users(user)
request = self._create_request_with_data(user, self.course_key)
view = self._create_view_with_kwargs()

self.assertTrue(self.permission.has_permission(request, view))
self.assertFalse(self.permission.has_permission(request, view))

def test_forum_moderator_with_course_id_in_data(self):
"""Forum moderators should have permission when course_id is in request data."""
Expand All @@ -169,7 +171,8 @@ def test_regular_student_denied(self):
def test_course_id_in_url_kwargs(self):
"""Permission should work when course_id is in URL kwargs."""
user = UserFactory.create()
CourseStaffRole(self.course.id).add_users(user)
role = Role.objects.create(name='Moderator', course_id=self.course.id)
role.users.add(user)
request = self.factory.get('/api/discussion/v1/moderation/banned-users/')
request.user = user
request.data = {}
Expand All @@ -193,10 +196,11 @@ def test_no_course_id_only_global_staff_allowed(self):
self.assertFalse(self.permission.has_permission(request, view))

def test_staff_different_course_denied(self):
"""Staff from different course should be denied."""
"""Discussion moderators from different course should be denied."""
other_course = CourseFactory.create(org='OtherX', number='CS201', run='2024')
user = UserFactory.create()
CourseStaffRole(other_course.id).add_users(user)
role = Role.objects.create(name='Moderator', course_id=other_course.id)
role.users.add(user)
request = self._create_request_with_data(user, self.course_key)
view = self._create_view_with_kwargs()

Expand Down
Loading
Loading