Newer
Older
from __future__ import division
__copyright__ = "Copyright (C) 2017 Dong Zhuang, Andreas Kloeckner, Zesheng Wang"
__license__ = """
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import re
import shutil
import hashlib
import memcache
from django.test import Client, override_settings
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
Course, Participation, ParticipationRole, FlowSession, FlowPageData,
from course.constants import (
participation_status, user_status,
grade_aggregation_strategy as g_strategy,
flow_permission as fperm)
from course.content import get_course_repo_path
from tests.constants import QUIZ_FLOW_ID, TEST_PAGE_TUPLE
from tests.utils import mock
# Account details of the superuser that the test mixins create.
CREATE_SUPERUSER_KWARGS = {
    "username": "test_admin",
    "password": "test_admin",
    "email": "test_admin@example.com",
    "first_name": "Test",
    "last_name": "Admin",
}
# Setup description for a single course: one "course" dict holding the
# course attributes plus a list of "participations", each naming a role
# identifier, the user to create and the participation status.
SINGLE_COURSE_SETUP_LIST = [
    {
        "course": {
            "identifier": "test-course",
            "name": "Test Course",
            "number": "CS123",
            "time_period": "Fall 2016",
            "hidden": False,
            "listed": True,
            "accepts_enrollment": True,
            "git_source": "git://github.com/inducer/relate-sample",
            "course_file": "course.yml",
            "events_file": "events.yml",
            "enrollment_approval_required": False,
            "enrollment_required_email_suffix": "",
            "preapproval_require_verified_inst_id": True,
            "from_email": "inform@tiker.net",
            "notify_email": "inform@tiker.net",
        },
        "participations": [
            # instructor
            {
                "role_identifier": "instructor",
                "user": {
                    "username": "test_instructor",
                    "password": "test_instructor",
                    "email": "test_instructor@example.com",
                    "last_name": "Instructor",
                },
                "status": participation_status.active,
            },
            # teaching assistant
            {
                "role_identifier": "ta",
                "user": {
                    "username": "test_ta",
                    "password": "test",
                    "email": "test_ta@example.com",
                    "last_name": "TA",
                },
                "status": participation_status.active,
            },
            # student
            {
                "role_identifier": "student",
                "user": {
                    "username": "test_student",
                    "password": "test",
                    "email": "test_student@example.com",
                    "last_name": "Student",
                },
                "status": participation_status.active,
            },
        ],
    },
]
# Two independent copies of the single-course setup, told apart only by
# their course identifiers.
TWO_COURSE_SETUP_LIST = deepcopy(SINGLE_COURSE_SETUP_LIST)
TWO_COURSE_SETUP_LIST.extend(deepcopy(SINGLE_COURSE_SETUP_LIST))
TWO_COURSE_SETUP_LIST[0]["course"].update(identifier="test-course1")
TWO_COURSE_SETUP_LIST[1]["course"].update(identifier="test-course2")
NONE_PARTICIPATION_USER_CREATE_KWARG_LIST = [
{
"username": "test_user1",
"password": "test_user1",
"email": "test_user1@suffix.com",
"first_name": "Test",
"last_name": "User1",
"institutional_id": "test_user1_institutional_id",
"institutional_id_verified": True,
"status": user_status.active
},
{
"username": "test_user2",
"password": "test_user2",
"email": "test_user2@nosuffix.com",
"first_name": "Test",
"last_name": "User2",
"institutional_id": "test_user2_institutional_id",
"institutional_id_verified": False,
"status": user_status.active
},
{
"username": "test_user3",
"password": "test_user3",
"email": "test_user3@suffix.com",
"first_name": "Test",
"last_name": "User3",
"institutional_id": "test_user3_institutional_id",
"institutional_id_verified": True,
"status": user_status.unconfirmed
},
{
"username": "test_user4",
"password": "test_user4",
"email": "test_user4@no_suffix.com",
"first_name": "Test",
"last_name": "User4",
"institutional_id": "test_user4_institutional_id",
"institutional_id_verified": False,
"status": user_status.unconfirmed
# Client for a memcached server on localhost:11211.  Creation is attempted
# on a best-effort basis: if it raises, ``mc`` is simply left undefined.
try:
    mc = memcache.Client(['127.0.0.1:11211'])
except Exception:
    pass

# Captures the value of the ``data-field_id`` attribute in rendered HTML.
SELECT2_HTML_FIELD_ID_SEARCH_PATTERN = re.compile(r'data-field_id="([^"]+)"')
def git_source_url_to_cache_keys(url):
    """Derive the pair of cache keys that belong to a git source URL.

    Both keys embed the hexadecimal MD5 digest of the UTF-8 encoded *url*.

    :returns: a 2-tuple ``("test_course:<digest>", "test_sha:<digest>")``.
    """
    digest = hashlib.md5(url.encode("utf-8")).hexdigest()
    course_key = "test_course:%s" % digest
    sha_key = "test_sha:%s" % digest
    return (course_key, sha_key)
class CourseCreateFailure(Exception):
    """Signals that creating a course for a test did not succeed."""
class ResponseContextMixin(object):
"""
Response context refers to "the template Context instance that was used
to render the template that produced the response content".
Ref: https://docs.djangoproject.com/en/dev/topics/testing/tools/#django.test.Response.context # noqa
"""
def get_response_context_value_by_name(self, response, context_name):
    """Return ``response.context[context_name]``.

    A missing entry is reported through ``self.fail`` instead of surfacing
    as a ``KeyError``.
    """
    try:
        return response.context[context_name]
    except KeyError:
        self.fail("%s does not exist in given response" % context_name)
def assertResponseHasNoContext(self, response, context_name):  # noqa
    """Fail if *context_name* can be looked up in ``response.context``."""
    try:
        response.context[context_name]
    except KeyError:
        # The lookup failed, which is exactly what the caller expects.
        return
    self.fail("%s unexpectedly exist in given response" % context_name)
def assertResponseContextIsNone(self, resp, context_name):  # noqa
    """Assert that the named context item is ``None`` or absent."""
    try:
        found = self.get_response_context_value_by_name(resp, context_name)
    except AssertionError:
        # An item that does not exist is accepted just like a ``None`` one.
        return
    self.assertIsNone(found)
def assertResponseContextIsNotNone(self, resp, context_name, msg=""):  # noqa
    """Assert that the named context item exists and is not ``None``."""
    self.assertIsNotNone(
        self.get_response_context_value_by_name(resp, context_name), msg)
def assertResponseContextEqual(self, resp, context_name, expected_value):  # noqa
    """Assert that the named context item equals *expected_value*.

    When both the context value and *expected_value* can be converted to
    ``float``, they are considered equal if they differ by at most 1e-04 in
    either direction.  In every other case (non-numeric values, or numbers
    that are too far apart) the comparison is delegated to
    ``assertEqual``, which also produces the failure message.
    """
    value = self.get_response_context_value_by_name(resp, context_name)
    try:
        # abs() is essential: without it any value far *below* the expected
        # one yields a negative difference and would wrongly be accepted.
        difference = abs(float(value) - float(expected_value))
    except Exception:
        # Not comparable as numbers; fall through to plain equality.
        difference = None

    if difference is not None and difference <= 1e-04:
        return
    self.assertEqual(value, expected_value)
context_name, expected_value, html=False,
in_bulk=False):
value = self.get_response_context_value_by_name(resp, context_name)
if in_bulk:
if not isinstance(expected_value, list):
expected_value = [expected_value]
for v in expected_value:
if not html:
self.assertIn(v, value)
else:
self.assertInHTML(v, value)
if not html:
self.assertIn(expected_value, value)
else:
self.assertInHTML(expected_value, value)
def assertResponseContextRegex(  # noqa
        self, resp,  # noqa
        context_name, expected_value_regex):
    """Assert that the named context item matches *expected_value_regex*."""
    actual = self.get_response_context_value_by_name(resp, context_name)
    six.assertRegex(self, actual, expected_value_regex)
def get_response_context_answer_feedback(self, response):
    """Return the ``"feedback"`` item of the response context."""
    answer_feedback = self.get_response_context_value_by_name(
        response, "feedback")
    return answer_feedback
def get_response_context_answer_feedback_string(self, response,
                                                include_bulk_feedback=True):
    """Return the feedback text of the ``"feedback"`` context item.

    With *include_bulk_feedback* set, the ``bulk_feedback`` attribute is
    appended to ``feedback``; whichever of the two is ``None`` is left out.
    Otherwise only ``feedback`` is returned.
    """
    fb = self.get_response_context_value_by_name(response, "feedback")
    self.assertTrue(hasattr(fb, "feedback"))

    # ``bulk_feedback`` is only consulted when the caller asked for it.
    if not include_bulk_feedback or fb.bulk_feedback is None:
        return fb.feedback
    if fb.feedback is None:
        return fb.bulk_feedback
    return fb.feedback + fb.bulk_feedback
def assertResponseContextAnswerFeedbackContainsFeedback( # noqa
include_bulk_feedback=True, html=False):
feedback_str = self.get_response_context_answer_feedback_string(
response, include_bulk_feedback)
if not html:
self.assertIn(expected_feedback, feedback_str)
else:
self.assertInHTML(expected_feedback, feedback_str)
def assertResponseContextAnswerFeedbackNotContainsFeedback(  # noqa
        self, response, expected_feedback,
        include_bulk_feedback=True,
        html=False):
    """Assert that the answer feedback does not contain *expected_feedback*.

    :param response: response whose context carries the feedback.
    :param expected_feedback: text (or HTML fragment) that must be absent.
    :param include_bulk_feedback: forwarded to
        ``get_response_context_answer_feedback_string``.
    :param html: when true, absence is checked with ``assertInHTML`` using
        ``count=0``; otherwise a plain ``assertNotIn`` is used.
    """
    feedback_str = self.get_response_context_answer_feedback_string(
        response, include_bulk_feedback)

    # Honour the ``html`` flag: exactly one of the two checks applies.
    if not html:
        self.assertNotIn(expected_feedback, feedback_str)
    else:
        self.assertInHTML(expected_feedback, feedback_str, count=0)
def assertResponseContextAnswerFeedbackCorrectnessEquals( # noqa
self, response, expected_correctness):
answer_feedback = self.get_response_context_answer_feedback(response)
if expected_correctness is None:
try:
self.assertTrue(hasattr(answer_feedback, "correctness"))
except AssertionError:
pass
else:
self.assertIsNone(answer_feedback.correctness)
else:
if answer_feedback.correctness is None:
return self.fail("The returned correctness is None, not %s"
% expected_correctness)
self.assertTrue(
abs(float(answer_feedback.correctness)
"%s does not equal %s"
% (str(answer_feedback.correctness)[:5],
str(expected_correctness)[:5]))
def get_response_body(self, response):
    """Return the ``"body"`` item of the response context."""
    body = self.get_response_context_value_by_name(response, "body")
    return body
def get_page_response_correct_answer(self, response):
    """Return the ``"correct_answer"`` item of the response context."""
    correct_answer = self.get_response_context_value_by_name(
        response, "correct_answer")
    return correct_answer
def get_page_response_feedback(self, response):
    """Return the ``"feedback"`` item of the response context."""
    feedback = self.get_response_context_value_by_name(response, "feedback")
    return feedback
def debug_print_response_context_value(self, resp, context_name):
    """Print the named context item to stdout as a debugging aid.

    Lists and tuples are printed item by item: ``ValidationWarning``
    instances by their ``text``, everything else by ``repr``.  If an
    ``AssertionError`` is raised along the way (e.g. the item does not
    exist), a "no value" notice is printed instead.
    """
    try:
        ctx_value = self.get_response_context_value_by_name(
            resp, context_name)
        print("\n-----------context %s-------------"
              % context_name)
        if not isinstance(ctx_value, (list, tuple)):
            print(ctx_value)
        else:
            from course.validation import ValidationWarning
            for item in ctx_value:
                print(item.text
                      if isinstance(item, ValidationWarning)
                      else repr(item))
        print("-----------context end-------------\n")
    except AssertionError:
        print("\n-------no value for context %s----------" % context_name)
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
def get_select2_field_id_from_response(self, response,
                                       form_context_name="form"):
    """Extract the ``data-field_id`` value from a form in the context.

    The context item named *form_context_name* is converted with ``str()``
    and searched with ``SELECT2_HTML_FIELD_ID_SEARCH_PATTERN``; the first
    captured group is returned.
    """
    missing_msg = ("The response doesn't contain a context named '%s'"
                   % form_context_name)
    self.assertResponseContextIsNotNone(
        response, form_context_name, missing_msg)

    rendered_form = str(response.context[form_context_name])
    match = SELECT2_HTML_FIELD_ID_SEARCH_PATTERN.search(rendered_form)
    assert match, "pattern not found in %s" % rendered_form
    return match.group(1)
def select2_get_request(self, field_id, term=None,
                        select2_urlname='django_select2-json'):
    """Issue the GET request for a select2 field through the test client.

    :param field_id: sent as the ``field_id`` query parameter.
    :param term: optional search term; it is stripped and only sent (as
        ``term``) when something is left after stripping.
    :param select2_urlname: name of the URL to reverse.
    """
    select2_url = reverse(select2_urlname)
    query = {"field_id": field_id}
    if term is not None:
        assert isinstance(term, six.string_types)
        stripped_term = term.strip()
        if stripped_term:
            query["term"] = stripped_term

    # The extra header marks the request as an XMLHttpRequest.
    return self.c.get(
        select2_url, query, HTTP_X_REQUESTED_WITH='XMLHttpRequest')
def get_select2_response_data(self, response, key="results"):
    """Decode the response body as UTF-8 JSON and return its *key* entry."""
    import json

    payload = json.loads(response.content.decode('utf-8'))
    return payload[key]
create_superuser_kwargs = CREATE_SUPERUSER_KWARGS
@classmethod
def setUpTestData(cls):  # noqa
    """Create the superuser, the test client and a temporary GIT_ROOT."""
    # The superuser comes first: without it no user, course or
    # participation can be created.
    cls.superuser = cls.create_superuser()
    cls.c = Client()

    # Point GIT_ROOT at a fresh temporary directory.
    git_root = tempfile.mkdtemp()
    cls.settings_git_root_override = override_settings(GIT_ROOT=git_root)
    cls.settings_git_root_override.enable()

    super(SuperuserCreateMixin, cls).setUpTestData()
@classmethod
def create_superuser(cls):
    """Create a superuser from ``cls.create_superuser_kwargs``."""
    user_manager = get_user_model().objects
    return user_manager.create_superuser(**cls.create_superuser_kwargs)
@classmethod
def get_sign_up_view_url(cls):
    """Return the URL reversed from the ``relate-sign_up`` name."""
    url_name = "relate-sign_up"
    return reverse(url_name)
@classmethod
def get_sign_up(cls, follow=True):
    """GET the sign-up URL with the class-level test client."""
    sign_up_url = cls.get_sign_up_view_url()
    return cls.c.get(sign_up_url, follow=follow)
@classmethod
def post_sign_up(cls, data, follow=True):
    """POST *data* to the sign-up URL with the class-level test client."""
    sign_up_url = cls.get_sign_up_view_url()
    return cls.c.post(sign_up_url, data, follow=follow)
@classmethod
def get_profile_view_url(cls):
    """Return the URL reversed from the ``relate-user_profile`` name."""
    url_name = "relate-user_profile"
    return reverse(url_name)
@classmethod
def get_profile(cls, follow=True):
    """GET the user profile URL with the class-level test client."""
    profile_url = cls.get_profile_view_url()
    return cls.c.get(profile_url, follow=follow)
@classmethod
def post_profile(cls, data, follow=True):
    """POST *data* to the user profile URL with the test client."""
    profile_url = cls.get_profile_view_url()
    return cls.c.post(profile_url, data, follow=follow)
@classmethod
def post_signout(cls, data, follow=True):
    """POST *data* to the URL returned by ``get_sign_up_view_url``."""
    # NOTE(review): despite the method name, the target is the sign-up
    # URL, not a sign-out one -- confirm whether that is intended.
    target_url = cls.get_sign_up_view_url()
    return cls.c.post(target_url, data, follow=follow)
@classmethod
def get_impersonate_view_url(cls):
    """Return the URL reversed from the ``relate-impersonate`` name."""
    url_name = "relate-impersonate"
    return reverse(url_name)
@classmethod
def get_stop_impersonate_view_url(cls):
    """Return the URL reversed from ``relate-stop_impersonating``."""
    url_name = "relate-stop_impersonating"
    return reverse(url_name)
@classmethod
def get_impersonate(cls):
    """GET the impersonate URL with the class-level test client."""
    impersonate_url = cls.get_impersonate_view_url()
    return cls.c.get(impersonate_url)
@classmethod
def post_impersonate(cls, impersonatee, follow=True):
    """POST the impersonation form, selecting *impersonatee* by its pk."""
    form_data = {
        "add_impersonation_header": ["on"],
        "submit": [''],
        "user": [str(impersonatee.pk)],
    }
    return cls.c.post(
        cls.get_impersonate_view_url(), form_data, follow=follow)
@classmethod
def get_stop_impersonate(cls, follow=True):
    """GET the stop-impersonating URL with the class-level test client."""
    stop_url = cls.get_stop_impersonate_view_url()
    return cls.c.get(stop_url, follow=follow)
@classmethod
def post_stop_impersonate(cls, follow=True):
    """POST the submit-only form to the stop-impersonating URL."""
    stop_url = cls.get_stop_impersonate_view_url()
    return cls.c.post(stop_url, {"submit": ['']}, follow=follow)
@classmethod
def get_confirm_stop_impersonate_view_url(cls):
    """Return the URL reversed from ``relate-confirm_stop_impersonating``."""
    url_name = "relate-confirm_stop_impersonating"
    return reverse(url_name)
@classmethod
def get_confirm_stop_impersonate(cls, follow=True):
    """GET the confirm-stop-impersonating URL with the test client."""
    confirm_url = cls.get_confirm_stop_impersonate_view_url()
    return cls.c.get(confirm_url, follow=follow)
@classmethod
def post_confirm_stop_impersonate(cls, follow=True):
    """POST an empty form to the confirm-stop-impersonating URL."""
    confirm_url = cls.get_confirm_stop_impersonate_view_url()
    return cls.c.post(confirm_url, {}, follow=follow)
@classmethod
def get_reset_password_url(cls, use_instid=False):
    """Return the ``relate-reset_password`` URL.

    With *use_instid* set, the URL is reversed with ``field="instid"``;
    otherwise no URL kwargs are passed.
    """
    url_kwargs = {"field": "instid"} if use_instid else {}
    return reverse("relate-reset_password", kwargs=url_kwargs)
@classmethod
def get_reset_password(cls, use_instid=False):
    """GET the reset-password URL with the class-level test client."""
    reset_url = cls.get_reset_password_url(use_instid)
    return cls.c.get(reset_url)
@classmethod
def post_reset_password(cls, data, use_instid=False):
    """POST *data* to the reset-password URL with the test client."""
    reset_url = cls.get_reset_password_url(use_instid)
    return cls.c.post(reset_url, data=data)
def get_reset_password_stage2_url(self, user_id, sign_in_key, **kwargs):
    """Return the ``relate-reset_password_stage2`` URL.

    An optional ``querystring`` keyword (a dict) is appended as
    ``?k=v&k2=v2``; keys and values are interpolated as-is, without any
    URL encoding.
    """
    base_url = reverse(
        "relate-reset_password_stage2", args=(user_id, sign_in_key))

    querystring = kwargs.pop("querystring", None)
    if querystring is None:
        return base_url

    assert isinstance(querystring, dict)
    pairs = ["%s=%s" % (name, val)
             for (name, val) in six.iteritems(querystring)]
    return base_url + ("?%s" % "&".join(pairs))
def get_reset_password_stage2(self, user_id, sign_in_key, **kwargs):
    """GET the stage-2 reset-password URL with the test client."""
    stage2_url = self.get_reset_password_stage2_url(
        user_id=user_id, sign_in_key=sign_in_key, **kwargs)
    return self.c.get(stage2_url)
def post_reset_password_stage2(self, user_id, sign_in_key, data, **kwargs):
    """POST *data* to the stage-2 reset-password URL with the test client."""
    stage2_url = self.get_reset_password_stage2_url(
        user_id=user_id, sign_in_key=sign_in_key, **kwargs)
    return self.c.post(stage2_url, data=data)
@classmethod
def get_fake_time_url(cls):
@classmethod
def get_set_fake_time(cls):
    """GET the fake-time URL with the class-level test client."""
    fake_time_url = cls.get_fake_time_url()
    return cls.c.get(fake_time_url)
@classmethod
def post_set_fake_time(cls, data, follow=True):
    """POST *data* to the fake-time URL with the class-level test client."""
    fake_time_url = cls.get_fake_time_url()
    return cls.c.post(fake_time_url, data, follow=follow)
def assertSessionFakeTimeEqual(self, session, expected_date_time):  # noqa
    """Assert that the session's fake time equals *expected_date_time*.

    The session stores the fake time as a timestamp under
    ``"relate_fake_time"``; it is converted with
    ``datetime.datetime.fromtimestamp`` before comparing.  A session
    without that entry only satisfies an expectation of ``None``.
    """
    timestamp = session.get("relate_fake_time", None)
    if timestamp is not None:
        faked_time = datetime.datetime.fromtimestamp(timestamp)
    elif expected_date_time is None:
        faked_time = None
    else:
        raise AssertionError(
            "the session doesn't have 'relate_fake_time' attribute")
    self.assertEqual(faked_time, expected_date_time)
def assertSessionFakeTimeIsNone(self, session):  # noqa
    """Assert that *session* carries no fake time."""
    expected = None
    self.assertSessionFakeTimeEqual(session, expected)
@classmethod
def get_set_pretend_facilities_url(cls):
@classmethod
def get_set_pretend_facilities(cls):
    """GET the set-pretend-facilities URL with the test client."""
    facilities_url = cls.get_set_pretend_facilities_url()
    return cls.c.get(facilities_url)
@classmethod
def post_set_pretend_facilities(cls, data, follow=True):
    """POST *data* to the set-pretend-facilities URL with the test client."""
    facilities_url = cls.get_set_pretend_facilities_url()
    return cls.c.post(facilities_url, data, follow=follow)
@classmethod
def force_remove_all_course_dir(cls):
    """Remove the repo path of every ``Course`` row.

    Only needed for courses that are created per test rather than per
    test class.
    """
    from relate.utils import force_remove_path
    from course.content import get_course_repo_path

    for course in Course.objects.all():
        repo_path = get_course_repo_path(course)
        force_remove_path(repo_path)
def assertSessionPretendFacilitiesContains(self, session, expected_facilities):  # noqa
    """Assert that the session's pretended facilities include the expected.

    *expected_facilities* may be ``None`` (nothing must be stored), a list
    or tuple (every element must be present) or a single facility.
    """
    pretended = session.get("relate_pretend_facilities", None)
    if expected_facilities is None:
        return self.assertIsNone(pretended)
    if pretended is None:
        raise AssertionError(
            "the session doesn't have "
            "'relate_pretend_facilities' attribute")

    if isinstance(expected_facilities, (list, tuple)):
        contained = set(expected_facilities) <= set(pretended)
    else:
        contained = expected_facilities in pretended
    self.assertTrue(contained)
def assertSessionPretendFacilitiesIsNone(self, session):  # noqa
    """Assert that *session* stores no pretended facilities."""
    self.assertIsNone(session.get("relate_pretend_facilities", None))
def assertFormErrorLoose(self, response, errors, form_name="form"): # noqa
"""Assert that errors is found in response.context['form'] errors"""
if errors is None:
errors = []
if not isinstance(errors, (list, tuple)):
errors = [errors]
form_errors = ". ".join(list(
itertools.chain(*response.context[form_name].errors.values())))
if form_errors is None or not form_errors:
if errors:
if form_errors:
if not errors:
self.fail("%s unexpectedly has following errors: %s"
% (form_name, repr(form_errors)))
for err in errors:
self.assertIn(err, form_errors)
# {{{ defined here so that they can be used in both classmethods and instance methods
def get_flow_page_ordinal_from_page_id(flow_session_id, page_id,
                                       with_group_id=False):
    """Look up the ordinal of a page within a flow session.

    :returns: ``page_ordinal``, or ``(page_ordinal, group_id)`` when
        *with_group_id* is true.
    """
    page_data = FlowPageData.objects.get(
        flow_session__id=flow_session_id, page_id=page_id)
    if not with_group_id:
        return page_data.page_ordinal
    return page_data.page_ordinal, page_data.group_id
def get_flow_page_id_from_page_ordinal(flow_session_id, page_ordinal,
with_group_id=False):
flow_page_data = FlowPageData.objects.get(
flow_session__id=flow_session_id,
if with_group_id:
return flow_page_data.page_id, flow_page_data.group_id
return flow_page_data.page_id
# }}}
class CoursesTestMixinBase(SuperuserCreateMixin):
# A list of Dicts, each of which contain a course dict and a list of
# participations. See SINGLE_COURSE_SETUP_LIST for the setup for one course.
courses_setup_list = []
none_participation_user_create_kwarg_list = []
courses_attributes_extra_list = None
override_settings_at_post_create_course = {}
@classmethod
def setUpTestData(cls): # noqa
super(CoursesTestMixinBase, cls).setUpTestData()
cls.default_flow_params = None
if cls.courses_attributes_extra_list is not None:
if (len(cls.courses_attributes_extra_list)
!= len(cls.courses_setup_list)):
raise ValueError(
"'courses_attributes_extra_list' must has equal length "
"with courses")
for i, course_setup in enumerate(cls.courses_setup_list):
if "course" not in course_setup:
continue
cls.n_courses += 1
course_identifier = course_setup["course"]["identifier"]
course_setup_kwargs = course_setup["course"]
if cls.courses_attributes_extra_list:
extra_attrs = cls.courses_attributes_extra_list[i]
assert isinstance(extra_attrs, dict)
course_setup_kwargs.update(extra_attrs)
cls.create_course(course_setup_kwargs)
except Exception:
raise
course = Course.objects.get(identifier=course_identifier)
if "participations" in course_setup:
for participation in course_setup["participations"]:
create_user_kwargs = participation.get("user")
if not create_user_kwargs:
continue
role_identifier = participation.get("role_identifier")
if not role_identifier:
continue
cls.create_participation(
course=course,
user_or_create_user_kwargs=create_user_kwargs,
role_identifier=role_identifier,
status=participation.get("status",
participation_status.active)
)
# Remove superuser from participation for further test
# such as impersonate in auth module
try:
superuser_participations = (
Participation.objects.filter(user=cls.superuser))
for sp in superuser_participations:
Participation.delete(sp)
except Participation.DoesNotExist:
pass
cls.non_participation_users = get_user_model().objects.none()
if cls.none_participation_user_create_kwarg_list:
pks = []
for create_user_kwargs in (
cls.none_participation_user_create_kwarg_list):
user = cls.create_user(create_user_kwargs)
pks.append(user.pk)
cls.non_participation_users = (
get_user_model().objects.filter(pk__in=pks))
cls.course_qset = Course.objects.all()
@classmethod
def create_user(cls, create_user_kwargs):
user, created = get_user_model().objects.get_or_create(
email__iexact=create_user_kwargs["email"], defaults=create_user_kwargs)
try:
# TODO: why pop failed here?
password = create_user_kwargs["password"]
return user
@classmethod
def create_participation(
        cls, course, user_or_create_user_kwargs,
        role_identifier=None, status=None):
    """Get or create a participation of a user in *course*.

    :param user_or_create_user_kwargs: a user model instance, or a dict
        that is handed to ``cls.create_user``.
    :param role_identifier: role assigned when the participation is newly
        created; defaults to ``"student"``.
    :param status: participation status; defaults to
        ``participation_status.active``.
    :returns: the participation object.
    """
    if not isinstance(user_or_create_user_kwargs, get_user_model()):
        assert isinstance(user_or_create_user_kwargs, dict)
        user = cls.create_user(user_or_create_user_kwargs)
    else:
        user = user_or_create_user_kwargs

    participation, p_created = Participation.objects.get_or_create(
        user=user,
        course=course,
        status=participation_status.active if status is None else status,
    )

    # Roles are only assigned to participations created by this call.
    if p_created:
        matching_roles = ParticipationRole.objects.filter(
            course=course,
            identifier=(
                "student" if role_identifier is None else role_identifier))
        participation.roles.set(matching_roles)
    return participation
@classmethod
def post_create_course(cls, create_course_kwargs, raise_error=True,
login_superuser=True):
# To speed up, use create_course instead, this is better used for tests
if login_superuser:
cls.c.force_login(cls.superuser)
existing_course_count = Course.objects.count()
with override_settings(**cls.override_settings_at_post_create_course):
resp = cls.c.post(cls.get_set_up_new_course_url(),
data=create_course_kwargs)
all_courses = Course.objects.all()
if not all_courses.count() == existing_course_count + 1:
error_string = None
# most probably the reason course creation form error
form_context = resp.context.__getitem__("form")
assert form_context
error_list = []
if form_context.errors:
error_list = [
"%s: %s"
% (field,
"\n".join(["%s:%s" % (type(e).__name__, str(e))
for e in errs]))
for field, errs
in six.iteritems(form_context.errors.as_data())]
non_field_errors = form_context.non_field_errors()
if non_field_errors:
error_list.append(repr(non_field_errors))
if error_list:
error_string = "\n".join(error_list)
if not error_string:
error_string = ("course creation failed for unknown errors")
raise CourseCreateFailure(error_string)
# the course is created successfully
last_course = all_courses.last()
assert last_course
url_cache_key, commit_sha_cach_key = (
git_source_url_to_cache_keys(last_course.git_source))
mc.set_multi({url_cache_key: get_course_repo_path(last_course),
commit_sha_cach_key: last_course.active_git_commit_sha},
def create_course(cls, create_course_kwargs, raise_error=True):
    """Create a course, reusing a cached repository when one is available.

    The memcache client is asked for a repo path and a commit sha stored
    under the keys derived from ``create_course_kwargs["git_source"]``.
    If both are found and the path is an existing directory, the repo is
    copied below ``settings.GIT_ROOT`` and the ``Course`` row is created
    directly.  Otherwise (including any error during the cache lookup)
    the call falls back to ``cls.post_create_course`` and returns its
    result.
    """
    repo_cache_key, commit_sha_cach_key = git_source_url_to_cache_keys(
        create_course_kwargs["git_source"])

    has_cached_repo = False
    try:
        exist_course_repo_path = mc.get(repo_cache_key)
        exist_commit_sha = mc.get(commit_sha_cach_key)
        has_cached_repo = (
            os.path.isdir(exist_course_repo_path)
            and bool(exist_course_repo_path and exist_commit_sha))
    except Exception:
        # Any cache problem simply means "no cached repo".
        pass

    if not has_cached_repo:
        # fall back to post create
        return cls.post_create_course(
            create_course_kwargs, raise_error=raise_error)

    course_count_before = Course.objects.count()
    target_repo_path = os.path.join(
        settings.GIT_ROOT, create_course_kwargs["identifier"])
    shutil.copytree(exist_course_repo_path, target_repo_path)

    # Work on a copy so the caller's kwargs are left untouched.
    model_kwargs = deepcopy(create_course_kwargs)
    model_kwargs["active_git_commit_sha"] = exist_commit_sha
    Course.objects.create(**model_kwargs)
    assert Course.objects.count() == course_count_before + 1
@classmethod
def get_course_view_url(cls, view_name, course_identifier=None):
    """Reverse *view_name* with the course identifier as its only arg.

    A falsy *course_identifier* is replaced by
    ``cls.get_default_course_identifier()``.
    """
    if not course_identifier:
        course_identifier = cls.get_default_course_identifier()
    return reverse(view_name, args=[course_identifier])
@classmethod
def get_set_up_new_course_url(cls):
    """Return the URL reversed from ``relate-set_up_new_course``."""
    url_name = "relate-set_up_new_course"
    return reverse(url_name)
@classmethod
def get_set_up_new_course(cls):
    """GET the set-up-new-course URL with the class-level test client.

    :returns: the response returned by ``cls.c.get``.
    """
    # The URL helper must be *called*; the previous code handed an
    # uncalled (and non-existent) ``get_update_course_url`` attribute to
    # the client instead of a URL string.
    return cls.c.get(cls.get_set_up_new_course_url())
@classmethod
def get_edit_course_url(cls, course_identifier=None):
    """Return the ``relate-edit_course`` URL for the given course.

    A falsy *course_identifier* is replaced by
    ``cls.get_default_course_identifier()``.
    """
    if not course_identifier:
        course_identifier = cls.get_default_course_identifier()
    return cls.get_course_view_url("relate-edit_course", course_identifier)
@classmethod
def post_edit_course(cls, data, course=None):
    """POST *data* to the edit-course URL of *course*.

    A falsy *course* is replaced by ``cls.get_default_course()``.
    """
    if not course:
        course = cls.get_default_course()
    return cls.c.post(cls.get_edit_course_url(course.identifier), data)
@classmethod
def get_edit_course(cls, course=None):
    """GET the edit-course URL of *course*.

    A falsy *course* is replaced by ``cls.get_default_course()``.
    """
    if not course:
        course = cls.get_default_course()
    edit_course_url = cls.get_edit_course_url(course.identifier)
    return cls.c.get(edit_course_url)
@classmethod
def get_course_page_url(cls, course_identifier=None):
    """Return the ``relate-course_page`` URL for the given course."""
    view_name = "relate-course_page"
    return cls.get_course_view_url(view_name, course_identifier)
@classmethod
def get_finish_flow_session_view_url(cls, course_identifier=None,
                                     flow_session_id=None):
    """Return the ``relate-finish_flow_session_view`` URL.

    A falsy *course_identifier* falls back to
    ``cls.get_default_course_identifier()``; a *flow_session_id* of
    ``None`` falls back to ``cls.get_default_flow_session_id(...)``.
    """
    if not course_identifier:
        course_identifier = cls.get_default_course_identifier()
    if flow_session_id is None:
        flow_session_id = cls.get_default_flow_session_id(course_identifier)

    return reverse(
        "relate-finish_flow_session_view",
        kwargs={"course_identifier": course_identifier,
                "flow_session_id": flow_session_id})
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
@classmethod
def _get_grades_url(cls, args=None, kwargs=None):
    """Reverse ``relate-view_participant_grades`` with *args*/*kwargs*."""
    url_name = "relate-view_participant_grades"
    return reverse(url_name, args=args, kwargs=kwargs)
@classmethod
def get_my_grades_url(cls, course_identifier=None):
    """Return the grades URL that takes only the course identifier.

    A falsy *course_identifier* is replaced by
    ``cls.get_default_course_identifier()``.
    """
    if not course_identifier:
        course_identifier = cls.get_default_course_identifier()
    return cls._get_grades_url(args=[course_identifier])
@classmethod
def get_my_grades_view(cls, course_identifier=None):
    """GET the my-grades URL with the class-level test client."""
    grades_url = cls.get_my_grades_url(course_identifier)
    return cls.c.get(grades_url)
@classmethod
def get_participant_grades_url(cls, participation_id, course_identifier=None):
    """Return the grades URL for one participation in a course.

    A falsy *course_identifier* is replaced by
    ``cls.get_default_course_identifier()``.
    """
    if not course_identifier:
        course_identifier = cls.get_default_course_identifier()
    url_kwargs = {"course_identifier": course_identifier,
                  "participation_id": participation_id}
    return cls._get_grades_url(kwargs=url_kwargs)
@classmethod
def get_participant_grades_view(cls, participation_id, course_identifier=None,
                                force_login_instructor=True):
    """GET the participant-grades URL while switched to a chosen user.

    With *force_login_instructor* the request is made as
    ``cls.get_default_instructor_user(course_identifier)``; otherwise as
    whatever ``cls.get_logged_in_user()`` returns.
    """
    if not course_identifier:
        course_identifier = cls.get_default_course_identifier()

    switch_to = (
        cls.get_default_instructor_user(course_identifier)
        if force_login_instructor
        else cls.get_logged_in_user())

    with cls.temporarily_switch_to_user(switch_to):
        grades_url = cls.get_participant_grades_url(
            participation_id, course_identifier)
        return cls.c.get(grades_url)
@classmethod
def get_gradebook_by_opp_url(
        cls, gopp_identifier, view_page_grades=False, course_identifier=None):
    """Return the ``relate-view_grades_by_opportunity`` URL.

    The grading opportunity is looked up by *gopp_identifier* and its pk
    is used as ``opp_id``.  With *view_page_grades* the query string
    ``?view_page_grades=1`` is appended.
    """
    opp_id = GradingOpportunity.objects.get(identifier=gopp_identifier).pk
    if not course_identifier:
        course_identifier = cls.get_default_course_identifier()

    url = reverse(
        "relate-view_grades_by_opportunity",
        kwargs={"course_identifier": course_identifier,
                "opp_id": opp_id})
    if not view_page_grades:
        return url
    return url + "?view_page_grades=1"
@classmethod
def get_gradebook_by_opp_view(
        cls, gopp_identifier, view_page_grades=False, course_identifier=None,
        force_login_instructor=True):
    """GET the gradebook-by-opportunity URL while switched to a chosen user.

    With *force_login_instructor* the request is made as
    ``cls.get_default_instructor_user(course_identifier)``; otherwise as
    whatever ``cls.get_logged_in_user()`` returns.
    """
    if not course_identifier:
        course_identifier = cls.get_default_course_identifier()

    switch_to = (
        cls.get_default_instructor_user(course_identifier)
        if force_login_instructor
        else cls.get_logged_in_user())

    with cls.temporarily_switch_to_user(switch_to):
        gradebook_url = cls.get_gradebook_by_opp_url(
            gopp_identifier, view_page_grades, course_identifier)
        return cls.c.get(gradebook_url)
@classmethod
def get_logged_in_user(cls):
logged_in_user_id = cls.c.session['_auth_user_id']
from django.contrib.auth import get_user_model
logged_in_user = get_user_model().objects.get(
pk=int(logged_in_user_id))
except KeyError:
logged_in_user = None
return logged_in_user
@classmethod
def temporarily_switch_to_user(cls, switch_to):
from functools import wraps
class ClientUserSwitcher(object):
def __init__(self, switch_to):
self.logged_in_user = cls.get_logged_in_user()
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
def __enter__(self):
if self.logged_in_user == self.switch_to:
return
if self.switch_to is None:
self.client.logout()
return
self.client.force_login(self.switch_to)
def __exit__(self, exc_type, exc_val, exc_tb):
if self.logged_in_user == self.switch_to:
return
if self.logged_in_user is None:
self.client.logout()
return
self.client.force_login(self.logged_in_user)
def __call__(self, func):
@wraps(func)
def wrapper(*args, **kw):
with self:
return func(*args, **kw)
return wrapper
return ClientUserSwitcher(switch_to)
@classmethod
def get_default_course(cls):
    """Hook for the default course; this base version never returns.

    :raises AttributeError: when more than one course exists, since the
        course can then not be omitted.
    :raises NotImplementedError: in every other case.
    """
    course_count = Course.objects.count()
    if course_count > 1:
        raise AttributeError(
            "'course' arg can not be omitted for "
            "testcases with more than one courses")
    raise NotImplementedError
def get_default_course_identifier(cls):
    """Hook for the default course identifier; this version never returns.

    :raises AttributeError: when more than one course exists, since the
        identifier can then not be omitted.
    :raises NotImplementedError: in every other case.
    """
    course_count = Course.objects.count()
    if course_count > 1:
        raise AttributeError(
            "'course_identifier' arg can not be omitted for "
            "testcases with more than one courses")
    raise NotImplementedError
@classmethod
def get_latest_session_id(cls, course_identifier):
    """Return the id of the flow session with the highest pk in the course.

    :returns: the session id, or ``None`` if the course has no session.
    """
    newest = FlowSession.objects.filter(
        course__identifier=course_identifier).order_by('-pk')[:1]
    return newest[0].id if newest else None
@classmethod
def get_default_flow_session_id(cls, course_identifier):
    """Hook for the default flow session id; not implemented here."""
    raise NotImplementedError()
def update_default_flow_session_id(cls, course_identifier):
    """Hook for updating the default flow session id; not implemented here."""
    raise NotImplementedError()