Skip to content
test_auth.py 103 KiB
Newer Older
            with mock.patch("accounts.models.User.save") as mock_save:
                user = backend._rl_update_user(user, user_attribute,
                        saml_attribute_mapping)
                self.assertEqual(mock_save.call_count, 1)
Dong Zhuang's avatar
Dong Zhuang committed

            user = backend._rl_update_user(
                    user, user_attribute, saml_attribute_mapping)
            # not set as part of _rl_update_user
            # self.assertEqual(user.first_name, expected_first)
            # self.assertEqual(user.last_name, expected_last)
            self.assertTrue(user.name_verified)
            self.assertEqual(user.status, constants.user_status.unconfirmed)
            self.assertTrue(user.institutional_id_verified)

            user_attribute = {
                'PrincipalName': (user.username,),
                'iTrustUIN': (expected_inst_id,),
                'mail': (expected_email),
                'givenName': (expected_first,),
                'sn': (expected_last,),
            }
            user = backend._rl_update_user(
                    user, user_attribute, saml_attribute_mapping)
            # not set as part of _rl_update_user
            # self.assertEqual(user.first_name, expected_first)
            # self.assertEqual(user.last_name, expected_last)
            self.assertTrue(user.name_verified)
            self.assertEqual(user.status, constants.user_status.active)
            self.assertTrue(user.institutional_id_verified)

            with mock.patch("accounts.models.User.save") as mock_save:
                # no changes
                backend._rl_update_user(user, user_attribute, saml_attribute_mapping)
                self.assertEqual(mock_save.call_count, 0)
Dong Zhuang's avatar
Dong Zhuang committed

ifaint's avatar
ifaint committed

@with_course_api_auth("Token")
def api_test_func_token(api_ctx, course_identifier):
    return JsonResponse({})


@with_course_api_auth("Token")
def api_test_func_raise_api_error(api_ctx, course_identifier):
    raise APIError()


@with_course_api_auth("Basic")
def api_test_func_basic(api_ctx, course_identifier):
    return JsonResponse({})


@with_course_api_auth("Not_allowed_method")
def api_test_func_not_allowed(api_ctx, course_identifier):
    return JsonResponse({})


urlpatterns = base_urlpatterns + [
    url(r"^course"
        "/" + COURSE_ID_REGEX
        + "/api/test_token$",
        api_test_func_token,
        name="test_api_token_method"),
    url(r"^course"
        "/" + COURSE_ID_REGEX
        + "/api/test_basic$",
        api_test_func_basic,
        name="test_api_basic_method"),
    url(r"^course"
        "/" + COURSE_ID_REGEX
        + "/api/test_not_allowed$",
        api_test_func_not_allowed,
        name="test_api_not_allowed_method"),
    url(r"^course"
        "/" + COURSE_ID_REGEX
        + "/api/test_api_error$",
        api_test_func_raise_api_error,
        name="test_api_with_api_error"),

]


@override_settings(ROOT_URLCONF=__name__)
class AuthCourseWithTokenTest(APITestMixin, TestCase):
    # test auth_course_with_token

    def setUp(self):
        self.client.force_login(self.instructor_participation.user)
