diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 17a3fd46..537ad342 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -16,6 +16,15 @@ Unreleased * + +0.21.0 - 2026-02-12 +******************** + +Added +===== + +* Add course staff role, permission to manage advanced course settings, and introduce course scope + 0.20.0 - 2025-11-27 ******************** diff --git a/openedx_authz/__init__.py b/openedx_authz/__init__.py index c158cdcc..ef2a05a7 100644 --- a/openedx_authz/__init__.py +++ b/openedx_authz/__init__.py @@ -4,6 +4,6 @@ import os -__version__ = "0.20.0" +__version__ = "0.21.0" ROOT_DIRECTORY = os.path.dirname(os.path.abspath(__file__)) diff --git a/openedx_authz/api/data.py b/openedx_authz/api/data.py index 3eb6e6c5..859e8e31 100644 --- a/openedx_authz/api/data.py +++ b/openedx_authz/api/data.py @@ -9,6 +9,7 @@ from attrs import define from opaque_keys import InvalidKeyError +from opaque_keys.edx.keys import CourseKey from opaque_keys.edx.locator import LibraryLocatorV2 try: @@ -16,6 +17,11 @@ except ImportError: ContentLibrary = None +try: + from openedx.core.djangoapps.content.course_overviews.models import CourseOverview +except ImportError: + CourseOverview = None + __all__ = [ "UserData", "PermissionData", @@ -212,6 +218,8 @@ def get_subclass_by_namespaced_key(mcs, namespaced_key: str) -> Type["ScopeData" The ScopeData subclass for the namespace, or ScopeData if namespace not recognized. Examples: + >>> ScopeMeta.get_subclass_by_namespaced_key('course-v1^course-v1:WGU+CS002+2025_T1') + >>> ScopeMeta.get_subclass_by_namespaced_key('lib^lib:DemoX:CSPROB') >>> ScopeMeta.get_subclass_by_namespaced_key('global^generic') @@ -462,6 +470,108 @@ def __repr__(self): return self.namespaced_key +@define +class CourseOverviewData(ScopeData): + """A course scope for authorization in the Open edX platform. + + Courses uses the CourseKey format for identification. + + Attributes: + NAMESPACE: 'course-v1' for course scopes. + external_key: The course identifier (e.g., 'course-v1:TestOrg+TestCourse+2024_T1'). + Must be a valid CourseKey format. + namespaced_key: The course identifier with namespace (e.g., 'course-v1^course-v1:TestOrg+TestCourse+2024_T1'). + course_id: Property alias for external_key. + + Examples: + >>> course = CourseOverviewData(external_key='course-v1:TestOrg+TestCourse+2024_T1') + >>> course.namespaced_key + 'course-v1^course-v1:TestOrg+TestCourse+2024_T1' + >>> course.course_id + 'course-v1:TestOrg+TestCourse+2024_T1' + + """ + + NAMESPACE: ClassVar[str] = "course-v1" + + @property + def course_id(self) -> str: + """The course identifier as used in Open edX (e.g., 'course-v1:TestOrg+TestCourse+2024_T1'). + + This is an alias for external_key that represents the course ID without the namespace prefix. + + Returns: + str: The course identifier without namespace. + """ + return self.external_key + + @property + def course_key(self) -> CourseKey: + """The CourseKey object for the course. + + Returns: + CourseKey: The course key object. + """ + return CourseKey.from_string(self.course_id) + + @classmethod + def validate_external_key(cls, external_key: str) -> bool: + """Validate the external_key format for CourseOverviewData. + + Args: + external_key: The external key to validate. + + Returns: + bool: True if valid, False otherwise. + """ + try: + CourseKey.from_string(external_key) + return True + except InvalidKeyError: + return False + + def get_object(self) -> CourseOverview | None: + """Retrieve the CourseOverview instance associated with this scope. + + This method converts the course_id to a CourseKey and queries the + database to fetch the corresponding CourseOverview object. + + Returns: + CourseOverview | None: The CourseOverview instance if found in the database, + or None if the course does not exist or has an invalid key format. + + Examples: + >>> course_scope = CourseOverviewData(external_key='course-v1:TestOrg+TestCourse+2024_T1') + >>> course_obj = course_scope.get_object() # CourseOverview object + """ + try: + course_obj = CourseOverview.get_from_id(self.course_key) + # Validate canonical key: get_by_key is case-insensitive, but we require exact match + # This ensures authorization uses canonical course IDs consistently + if course_obj.id != self.course_key: + raise CourseOverview.DoesNotExist + except (InvalidKeyError, CourseOverview.DoesNotExist): + return None + + return course_obj + + def exists(self) -> bool: + """Check if the course overview exists. + + Returns: + bool: True if the course overview exists, False otherwise. + """ + return self.get_object() is not None + + def __str__(self): + """Human readable string representation of the course overview.""" + return self.course_id + + def __repr__(self): + """Developer friendly string representation of the course overview.""" + return self.namespaced_key + + class SubjectMeta(type): """Metaclass for SubjectData to handle dynamic subclass instantiation based on namespace.""" diff --git a/openedx_authz/constants/permissions.py b/openedx_authz/constants/permissions.py index 033a8ee6..0aa00e8e 100644 --- a/openedx_authz/constants/permissions.py +++ b/openedx_authz/constants/permissions.py @@ -53,3 +53,12 @@ action=ActionData(external_key=f"{CONTENT_LIBRARIES_NAMESPACE}.delete_library_collection"), effect="allow", ) + +# Course Permissions + +COURSES_NAMESPACE = "courses" + +MANAGE_ADVANCED_SETTINGS = PermissionData( + action=ActionData(external_key=f"{COURSES_NAMESPACE}.manage_advanced_settings"), + effect="allow", +) diff --git a/openedx_authz/constants/roles.py b/openedx_authz/constants/roles.py index 44da76fa..ebbdb041 100644 --- a/openedx_authz/constants/roles.py +++ b/openedx_authz/constants/roles.py @@ -56,3 +56,13 @@ LIBRARY_AUTHOR = RoleData(external_key="library_author", permissions=LIBRARY_AUTHOR_PERMISSIONS) LIBRARY_CONTRIBUTOR = RoleData(external_key="library_contributor", permissions=LIBRARY_CONTRIBUTOR_PERMISSIONS) LIBRARY_USER = RoleData(external_key="library_user", permissions=LIBRARY_USER_PERMISSIONS) + + +# Course Roles and Permissions + + +COURSE_STAFF_PERMISSIONS = [ + permissions.MANAGE_ADVANCED_SETTINGS, +] + +COURSE_STAFF = RoleData(external_key="course_staff", permissions=COURSE_STAFF_PERMISSIONS) diff --git a/openedx_authz/engine/config/authz.policy b/openedx_authz/engine/config/authz.policy index 810dbe4e..d1405bbd 100644 --- a/openedx_authz/engine/config/authz.policy +++ b/openedx_authz/engine/config/authz.policy @@ -68,3 +68,9 @@ g2, act^content_libraries.manage_library_team, act^content_libraries.view_librar g2, act^content_libraries.delete_library_collection, act^content_libraries.edit_library_collection g2, act^content_libraries.create_library_collection, act^content_libraries.edit_library_collection g2, act^content_libraries.edit_library_collection, act^content_libraries.view_library + + +# Course Policies + +# Course Staff Permissions +p, role^course_staff, act^courses.manage_advanced_settings, course-v1^*, allow diff --git a/openedx_authz/engine/matcher.py b/openedx_authz/engine/matcher.py index d5c3fcf6..0bc7e3d6 100644 --- a/openedx_authz/engine/matcher.py +++ b/openedx_authz/engine/matcher.py @@ -3,18 +3,24 @@ from django.contrib.auth import get_user_model from edx_django_utils.cache import RequestCache -from openedx_authz.api.data import ContentLibraryData, ScopeData, UserData +from openedx_authz.api.data import ContentLibraryData, CourseOverviewData, ScopeData, UserData from openedx_authz.rest_api.utils import get_user_by_username_or_email User = get_user_model() +SCOPES_WITH_ADMIN_OR_SUPERUSER_CHECK = { + (ContentLibraryData.NAMESPACE, ContentLibraryData), + (CourseOverviewData.NAMESPACE, CourseOverviewData), +} + + def is_admin_or_superuser_check(request_user: str, request_action: str, request_scope: str) -> bool: # pylint: disable=unused-argument """ Evaluates custom, non-role-based conditions for authorization checks. Checks attribute-based conditions that don't rely on role assignments. - Currently handles ContentLibraryData scopes by granting access to staff + Currently handles ContentLibraryData and CourseOverviewData scopes by granting access to staff and superusers. Args: @@ -24,7 +30,7 @@ def is_admin_or_superuser_check(request_user: str, request_action: str, request_ Returns: bool: True if the condition is satisfied (user is staff/superuser for - ContentLibraryData scopes), False otherwise (including when user + ContentLibraryData and CourseOverviewData scopes), False otherwise (including when user doesn't exist or scope type is not supported) """ @@ -33,8 +39,8 @@ def is_admin_or_superuser_check(request_user: str, request_action: str, request_ request_cache = RequestCache("rbac_is_admin_or_superuser") # TODO: This special case for superuser and staff users is currently only for - # content libraries. See: https://github.com/openedx/openedx-authz/issues/87 - if not isinstance(scope, ContentLibraryData): + # content libraries and course overviews. See: https://github.com/openedx/openedx-authz/issues/87 + if (scope.NAMESPACE, type(scope)) not in SCOPES_WITH_ADMIN_OR_SUPERUSER_CHECK: return False cached_response = request_cache.get_cached_response(username) diff --git a/openedx_authz/migrations/0007_coursescope.py b/openedx_authz/migrations/0007_coursescope.py new file mode 100644 index 00000000..265f2de7 --- /dev/null +++ b/openedx_authz/migrations/0007_coursescope.py @@ -0,0 +1,44 @@ +# Generated by Django 4.2.24 on 2026-02-06 17:19 + +import django.db.models.deletion +from django.conf import settings +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("openedx_authz", "0006_migrate_legacy_permissions"), + ] + + operations = [ + migrations.CreateModel( + name="CourseScope", + fields=[ + ( + "scope_ptr", + models.OneToOneField( + auto_created=True, + on_delete=django.db.models.deletion.CASCADE, + parent_link=True, + primary_key=True, + serialize=False, + to="openedx_authz.scope", + ), + ), + ( + "course_overview", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.CASCADE, + related_name="authz_scopes", + to=settings.OPENEDX_AUTHZ_COURSE_OVERVIEW_MODEL, + ), + ), + ], + options={ + "abstract": False, + }, + bases=("openedx_authz.scope",), + ), + ] diff --git a/openedx_authz/models/scopes.py b/openedx_authz/models/scopes.py index 02a82e83..24b01556 100644 --- a/openedx_authz/models/scopes.py +++ b/openedx_authz/models/scopes.py @@ -8,6 +8,7 @@ from django.apps import apps from django.conf import settings from django.db import models +from opaque_keys.edx.keys import CourseKey from opaque_keys.edx.locator import LibraryLocatorV2 from openedx_authz.models.core import Scope @@ -31,7 +32,26 @@ def get_content_library_model(): return None +def get_course_overview_model(): + """Return the CourseOverview model class specified by settings. + + The setting `OPENEDX_AUTHZ_COURSE_OVERVIEW_MODEL` should be an + app_label.ModelName string (e.g. 'course_overviews.CourseOverview'). + """ + COURSE_OVERVIEW_MODEL = getattr( + settings, + "OPENEDX_AUTHZ_COURSE_OVERVIEW_MODEL", + "course_overviews.CourseOverview", + ) + try: + app_label, model_name = COURSE_OVERVIEW_MODEL.split(".") + return apps.get_model(app_label, model_name, require_ready=False) + except LookupError: + return None + + ContentLibrary = get_content_library_model() +CourseOverview = get_course_overview_model() class ContentLibraryScope(Scope): @@ -42,7 +62,7 @@ class ContentLibraryScope(Scope): NAMESPACE = "lib" - # Link to the actual course or content library, if applicable. In other cases, this could be null. + # Link to the actual content library, if applicable. In other cases, this could be null. # Piggybacking on the existing ContentLibrary model to keep the ExtendedCasbinRule up to date # by deleting the Scope, and thus the ExtendedCasbinRule, when the ContentLibrary is deleted. # @@ -75,3 +95,46 @@ def get_or_create_for_external_key(cls, scope): content_library = ContentLibrary.objects.get_by_key(library_key) scope, _ = cls.objects.get_or_create(content_library=content_library) return scope + + +class CourseScope(Scope): + """Scope representing a course in the authorization system. + + .. no_pii: + """ + + NAMESPACE = "course-v1" + + # Link to the actual course, if applicable. In other cases, this could be null. + # Piggybacking on the existing CourseOverview model to keep the ExtendedCasbinRule up to date + # by deleting the Scope, and thus the ExtendedCasbinRule, when the CourseOverview is deleted. + # + # When course_overviews IS available, the on_delete=CASCADE will still work at the + # application level through Django's signal handlers. + # Use a string reference to the external app's model so Django won't try + # to import it at model import time. The migration already records the + # dependency on `course_overviews` when the app is present. + course_overview = models.ForeignKey( + settings.OPENEDX_AUTHZ_COURSE_OVERVIEW_MODEL, + on_delete=models.CASCADE, + null=True, + blank=True, + related_name="authz_scopes", + swappable=True, + ) + + @classmethod + def get_or_create_for_external_key(cls, scope): + """Get or create a CourseScope for the given external key. + + Args: + scope: ScopeData object with an external_key attribute containing + a CourseKey string. + + Returns: + CourseScope: The Scope instance for the given CourseOverview + """ + course_key = CourseKey.from_string(scope.external_key) + course_overview = CourseOverview.get_from_id(course_key) + scope, _ = cls.objects.get_or_create(course_overview=course_overview) + return scope diff --git a/openedx_authz/settings/common.py b/openedx_authz/settings/common.py index 03662a6c..81e060b8 100644 --- a/openedx_authz/settings/common.py +++ b/openedx_authz/settings/common.py @@ -46,6 +46,10 @@ def plugin_settings(settings): if not hasattr(settings, "OPENEDX_AUTHZ_CONTENT_LIBRARY_MODEL"): settings.OPENEDX_AUTHZ_CONTENT_LIBRARY_MODEL = "content_libraries.ContentLibrary" + # Set default CourseOverview model for swappable dependency + if not hasattr(settings, "OPENEDX_AUTHZ_COURSE_OVERVIEW_MODEL"): + settings.OPENEDX_AUTHZ_COURSE_OVERVIEW_MODEL = "course_overviews.CourseOverview" + # Set default CASBIN_LOG_LEVEL if not already set. # This setting defines the logging level for the Casbin enforcer. if not hasattr(settings, "CASBIN_LOG_LEVEL"): diff --git a/openedx_authz/settings/test.py b/openedx_authz/settings/test.py index 5fa633a5..cce6d3de 100644 --- a/openedx_authz/settings/test.py +++ b/openedx_authz/settings/test.py @@ -76,3 +76,4 @@ def plugin_settings(settings): # pylint: disable=unused-argument # Use stub model for testing instead of the real content_libraries app OPENEDX_AUTHZ_CONTENT_LIBRARY_MODEL = "stubs.ContentLibrary" +OPENEDX_AUTHZ_COURSE_OVERVIEW_MODEL = "stubs.CourseOverview" diff --git a/openedx_authz/tests/api/test_data.py b/openedx_authz/tests/api/test_data.py index a1ac6227..9807f4e9 100644 --- a/openedx_authz/tests/api/test_data.py +++ b/openedx_authz/tests/api/test_data.py @@ -9,6 +9,7 @@ from openedx_authz.api.data import ( ActionData, ContentLibraryData, + CourseOverviewData, PermissionData, RoleAssignmentData, RoleData, @@ -229,8 +230,11 @@ def test_scope_data_registration(self): self.assertIs(ScopeData.scope_registry["global"], ScopeData) self.assertIn("lib", ScopeData.scope_registry) self.assertIs(ScopeData.scope_registry["lib"], ContentLibraryData) + self.assertIn("course-v1", ScopeData.scope_registry) + self.assertIs(ScopeData.scope_registry["course-v1"], CourseOverviewData) @data( + ("course-v1^course-v1:WGU+CS002+2025_T1", CourseOverviewData), ("lib^lib:DemoX:CSPROB", ContentLibraryData), ("global^generic_scope", ScopeData), ) @@ -248,6 +252,7 @@ def test_dynamic_instantiation_via_namespaced_key(self, namespaced_key, expected self.assertEqual(instance.namespaced_key, namespaced_key) @data( + ("course-v1^course-v1:WGU+CS002+2025_T1", CourseOverviewData), ("lib^lib:DemoX:CSPROB", ContentLibraryData), ("global^generic", ScopeData), ("unknown^something", ScopeData), @@ -266,6 +271,7 @@ def test_get_subclass_by_namespaced_key(self, namespaced_key, expected_class): self.assertIs(subclass, expected_class) @data( + ("course-v1:WGU+CS002+2025_T1", CourseOverviewData), ("lib:DemoX:CSPROB", ContentLibraryData), ("lib:edX:Demo", ContentLibraryData), ("global:generic_scope", ScopeData), @@ -283,20 +289,24 @@ def test_get_subclass_by_external_key(self, external_key, expected_class): self.assertIs(subclass, expected_class) @data( - ("lib:DemoX:CSPROB", True), - ("lib:edX:Demo", True), - ("invalid_library_key", False), - ("lib-DemoX-CSPROB", False), + ("course-v1:WGU+CS002+2025_T1", True, CourseOverviewData), + ("course:WGU+CS002+2025_T1", False, CourseOverviewData), + ("course-v2:WGU+CS002+2025_T1", False, CourseOverviewData), + ("course-v1-WGU+CS002+2025_T1", False, CourseOverviewData), + ("lib:DemoX:CSPROB", True, ContentLibraryData), + ("lib:edX:Demo", True, ContentLibraryData), + ("invalid_library_key", False, ContentLibraryData), + ("lib-DemoX-CSPROB", False, ContentLibraryData), ) @unpack - def test_content_library_validate_external_key(self, external_key, expected_valid): - """Test ContentLibraryData.validate_external_key validates library keys. + def test_scope_validate_external_key(self, external_key, expected_valid, expected_class): + """Test Subclasses ScopeData.validate_external_key validates library keys. Expected Result: - - Valid library keys (lib:Org:Code) return True + - Valid Scope keys like (lib:Org:Code) return True - Invalid formats return False """ - result = ContentLibraryData.validate_external_key(external_key) + result = expected_class.validate_external_key(external_key) self.assertEqual(result, expected_valid) diff --git a/openedx_authz/tests/api/test_roles.py b/openedx_authz/tests/api/test_roles.py index 197fe425..2830db5b 100644 --- a/openedx_authz/tests/api/test_roles.py +++ b/openedx_authz/tests/api/test_roles.py @@ -61,19 +61,6 @@ def _mock_get_or_create_subject(subject_data): return subject -# Apply patches at module level using the new manager method -_scope_patcher = patch( - "openedx_authz.models.ScopeManager.get_or_create_for_external_key", - side_effect=_mock_get_or_create_scope, -) -_subject_patcher = patch( - "openedx_authz.models.SubjectManager.get_or_create_for_external_key", - side_effect=_mock_get_or_create_subject, -) -_scope_patcher.start() -_subject_patcher.start() - - class BaseRolesTestCase(TestCase): """Base test case with helper methods for roles testing. @@ -134,6 +121,26 @@ def setUpClass(cls): to add their specific role assignments by calling _assign_roles_to_users. """ super().setUpClass() + + # Apply patches here so they only affect this test + # and don't interfere with other tests using the real + # get_or_create_for_external_key implementation. + cls._scope_patcher = patch( + "openedx_authz.models.ScopeManager.get_or_create_for_external_key", + side_effect=_mock_get_or_create_scope, + ) + cls._subject_patcher = patch( + "openedx_authz.models.SubjectManager.get_or_create_for_external_key", + side_effect=_mock_get_or_create_subject, + ) + + cls._scope_patcher.start() + cls._subject_patcher.start() + + AuthzEnforcer.get_enforcer().stop_auto_load_policy() + AuthzEnforcer.get_enforcer().enable_auto_save(True) + cls._seed_database_with_policies() + AuthzEnforcer.get_enforcer().stop_auto_load_policy() # Enable auto-save to ensure policies are saved to the database # This is necessary because the tests are not using auto-load policy @@ -150,6 +157,15 @@ def tearDown(self): super().tearDown() AuthzEnforcer.get_enforcer().clear_policy() # Clear policies after each test to ensure isolation + @classmethod + def tearDownClass(cls): + # Stop patches cleanly + # This ensures that if any test fails, the patches will still be stopped properly, + # preventing side effects on other tests. + cls._scope_patcher.stop() + cls._subject_patcher.stop() + super().tearDownClass() + class RolesTestSetupMixin(BaseRolesTestCase): """Test case with comprehensive role assignments for general roles testing.""" @@ -181,6 +197,17 @@ def setUpClass(cls): "role_name": roles.LIBRARY_USER.external_key, "scope_name": "lib:Org1:english_101", }, + # Basic course roles from authz.policy + { + "subject_name": "daniel", + "role_name": roles.COURSE_STAFF.external_key, + "scope_name": "course-v1:TestOrg+TestCourse+2024_T1", + }, + { + "subject_name": "judy", + "role_name": roles.COURSE_STAFF.external_key, + "scope_name": "course-v1:TestOrg+TestCourse+2024_T2", + }, # Multi-role assignments - same subject with different roles in different libraries { "subject_name": "eve", @@ -208,6 +235,16 @@ def setUpClass(cls): "role_name": roles.LIBRARY_CONTRIBUTOR.external_key, "scope_name": "lib:Org1:math_advanced", }, + { + "subject_name": "maria", + "role_name": roles.COURSE_STAFF.external_key, + "scope_name": "course-v1:TestOrg+TestCourse+2024_T3", + }, + { + "subject_name": "aida", + "role_name": roles.COURSE_STAFF.external_key, + "scope_name": "course-v1:TestOrg+TestCourse+2024_T3", + }, # Hierarchical scope assignments - different specificity levels { "subject_name": "ivy", @@ -240,6 +277,21 @@ def setUpClass(cls): "role_name": roles.LIBRARY_AUTHOR.external_key, "scope_name": "lib:Org4:art_301", }, + { + "subject_name": "carlos", + "role_name": roles.COURSE_STAFF.external_key, + "scope_name": "course-v1:TestOrg+TestCourse+2024_T1", + }, + { + "subject_name": "carlos", + "role_name": roles.COURSE_STAFF.external_key, + "scope_name": "course-v1:TestOrg+TestCourse+2024_T2", + }, + { + "subject_name": "carlos", + "role_name": roles.COURSE_STAFF.external_key, + "scope_name": "course-v1:TestOrg+TestCourse+2024_T3", + }, # Mixed permission levels across libraries for comprehensive testing { "subject_name": "maya", diff --git a/openedx_authz/tests/api/test_users.py b/openedx_authz/tests/api/test_users.py index 64f3848c..26008f23 100644 --- a/openedx_authz/tests/api/test_users.py +++ b/openedx_authz/tests/api/test_users.py @@ -430,6 +430,21 @@ class TestUserPermissions(UserAssignmentsSetupMixin): """Test suite for user permission API functions.""" @data( + # Course permissions + ("daniel", permissions.MANAGE_ADVANCED_SETTINGS.identifier, "course-v1:TestOrg+TestCourse+2024_T1", True), + ("daniel", permissions.MANAGE_ADVANCED_SETTINGS.identifier, "course-v1:TestOrg+TestCourse+2024_T2", False), + ("judy", permissions.MANAGE_ADVANCED_SETTINGS.identifier, "course-v1:TestOrg+TestCourse+2024_T1", False), + ("judy", permissions.MANAGE_ADVANCED_SETTINGS.identifier, "course-v1:TestOrg+TestCourse+2024_T2", True), + # Multiple subjects with same role in same scope + ("maria", permissions.MANAGE_ADVANCED_SETTINGS.identifier, "course-v1:TestOrg+TestCourse+2024_T3", True), + ("aida", permissions.MANAGE_ADVANCED_SETTINGS.identifier, "course-v1:TestOrg+TestCourse+2024_T3", True), + ("maria", permissions.MANAGE_ADVANCED_SETTINGS.identifier, "course-v1:TestOrg+TestCourse+2024_T1", False), + ("aida", permissions.MANAGE_ADVANCED_SETTINGS.identifier, "course-v1:TestOrg+TestCourse+2024_T1", False), + # Same user, same role, different scopes + ("carlos", permissions.MANAGE_ADVANCED_SETTINGS.identifier, "course-v1:TestOrg+TestCourse+2024_T1", True), + ("carlos", permissions.MANAGE_ADVANCED_SETTINGS.identifier, "course-v1:TestOrg+TestCourse+2024_T2", True), + ("carlos", permissions.MANAGE_ADVANCED_SETTINGS.identifier, "course-v1:TestOrg+TestCourse+2024_T3", True), + # Library permissions ("alice", permissions.DELETE_LIBRARY.identifier, "lib:Org1:math_101", True), ("bob", permissions.PUBLISH_LIBRARY_CONTENT.identifier, "lib:Org1:history_201", True), ("eve", permissions.MANAGE_LIBRARY_TEAM.identifier, "lib:Org2:physics_401", True), diff --git a/openedx_authz/tests/stubs/models.py b/openedx_authz/tests/stubs/models.py index def5a7aa..c64995aa 100644 --- a/openedx_authz/tests/stubs/models.py +++ b/openedx_authz/tests/stubs/models.py @@ -7,6 +7,8 @@ from django.conf import settings from django.contrib.auth.models import Group from django.db import models +from opaque_keys.edx.django.models import CourseKeyField, UsageKeyField +from opaque_keys.edx.keys import CourseKey from opaque_keys.edx.locator import LibraryLocatorV2 @@ -87,3 +89,39 @@ class ContentLibraryPermission(models.Model): def __str__(self): who = self.user.username if self.user else self.group.name return f"ContentLibraryPermission ({self.access_level} for {who})" + + +class CourseOverview(models.Model): + """ + Stub model representing a course overview for testing purposes. + + This model contains basic course metadata such as an ID, display name, and organization. + It is used to link CourseScope instances to actual courses in the system. + + .. no_pii: + """ + + # Course identification + id = CourseKeyField(db_index=True, primary_key=True, max_length=255) + _location = UsageKeyField(max_length=255) + org = models.TextField(max_length=255, default="outdated_entry") + display_name = models.TextField(null=True) + + @classmethod + def get_from_id(cls, course_key): + """Get a CourseOverview by its course key. + + Args: + course_key: The course key to look up. + + Returns: + CourseOverview: The course overview instance. + """ + if course_key is None: + raise ValueError("course_key must not be None") + try: + key = str(CourseKey.from_string(str(course_key))) + except Exception: # pylint: disable=broad-exception-caught + key = str(course_key) + obj, _ = cls.objects.get_or_create(id=key) + return obj diff --git a/openedx_authz/tests/test_enforcer.py b/openedx_authz/tests/test_enforcer.py index ea12d855..e213a3bb 100644 --- a/openedx_authz/tests/test_enforcer.py +++ b/openedx_authz/tests/test_enforcer.py @@ -143,12 +143,12 @@ def _add_test_policies_for_multiple_scopes(self): global_enforcer = AuthzEnforcer.get_enforcer() test_policies = [ # Course policies - ["role^course_instructor", "act^edit_course", "course^*", "allow"], - ["role^course_instructor", "act^grade_students", "course^*", "allow"], - ["role^course_ta", "act^view_course", "course^*", "allow"], - ["role^course_ta", "act^grade_assignments", "course^*", "allow"], - ["role^course_student", "act^view_course", "course^*", "allow"], - ["role^course_student", "act^submit_assignment", "course^*", "allow"], + ["role^course_instructor", "act^edit_course", "course-v1^*", "allow"], + ["role^course_instructor", "act^grade_students", "course-v1^*", "allow"], + ["role^course_ta", "act^view_course", "course-v1^*", "allow"], + ["role^course_ta", "act^grade_assignments", "course-v1^*", "allow"], + ["role^course_student", "act^view_course", "course-v1^*", "allow"], + ["role^course_student", "act^submit_assignment", "course-v1^*", "allow"], # Organization policies ["role^org_admin", "act^manage_org", "org^*", "allow"], ["role^org_admin", "act^create_courses", "org^*", "allow"], @@ -397,7 +397,7 @@ def test_multi_scope_filtering(self): """ global_enforcer = AuthzEnforcer.get_enforcer() lib_scope = "lib^*" - course_scope = "course^*" + course_scope = "course-v1^*" org_scope = "org^*" expected_lib_count = self._count_policies_in_file(scope_pattern=lib_scope) @@ -413,7 +413,7 @@ def test_multi_scope_filtering(self): org_count = len(global_enforcer.get_policy()) self.assertEqual(lib_count, expected_lib_count) - self.assertEqual(course_count, 6) + self.assertEqual(course_count, 7) self.assertEqual(org_count, 3) global_enforcer.clear_policy() diff --git a/openedx_authz/tests/test_engine_utils.py b/openedx_authz/tests/test_engine_utils.py index 6908e831..33dc8708 100644 --- a/openedx_authz/tests/test_engine_utils.py +++ b/openedx_authz/tests/test_engine_utils.py @@ -76,10 +76,10 @@ def test_migrate_all_file_policies_to_database(self): Expected Result: - All policies from the file are loaded into the database - - The file contains 31 regular policies (p rules) + - The file contains 32 regular policies (p rules) - Policy content matches expected file content """ - expected_policy_count = 31 + expected_policy_count = 32 migrate_policy_between_enforcers(self.source_enforcer, self.target_enforcer) self.target_enforcer.load_policy() @@ -216,7 +216,7 @@ def test_migrate_complete_file_contents(self): self.assertEqual( len(self.target_enforcer.get_policy()), - 31, + 32, "Should have 31 regular policies from file", ) self.assertEqual( @@ -250,8 +250,8 @@ def test_migrate_partial_duplicates(self): target_policies = self.target_enforcer.get_policy() self.assertEqual( len(target_policies), - 31, - "Should have 31 policies total, with no duplicates", + 32, + "Should have 32 policies total, with no duplicates", ) duplicates = CasbinRule.objects.values("v0", "v1", "v2").annotate(total=Count("*")).filter(total__gt=1) @@ -346,7 +346,7 @@ def test_migrate_preserves_existing_db_policies(self): migrate_policy_between_enforcers(self.source_enforcer, self.target_enforcer) target_policies = self.target_enforcer.get_policy() - self.assertEqual(len(target_policies), 32, "Should have 31 file policies + 1 custom policy") + self.assertEqual(len(target_policies), 33, "Should have 32 file policies + 1 custom policy") self.assertIn(custom_policy, target_policies, "Custom database policy should be preserved") def test_migrate_preserves_user_role_assignments_in_db(self): @@ -382,4 +382,4 @@ def test_migrate_preserves_user_role_assignments_in_db(self): ) target_policies = self.target_enforcer.get_policy() - self.assertEqual(len(target_policies), 31, "All 31 policies from file should be loaded") + self.assertEqual(len(target_policies), 32, "All 32 policies from file should be loaded") diff --git a/openedx_authz/tests/test_models.py b/openedx_authz/tests/test_models.py index d8a15f9f..94e41a43 100644 --- a/openedx_authz/tests/test_models.py +++ b/openedx_authz/tests/test_models.py @@ -13,17 +13,19 @@ which run against the real ContentLibrary model. """ +from unittest.mock import patch from uuid import UUID, uuid4 from casbin_adapter.models import CasbinRule from django.contrib.auth import get_user_model from django.test import TestCase +from opaque_keys.edx.keys import CourseKey from opaque_keys.edx.locator import LibraryLocatorV2 -from openedx_authz.api.data import ContentLibraryData, UserData +from openedx_authz.api.data import ContentLibraryData, CourseOverviewData, UserData from openedx_authz.models import ExtendedCasbinRule, Scope, Subject from openedx_authz.models.engine import PolicyCacheControl -from openedx_authz.tests.stubs.models import ContentLibrary +from openedx_authz.tests.stubs.models import ContentLibrary, CourseOverview User = get_user_model() @@ -85,6 +87,7 @@ def test_extended_casbin_rule_creation_with_all_fields(self): self.assertEqual(extended_rule.subject, self.subject) self.assertIsNotNone(extended_rule.created_at) self.assertIsNotNone(extended_rule.updated_at) + self.assertEqual(extended_rule.scope.content_library, self.content_library) def test_extended_casbin_rule_cascade_deletion_when_scope_deleted(self): """Deleting a Scope should cascade to ExtendedCasbinRule and trigger the handler cleanup. @@ -141,6 +144,139 @@ def test_extended_casbin_rule_cascade_deletion_when_subject_deleted(self): self.assertFalse(Subject.objects.filter(id=subject_id).exists()) +class TestCourseExtendedCasbinRuleModelWithStub(TestCase): + """Test cases for the ExtendedCasbinRule model using stub setup and CourseOverview stub.""" + + def setUp(self): + """Set up test fixtures.""" + self.test_username = "test_user" + self.test_user = User.objects.create_user(username=self.test_username, email="test@example.com") + + self.course_key = CourseKey.from_string("course-v1:TestOrg+TestCourse+2024_T1") + self.course_overview = CourseOverview.get_from_id(self.course_key) + + self.casbin_rule = CasbinRule.objects.create( + ptype="p", + v0="user^test_user", + v1="role^instructor", + v2="course-v1^course-v1:TestOrg+TestCourse+2024_T1", + v3="allow", + ) + + subject_data = UserData(external_key=self.test_username) + self.subject = Subject.objects.get_or_create_for_external_key(subject_data) + + self.scope_data = CourseOverviewData(external_key=str(self.course_key)) + self.scope = Scope.objects.get_or_create_for_external_key(self.scope_data) + + @patch("openedx_authz.api.data.CourseOverview", CourseOverview) # Patch to use the stub CourseOverview + def test_extended_casbin_rule_creation_with_all_fields(self): + """Test creating ExtendedCasbinRule with all fields populated. + + Expected Result: + - ExtendedCasbinRule is created successfully. + - All fields are populated correctly. + - Timestamps are set automatically. + """ + casbin_rule_key = ( + f"{self.casbin_rule.ptype},{self.casbin_rule.v0},{self.casbin_rule.v1}," + f"{self.casbin_rule.v2},{self.casbin_rule.v3}" + ) + + extended_rule = ExtendedCasbinRule.objects.create( + casbin_rule_key=casbin_rule_key, + casbin_rule=self.casbin_rule, + description="Test rule for instructor role", + metadata={"created_by": "test_system", "priority": 1}, + scope=self.scope, + subject=self.subject, + ) + + self.assertIsNotNone(extended_rule) + self.assertEqual(extended_rule.casbin_rule_key, casbin_rule_key) + self.assertEqual(extended_rule.casbin_rule, self.casbin_rule) + self.assertEqual(extended_rule.description, "Test rule for instructor role") + self.assertEqual(extended_rule.metadata["created_by"], "test_system") + self.assertEqual(extended_rule.metadata["priority"], 1) + self.assertEqual(extended_rule.scope, self.scope) + self.assertEqual(extended_rule.subject, self.subject) + self.assertIsNotNone(extended_rule.created_at) + self.assertIsNotNone(extended_rule.updated_at) + self.assertEqual(extended_rule.scope.course_overview, self.course_overview) + + # test scope data class details + self.assertIsInstance(self.scope_data, CourseOverviewData) + self.assertEqual(self.scope_data.course_id, str(self.course_key)) + self.assertEqual(self.scope_data.external_key, str(self.course_key)) + self.assertEqual(self.scope_data.NAMESPACE, CourseOverviewData.NAMESPACE) + self.assertTrue(self.scope_data.exists()) + self.assertEqual(self.scope_data.get_object(), self.course_overview) + self.assertTrue(CourseOverviewData.validate_external_key(self.scope_data.course_id)) + self.assertIsInstance(self.scope_data.course_key, CourseKey) + + def test_extended_casbin_rule_cascade_deletion_when_scope_deleted(self): + """Deleting a Scope should cascade to ExtendedCasbinRule and trigger the handler cleanup. + + Expected Result: + - ExtendedCasbinRule baseline row links the Scope to the CasbinRule. + - Removing the Scope deletes the ExtendedCasbinRule via database cascade. + - CasbinRule disappears because the post_delete handler mirrors the cascade. + """ + casbin_rule_key = ( + f"{self.casbin_rule.ptype},{self.casbin_rule.v0},{self.casbin_rule.v1}," + f"{self.casbin_rule.v2},{self.casbin_rule.v3}" + ) + extended_rule = ExtendedCasbinRule.objects.create( + casbin_rule_key=casbin_rule_key, + casbin_rule=self.casbin_rule, + scope=self.scope, + ) + extended_rule_id = extended_rule.id + casbin_rule_id = self.casbin_rule.id + scope_id = self.scope.id + + self.assertTrue(ExtendedCasbinRule.objects.filter(id=extended_rule_id).exists()) + self.assertTrue(CasbinRule.objects.filter(id=casbin_rule_id).exists()) + self.assertTrue(Scope.objects.filter(id=scope_id).exists()) + + self.scope.delete() + + self.assertFalse(ExtendedCasbinRule.objects.filter(id=extended_rule_id).exists()) + self.assertFalse(CasbinRule.objects.filter(id=casbin_rule_id).exists()) + self.assertFalse(Scope.objects.filter(id=scope_id).exists()) + + def test_extended_casbin_rule_cascade_deletion_when_subject_deleted(self): + """Deleting a Subject should cascade to ExtendedCasbinRule and invoke the handler cleanup. + + Expected Result: + - ExtendedCasbinRule baseline row links the Subject to the CasbinRule. + - Removing the Subject deletes the ExtendedCasbinRule via database cascade. + - CasbinRule disappears because the post_delete handler mirrors the cascade. + """ + casbin_rule_key = ( + f"{self.casbin_rule.ptype},{self.casbin_rule.v0},{self.casbin_rule.v1}," + f"{self.casbin_rule.v2},{self.casbin_rule.v3}" + ) + extended_rule = ExtendedCasbinRule.objects.create( + casbin_rule_key=casbin_rule_key, + casbin_rule=self.casbin_rule, + subject=self.subject, + ) + extended_rule_id = extended_rule.id + casbin_rule_id = self.casbin_rule.id + subject_id = self.subject.id + + self.assertTrue(ExtendedCasbinRule.objects.filter(id=extended_rule_id).exists()) + self.assertTrue(CasbinRule.objects.filter(id=casbin_rule_id).exists()) + self.assertTrue(Subject.objects.filter(id=subject_id).exists()) + + self.subject.delete() + + self.assertFalse(ExtendedCasbinRule.objects.filter(id=extended_rule_id).exists()) + self.assertFalse(CasbinRule.objects.filter(id=casbin_rule_id).exists()) + self.assertFalse(Subject.objects.filter(id=subject_id).exists()) + + class TestPolicyCacheControlModel(TestCase): """Test cases for the PolicyCacheControl model."""