Newer
Older
with mock.patch("accounts.models.User.save") as mock_save:
user = backend._rl_update_user(user, user_attribute,
saml_attribute_mapping)
self.assertEqual(mock_save.call_count, 1)
user = backend._rl_update_user(
user, user_attribute, saml_attribute_mapping)
# not set as part of _rl_update_user
# self.assertEqual(user.first_name, expected_first)
# self.assertEqual(user.last_name, expected_last)
self.assertTrue(user.name_verified)
self.assertEqual(user.status, constants.user_status.unconfirmed)
self.assertTrue(user.institutional_id_verified)
user_attribute = {
'PrincipalName': (user.username,),
'iTrustUIN': (expected_inst_id,),
'mail': (expected_email),
'givenName': (expected_first,),
'sn': (expected_last,),
}
user = backend._rl_update_user(
user, user_attribute, saml_attribute_mapping)
# not set as part of _rl_update_user
# self.assertEqual(user.first_name, expected_first)
# self.assertEqual(user.last_name, expected_last)
self.assertTrue(user.name_verified)
self.assertEqual(user.status, constants.user_status.active)
self.assertTrue(user.institutional_id_verified)
with mock.patch("accounts.models.User.save") as mock_save:
# no changes
backend._rl_update_user(user, user_attribute, saml_attribute_mapping)
self.assertEqual(mock_save.call_count, 0)
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
2087
@with_course_api_auth("Token")
def api_test_func_token(api_ctx, course_identifier):
return JsonResponse({})
@with_course_api_auth("Token")
def api_test_func_raise_api_error(api_ctx, course_identifier):
raise APIError()
@with_course_api_auth("Basic")
def api_test_func_basic(api_ctx, course_identifier):
return JsonResponse({})
@with_course_api_auth("Not_allowed_method")
def api_test_func_not_allowed(api_ctx, course_identifier):
return JsonResponse({})
urlpatterns = base_urlpatterns + [
url(r"^course"
"/" + COURSE_ID_REGEX
+ "/api/test_token$",
api_test_func_token,
name="test_api_token_method"),
url(r"^course"
"/" + COURSE_ID_REGEX
+ "/api/test_basic$",
api_test_func_basic,
name="test_api_basic_method"),
url(r"^course"
"/" + COURSE_ID_REGEX
+ "/api/test_not_allowed$",
api_test_func_not_allowed,
name="test_api_not_allowed_method"),
url(r"^course"
"/" + COURSE_ID_REGEX
+ "/api/test_api_error$",
api_test_func_raise_api_error,
name="test_api_with_api_error"),
]
@override_settings(ROOT_URLCONF=__name__)
class AuthCourseWithTokenTest(APITestMixin, TestCase):
# test auth_course_with_token
def setUp(self):
self.client.force_login(self.instructor_participation.user)
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
2104
2105
2106
2107
2108
2109
2110
2111
2112
2113
2114
2115
2116
2117
2118
2119
def get_test_token_url(self, course_identifier=None):
course_identifier = (
course_identifier or self.get_default_course_identifier())
kwargs = {"course_identifier": course_identifier}
return reverse("test_api_token_method", kwargs=kwargs)
def get_test_basic_url(self, course_identifier=None):
course_identifier = (
course_identifier or self.get_default_course_identifier())
kwargs = {"course_identifier": course_identifier}
return reverse("test_api_basic_method", kwargs=kwargs)
def get_test_not_allowed_method_url(self, course_identifier=None):
course_identifier = (
course_identifier or self.get_default_course_identifier())
kwargs = {"course_identifier": course_identifier}
return reverse("test_api_not_allowed_method", kwargs=kwargs)
def get_test_api_error_url(self, course_identifier=None):
course_identifier = (
course_identifier or self.get_default_course_identifier())
kwargs = {"course_identifier": course_identifier}
return reverse("test_api_with_api_error", kwargs=kwargs)
def test_no_auth_headers(self):
resp = self.client.get(
self.get_test_token_url())
self.assertEqual(resp.status_code, 403)
resp = self.client.get(
self.get_test_basic_url())
self.assertEqual(resp.status_code, 401)
self.assertEqual(resp["WWW-Authenticate"],
'Basic realm="Relate direct git access for test-course"')
# {{{ method = "Token"
def test_invalid_token_case_not_matched(self):
token = self.create_token()
resp = self.client.get(
self.get_test_token_url(),
# case not matched
HTTP_AUTHORIZATION="token %i_%s" % (
token.id, self.default_token_hash_str))
self.assertEqual(resp.status_code, 403)
def test_invalid_token_no_space_in_auth_str(self):
token = self.create_token()
resp = self.client.get(
self.get_test_token_url(),
# no space between 'Token' and auth_data
HTTP_AUTHORIZATION="Token%i_%s" % (
token.id, self.default_token_hash_str))
self.assertEqual(resp.status_code, 403)
def test_invalid_token_wrong_format(self):
# underscores are not allowed
token = self.create_token(token_hash_str="an_invalid_token")
resp = self.client.get(
self.get_test_token_url(),
HTTP_AUTHORIZATION="Token %i_%s" % (
token.id, self.default_token_hash_str))
self.assertEqual(resp.status_code, 403)
def test_none_exist_token(self):
resp = self.client.get(
self.get_test_token_url(),
HTTP_AUTHORIZATION="Token %i_%s" % (
1, "nonexisttokenstr"))
self.assertEqual(resp.status_code, 403)
def test_revoked_token(self):
token = self.create_token(
revocation_time=now() - timedelta(minutes=1))
resp = self.client.get(
self.get_test_token_url(),
HTTP_AUTHORIZATION="Token %i_%s" % (
token.id, self.default_token_hash_str))
self.assertEqual(resp.status_code, 403)
def test_expired_token(self):
token = self.create_token(
valid_until=now() - timedelta(minutes=1))
resp = self.client.get(
self.get_test_token_url(),
HTTP_AUTHORIZATION="Token %i_%s" % (
token.id, self.default_token_hash_str))
self.assertEqual(resp.status_code, 403)
def test_token_auth_success(self):
token = self.create_token()
resp = self.client.get(
self.get_test_token_url(),
HTTP_AUTHORIZATION="Token %i_%s" % (
token.id, self.default_token_hash_str))
self.assertEqual(resp.status_code, 200)
# }}}
# {{{ method = "Basic"
def test_basic_auth_success(self):
resp = self.client.get(
self.get_test_basic_url(),
HTTP_AUTHORIZATION="Basic %s" % self.create_basic_auth())
self.assertEqual(resp.status_code, 200)
def test_basic_auth_ill_formed(self):
resp = self.client.get(
self.get_test_basic_url(),
HTTP_AUTHORIZATION="Basic %s" % "foo:barbar")
self.assertEqual(resp.status_code, 401)
def test_basic_auth_no_match(self):
from base64 import b64encode
bad_auth_data = b64encode(b"foobar").decode()
resp = self.client.get(
self.get_test_basic_url(),
HTTP_AUTHORIZATION="Basic %s" % bad_auth_data)
self.assertEqual(resp.status_code, 401)
def test_basic_auth_user_not_matched(self):
basic_auth_user_not_matched = self.create_basic_auth(
participation=self.instructor_participation,
user=self.ta_participation.user
)
resp = self.client.get(
self.get_test_basic_url(),
HTTP_AUTHORIZATION="Basic %s" % basic_auth_user_not_matched)
self.assertEqual(resp.status_code, 401)
# }}}
# {{{ method not allowed
def test_auth_method_not_allowed(self):
with self.assertRaises(AssertionError):
self.get_test_not_allowed_method_url(),
HTTP_AUTHORIZATION="Not_allowed_method blabla")
def test_auth_method_not_allowed_method_not_matched(self):
token = self.create_token()
with self.assertRaises(AssertionError):
self.get_test_not_allowed_method_url(),
HTTP_AUTHORIZATION="Token %i_%s" % (
token.id, self.default_token_hash_str))
# }}}
def test_raise_api_error(self):
token = self.create_token()
resp = self.client.get(
self.get_test_api_error_url(),
HTTP_AUTHORIZATION="Token %i_%s" % (
token.id, self.default_token_hash_str))
self.assertEqual(resp.status_code, 400)
class ManageAuthenticationTokensTest(
SingleCoursePageTestMixin, MockAddMessageMixing, TestCase):
# test manage_authentication_tokens
def setUp(self):
self.client.force_login(self.instructor_participation.user)
def test_not_authenticated(self):
with self.temporarily_switch_to_user(None):
resp = self.client.get(self.get_manage_authentication_token_url())
self.assertEqual(resp.status_code, 403)
def test_no_permission_authenticated(self):
with self.temporarily_switch_to_user(self.student_participation.user):
resp = self.client.get(self.get_manage_authentication_token_url())
self.assertEqual(resp.status_code, 403)
def test_get_success(self):
resp = self.client.get(self.get_manage_authentication_token_url())
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
2314
2315
2316
2317
2318
2319
2320
2321
2322
2323
2324
2325
2326
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
2338
2339
2340
2341
self.assertEqual(resp.status_code, 200)
tokens = self.get_response_context_value_by_name(resp, "tokens")
self.assertEqual(tokens.count(), 0)
def get_manage_authentication_tokens_post_data(
self, restrict_to_participation_role=None,
description="test", valid_until=None,
create=True, revoke_id=None, **kwargs):
data = {}
if create:
assert revoke_id is None
if restrict_to_participation_role is None:
prole_kwargs = {
"identifier": "instructor",
"course": self.course
}
role = factories.ParticipationRoleFactory(**prole_kwargs)
restrict_to_participation_role = role.pk
if not valid_until:
valid_until = (now() + timedelta(weeks=2)
).replace(tzinfo=None).strftime("%Y-%m-%d")
data.update({
"restrict_to_participation_role": restrict_to_participation_role,
"valid_until": valid_until,
"description": description,
"create": ""
})
if revoke_id:
assert isinstance(revoke_id, int)
data["revoke_%i" % revoke_id] = ""
data.update(kwargs)
return data
def test_get_tokens_with_revocation_time_within_a_week(self):
factories.AuthenticationTokenFactory.create_batch(
size=1,
user=self.ta_participation.user,
participation=self.ta_participation,
)
factories.AuthenticationTokenFactory.create_batch(
size=5,
user=self.instructor_participation.user,
participation=self.instructor_participation,
)
factories.AuthenticationTokenFactory.create_batch(
size=3, revocation_time=now() - timedelta(weeks=2),
user=self.instructor_participation.user,
participation=self.instructor_participation,
)
factories.AuthenticationTokenFactory.create_batch(
size=2, revocation_time=now() - timedelta(days=2),
user=self.instructor_participation.user,
participation=self.instructor_participation,
)
resp = self.client.get(self.get_manage_authentication_token_url())
self.assertEqual(resp.status_code, 200)
tokens = self.get_response_context_value_by_name(resp, "tokens")
self.assertEqual(tokens.count(), 7)
def test_post_create_success(self):
n_exist_tokens = 3
factories.AuthenticationTokenFactory.create_batch(
size=n_exist_tokens,
user=self.instructor_participation.user,
participation=self.instructor_participation,
)
resp = self.client.post(
2358
2359
2360
2361
2362
2363
2364
2365
2366
2367
2368
2369
2370
2371
2372
2373
2374
2375
2376
2377
2378
self.get_manage_authentication_token_url(),
data=self.get_manage_authentication_tokens_post_data()
)
self.assertEqual(resp.status_code, 200)
tokens = self.get_response_context_value_by_name(resp, "tokens")
self.assertEqual(tokens.count(), n_exist_tokens + 1)
added_token = AuthenticationToken.objects.last()
added_message = self._get_added_messages()
match = _TOKEN_AUTH_DATA_RE.match(added_message)
self.assertIsNotNone(match)
token_id = int(match.group("token_id"))
self.assertEqual(added_token.id, token_id)
token_hash_str = match.group("token_hash")
self.assertTrue(check_password(token_hash_str, added_token.token_hash))
def test_post_create_form_invalid(self):
resp = self.client.post(
self.get_manage_authentication_token_url(),
data=self.get_manage_authentication_tokens_post_data(
description=""
)
)
self.assertEqual(resp.status_code, 200)
self.assertEqual(AuthenticationToken.objects.count(), 0)
self.assertFormError(resp, "form", "description", "This field is required.")
def test_post_revoke(self):
n_exist_tokens = 3
tokens = factories.AuthenticationTokenFactory.create_batch(
size=n_exist_tokens,
user=self.instructor_participation.user,
participation=self.instructor_participation,
)
resp = self.client.post(
self.get_manage_authentication_token_url(),
data=self.get_manage_authentication_tokens_post_data(
create=False,
revoke_id=tokens[0].id)
)
self.assertEqual(resp.status_code, 200)
active_tokens = AuthenticationToken.objects.filter(
revocation_time__isnull=True)
self.assertEqual(active_tokens.count(), n_exist_tokens - 1)
def test_post_create_unknown_button_pressed(self):
resp = self.client.post(
self.get_manage_authentication_token_url(),
data=self.get_manage_authentication_tokens_post_data(
create=False
)
)
self.assertEqual(resp.status_code, 200)
self.assertEqual(AuthenticationToken.objects.count(), 0)
self.assertAddMessageCallCount(1)
class APIBearerTokenBackendTest(APITestMixin, TestCase):
# test APIBearerTokenBackend
def test_authenticate_success(self):
token = self.create_token()
backend = APIBearerTokenBackend()
self.assertEqual(
backend.authenticate(
None, self.course.identifier, token.id, self.default_token_hash_str),
self.instructor_participation.user
)
def test_authenticate_fail_no_matching_token(self):
backend = APIBearerTokenBackend()
self.assertIsNone(
backend.authenticate(
None, self.course.identifier, 1, "foobar"))
def test_get_user(self):
2447
2448
2449
2450
2451
2452
2453
2454
2455
2456
2457
2458
2459
2460
2461
2462
2463
2464
2465
2466
2467
2468
2469
2470
2471
2472
2473
2474
2475
2476
2477
2478
2479
2480
2481
2482
2483
2484
2485
2486
2487
2488
2489
2490
2491
2492
2493
2494
2495
2496
2497
2498
2499
2500
2501
2502
2503
2504
2505
2506
2507
2508
2509
2510
2511
2512
2513
2514
2515
2516
2517
2518
2519
2520
2521
2522
2523
2524
2525
2526
2527
2528
2529
2530
2531
2532
2533
2534
2535
backend = APIBearerTokenBackend()
self.assertEqual(
backend.get_user(
self.instructor_participation.user.id),
self.instructor_participation.user)
self.assertIsNone(
backend.get_user(10000))
class APIContextTest(APITestMixin, TestCase):
# test APIContext
def test_restrict_to_role_is_not_none(self):
token = self.create_token()
api_context = APIContext(None, token)
self.assertIsNotNone(api_context.restrict_to_role)
def test_restrict_to_role_is_none(self):
token = self.create_token()
token.restrict_to_participation_role = None
token.save()
api_context = APIContext(None, token)
self.assertIsNone(api_context.restrict_to_role)
def test_restrict_to_role_not_in_participation_roles(self):
token = self.create_token(participation=self.student_participation)
prole_kwargs = {
"course": self.course, "identifier": "ta"}
role = factories.ParticipationRoleFactory(**prole_kwargs)
token.restrict_to_participation_role = role
token.save()
with self.assertRaises(PermissionDenied):
APIContext(None, token)
def test_restrict_to_role_not_in_participation_roles_but_may_impersonate(self):
token = self.create_token(participation=self.ta_participation)
prole_kwargs = {
"course": self.course, "identifier": "student"}
role = factories.ParticipationRoleFactory(**prole_kwargs)
token.restrict_to_participation_role = role
token.save()
api_context = APIContext(None, token)
self.assertIsNotNone(api_context.restrict_to_role)
def test_api_context_has_permission_true(self):
token = self.create_token()
api_context = APIContext(None, token)
from course.constants import participation_permission as pperm
self.assertTrue(api_context.has_permission(
pperm.access_files_for, "instructor"))
def test_api_context_has_permission_restrict_to_role_is_none_true(self):
token = self.create_token()
token.restrict_to_participation_role = None
token.save()
api_context = APIContext(None, token)
from course.constants import participation_permission as pperm
self.assertTrue(api_context.has_permission(
pperm.access_files_for, "instructor"))
def test_api_context_has_permission_false(self):
token = self.create_token(participation=self.ta_participation)
api_context = APIContext(None, token)
from course.constants import participation_permission as pperm
self.assertFalse(api_context.has_permission(
pperm.access_files_for, "instructor"))
def test_api_context_has_permission_restrict_to_role_is_none_false(self):
token = self.create_token(participation=self.ta_participation)
token.restrict_to_participation_role = None
token.save()
api_context = APIContext(None, token)
from course.constants import participation_permission as pperm
self.assertFalse(api_context.has_permission(
pperm.access_files_for, "instructor"))