ifaint's avatar
ifaint committed

    def get_test_token_url(self, course_identifier=None):
        course_identifier = (
            course_identifier or self.get_default_course_identifier())
        kwargs = {"course_identifier": course_identifier}

        return reverse("test_api_token_method", kwargs=kwargs)

    def get_test_basic_url(self, course_identifier=None):
        course_identifier = (
            course_identifier or self.get_default_course_identifier())
        kwargs = {"course_identifier": course_identifier}

        return reverse("test_api_basic_method", kwargs=kwargs)

    def get_test_not_allowed_method_url(self, course_identifier=None):
        course_identifier = (
            course_identifier or self.get_default_course_identifier())
        kwargs = {"course_identifier": course_identifier}

        return reverse("test_api_not_allowed_method", kwargs=kwargs)

    def get_test_api_error_url(self, course_identifier=None):
        course_identifier = (
            course_identifier or self.get_default_course_identifier())
        kwargs = {"course_identifier": course_identifier}

        return reverse("test_api_with_api_error", kwargs=kwargs)

    def test_no_auth_headers(self):
        resp = self.client.get(
ifaint's avatar
ifaint committed
            self.get_test_token_url())
        self.assertEqual(resp.status_code, 403)

        resp = self.client.get(
ifaint's avatar
ifaint committed
            self.get_test_basic_url())
        self.assertEqual(resp.status_code, 401)
        self.assertEqual(resp["WWW-Authenticate"],
                         'Basic realm="Relate direct git access for test-course"')

    # {{{ method = "Token"

    def test_invalid_token_case_not_matched(self):
        token = self.create_token()
        resp = self.client.get(
ifaint's avatar
ifaint committed
            self.get_test_token_url(),

            # case not matched
            HTTP_AUTHORIZATION="token %i_%s" % (
                token.id, self.default_token_hash_str))
        self.assertEqual(resp.status_code, 403)

    def test_invalid_token_no_space_in_auth_str(self):
        token = self.create_token()
        resp = self.client.get(
ifaint's avatar
ifaint committed
            self.get_test_token_url(),

            # no space between 'Token' and auth_data
            HTTP_AUTHORIZATION="Token%i_%s" % (
                token.id, self.default_token_hash_str))
        self.assertEqual(resp.status_code, 403)

    def test_invalid_token_wrong_format(self):
        # underscores are not allowed
        token = self.create_token(token_hash_str="an_invalid_token")
        resp = self.client.get(
ifaint's avatar
ifaint committed
            self.get_test_token_url(),
            HTTP_AUTHORIZATION="Token %i_%s" % (
                token.id, self.default_token_hash_str))
        self.assertEqual(resp.status_code, 403)

    def test_none_exist_token(self):
        resp = self.client.get(
ifaint's avatar
ifaint committed
            self.get_test_token_url(),
            HTTP_AUTHORIZATION="Token %i_%s" % (
                1, "nonexisttokenstr"))
        self.assertEqual(resp.status_code, 403)

    def test_revoked_token(self):
        token = self.create_token(
            revocation_time=now() - timedelta(minutes=1))
        resp = self.client.get(
ifaint's avatar
ifaint committed
            self.get_test_token_url(),
            HTTP_AUTHORIZATION="Token %i_%s" % (
                token.id, self.default_token_hash_str))
        self.assertEqual(resp.status_code, 403)

    def test_expired_token(self):
        token = self.create_token(
            valid_until=now() - timedelta(minutes=1))
        resp = self.client.get(
ifaint's avatar
ifaint committed
            self.get_test_token_url(),
            HTTP_AUTHORIZATION="Token %i_%s" % (
                token.id, self.default_token_hash_str))
        self.assertEqual(resp.status_code, 403)

    def test_token_auth_success(self):
        token = self.create_token()
        resp = self.client.get(
ifaint's avatar
ifaint committed
            self.get_test_token_url(),
            HTTP_AUTHORIZATION="Token %i_%s" % (
                token.id, self.default_token_hash_str))
        self.assertEqual(resp.status_code, 200)

    # }}}

    # {{{ method = "Basic"

    def test_basic_auth_success(self):
        resp = self.client.get(
ifaint's avatar
ifaint committed
            self.get_test_basic_url(),
            HTTP_AUTHORIZATION="Basic %s" % self.create_basic_auth())
        self.assertEqual(resp.status_code, 200)

    def test_basic_auth_ill_formed(self):
        resp = self.client.get(
ifaint's avatar
ifaint committed
            self.get_test_basic_url(),
            HTTP_AUTHORIZATION="Basic %s" % "foo:barbar")
        self.assertEqual(resp.status_code, 401)

    def test_basic_auth_no_match(self):
        from base64 import b64encode
        bad_auth_data = b64encode(b"foobar").decode()
ifaint's avatar
ifaint committed

        resp = self.client.get(
ifaint's avatar
ifaint committed
            self.get_test_basic_url(),
            HTTP_AUTHORIZATION="Basic %s" % bad_auth_data)
        self.assertEqual(resp.status_code, 401)

    def test_basic_auth_user_not_matched(self):
        basic_auth_user_not_matched = self.create_basic_auth(
            participation=self.instructor_participation,
            user=self.ta_participation.user
        )

        resp = self.client.get(
ifaint's avatar
ifaint committed
            self.get_test_basic_url(),
            HTTP_AUTHORIZATION="Basic %s" % basic_auth_user_not_matched)
        self.assertEqual(resp.status_code, 401)

    # }}}

    # {{{ method not allowed

    def test_auth_method_not_allowed(self):
        with self.assertRaises(AssertionError):
ifaint's avatar
ifaint committed
                self.get_test_not_allowed_method_url(),
                HTTP_AUTHORIZATION="Not_allowed_method blabla")

    def test_auth_method_not_allowed_method_not_matched(self):
        token = self.create_token()
        with self.assertRaises(AssertionError):
ifaint's avatar
ifaint committed
                self.get_test_not_allowed_method_url(),
                HTTP_AUTHORIZATION="Token %i_%s" % (
                    token.id, self.default_token_hash_str))

    # }}}

    def test_raise_api_error(self):
        token = self.create_token()
        resp = self.client.get(
ifaint's avatar
ifaint committed
            self.get_test_api_error_url(),
            HTTP_AUTHORIZATION="Token %i_%s" % (
                token.id, self.default_token_hash_str))
        self.assertEqual(resp.status_code, 400)


class ManageAuthenticationTokensTest(
        SingleCoursePageTestMixin, MockAddMessageMixing, TestCase):
    # test manage_authentication_tokens

    def setUp(self):
        self.client.force_login(self.instructor_participation.user)
ifaint's avatar
ifaint committed

    def test_not_authenticated(self):
        with self.temporarily_switch_to_user(None):
            resp = self.client.get(self.get_manage_authentication_token_url())
ifaint's avatar
ifaint committed
        self.assertEqual(resp.status_code, 403)

    def test_no_permission_authenticated(self):
        with self.temporarily_switch_to_user(self.student_participation.user):
            resp = self.client.get(self.get_manage_authentication_token_url())
ifaint's avatar
ifaint committed
        self.assertEqual(resp.status_code, 403)

    def test_get_success(self):
        resp = self.client.get(self.get_manage_authentication_token_url())
ifaint's avatar
ifaint committed
        self.assertEqual(resp.status_code, 200)

        tokens = self.get_response_context_value_by_name(resp, "tokens")
        self.assertEqual(tokens.count(), 0)

    def get_manage_authentication_tokens_post_data(
            self, restrict_to_participation_role=None,
            description="test", valid_until=None,
            create=True, revoke_id=None, **kwargs):

        data = {}
        if create:
            assert revoke_id is None
            if restrict_to_participation_role is None:
                prole_kwargs = {
                    "identifier": "instructor",
                    "course": self.course
                }
                role = factories.ParticipationRoleFactory(**prole_kwargs)
                restrict_to_participation_role = role.pk

            if not valid_until:
                valid_until = (now() + timedelta(weeks=2)
                               ).replace(tzinfo=None).strftime("%Y-%m-%d")

            data.update({
                "restrict_to_participation_role": restrict_to_participation_role,
                "valid_until": valid_until,
                "description": description,
                "create": ""
            })

        if revoke_id:
            assert isinstance(revoke_id, int)
            data["revoke_%i" % revoke_id] = ""

        data.update(kwargs)
        return data

    def test_get_tokens_with_revocation_time_within_a_week(self):
        factories.AuthenticationTokenFactory.create_batch(
            size=1,
            user=self.ta_participation.user,
            participation=self.ta_participation,
        )

        factories.AuthenticationTokenFactory.create_batch(
            size=5,
            user=self.instructor_participation.user,
            participation=self.instructor_participation,
        )

        factories.AuthenticationTokenFactory.create_batch(
            size=3, revocation_time=now() - timedelta(weeks=2),
            user=self.instructor_participation.user,
            participation=self.instructor_participation,
        )
        factories.AuthenticationTokenFactory.create_batch(
            size=2, revocation_time=now() - timedelta(days=2),
            user=self.instructor_participation.user,
            participation=self.instructor_participation,
        )

        resp = self.client.get(self.get_manage_authentication_token_url())
ifaint's avatar
ifaint committed
        self.assertEqual(resp.status_code, 200)

        tokens = self.get_response_context_value_by_name(resp, "tokens")
        self.assertEqual(tokens.count(), 7)

    def test_post_create_success(self):
        n_exist_tokens = 3

        factories.AuthenticationTokenFactory.create_batch(
            size=n_exist_tokens,
            user=self.instructor_participation.user,
            participation=self.instructor_participation,
        )

        resp = self.client.post(
ifaint's avatar
ifaint committed
            self.get_manage_authentication_token_url(),
            data=self.get_manage_authentication_tokens_post_data()
        )
        self.assertEqual(resp.status_code, 200)

        tokens = self.get_response_context_value_by_name(resp, "tokens")
        self.assertEqual(tokens.count(), n_exist_tokens + 1)

        added_token = AuthenticationToken.objects.last()
        added_message = self._get_added_messages()

        match = _TOKEN_AUTH_DATA_RE.match(added_message)
        self.assertIsNotNone(match)

        token_id = int(match.group("token_id"))
        self.assertEqual(added_token.id, token_id)

        token_hash_str = match.group("token_hash")
        self.assertTrue(check_password(token_hash_str, added_token.token_hash))

    def test_post_create_form_invalid(self):
        resp = self.client.post(
ifaint's avatar
ifaint committed
            self.get_manage_authentication_token_url(),
            data=self.get_manage_authentication_tokens_post_data(
                description=""
            )
        )
        self.assertEqual(resp.status_code, 200)
        self.assertEqual(AuthenticationToken.objects.count(), 0)

        self.assertFormError(resp, "form", "description", "This field is required.")

    def test_post_revoke(self):
        n_exist_tokens = 3

        tokens = factories.AuthenticationTokenFactory.create_batch(
            size=n_exist_tokens,
            user=self.instructor_participation.user,
            participation=self.instructor_participation,
        )

        resp = self.client.post(
ifaint's avatar
ifaint committed
            self.get_manage_authentication_token_url(),
            data=self.get_manage_authentication_tokens_post_data(
                create=False,
                revoke_id=tokens[0].id)
        )
        self.assertEqual(resp.status_code, 200)

        active_tokens = AuthenticationToken.objects.filter(
            revocation_time__isnull=True)
        self.assertEqual(active_tokens.count(), n_exist_tokens - 1)

    def test_post_create_unknown_button_pressed(self):
        resp = self.client.post(
ifaint's avatar
ifaint committed
            self.get_manage_authentication_token_url(),
            data=self.get_manage_authentication_tokens_post_data(
                create=False
            )
        )
        self.assertEqual(resp.status_code, 200)
        self.assertEqual(AuthenticationToken.objects.count(), 0)
        self.assertAddMessageCallCount(1)


class APIBearerTokenBackendTest(APITestMixin, TestCase):
    # test APIBearerTokenBackend

    def test_authenticate_success(self):
        token = self.create_token()
ifaint's avatar
ifaint committed

        backend = APIBearerTokenBackend()
        self.assertEqual(
            backend.authenticate(
                None, self.course.identifier, token.id, self.default_token_hash_str),
            self.instructor_participation.user
        )

    def test_authenticate_fail_no_matching_token(self):
ifaint's avatar
ifaint committed

        backend = APIBearerTokenBackend()
        self.assertIsNone(
            backend.authenticate(
                None, self.course.identifier, 1, "foobar"))

    def test_get_user(self):
ifaint's avatar
ifaint committed

        backend = APIBearerTokenBackend()
        self.assertEqual(
            backend.get_user(
                self.instructor_participation.user.id),
            self.instructor_participation.user)
        self.assertIsNone(
            backend.get_user(10000))


class APIContextTest(APITestMixin, TestCase):
    # test APIContext

    def test_restrict_to_role_is_not_none(self):
        token = self.create_token()
        api_context = APIContext(None, token)
        self.assertIsNotNone(api_context.restrict_to_role)

    def test_restrict_to_role_is_none(self):
        token = self.create_token()
        token.restrict_to_participation_role = None
        token.save()

        api_context = APIContext(None, token)
        self.assertIsNone(api_context.restrict_to_role)

    def test_restrict_to_role_not_in_participation_roles(self):
        token = self.create_token(participation=self.student_participation)

        prole_kwargs = {
            "course": self.course, "identifier": "ta"}
        role = factories.ParticipationRoleFactory(**prole_kwargs)
        token.restrict_to_participation_role = role
        token.save()

        with self.assertRaises(PermissionDenied):
            APIContext(None, token)

    def test_restrict_to_role_not_in_participation_roles_but_may_impersonate(self):
        token = self.create_token(participation=self.ta_participation)

        prole_kwargs = {
            "course": self.course, "identifier": "student"}
        role = factories.ParticipationRoleFactory(**prole_kwargs)
        token.restrict_to_participation_role = role
        token.save()

        api_context = APIContext(None, token)
        self.assertIsNotNone(api_context.restrict_to_role)

    def test_api_context_has_permission_true(self):
        token = self.create_token()
        api_context = APIContext(None, token)

        from course.constants import participation_permission as pperm
        self.assertTrue(api_context.has_permission(
            pperm.access_files_for, "instructor"))

    def test_api_context_has_permission_restrict_to_role_is_none_true(self):
        token = self.create_token()
        token.restrict_to_participation_role = None
        token.save()

        api_context = APIContext(None, token)

        from course.constants import participation_permission as pperm
        self.assertTrue(api_context.has_permission(
            pperm.access_files_for, "instructor"))

    def test_api_context_has_permission_false(self):
        token = self.create_token(participation=self.ta_participation)
        api_context = APIContext(None, token)

        from course.constants import participation_permission as pperm
        self.assertFalse(api_context.has_permission(
            pperm.access_files_for, "instructor"))

    def test_api_context_has_permission_restrict_to_role_is_none_false(self):
        token = self.create_token(participation=self.ta_participation)
        token.restrict_to_participation_role = None
        token.save()

        api_context = APIContext(None, token)

        from course.constants import participation_permission as pperm
        self.assertFalse(api_context.has_permission(
            pperm.access_files_for, "instructor"))


Dong Zhuang's avatar
Dong Zhuang committed
# vim: foldmethod=marker