Newer
Older
from __future__ import division
__copyright__ = "Copyright (C) 2017 Dong Zhuang, Andreas Kloeckner, Zesheng Wang"
__license__ = """
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import re
import shutil
import hashlib
import memcache
from django.test import Client, override_settings
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
Course, Participation, ParticipationRole, FlowSession, FlowPageData,
FlowPageVisit)
from course.constants import participation_status, user_status
from course.content import get_course_repo_path
# Account fields used to create the superuser that the test mixins rely on.
CREATE_SUPERUSER_KWARGS = dict(
    username="test_admin",
    password="test_admin",
    email="test_admin@example.com",
    first_name="Test",
    last_name="Admin",
)
# Setup description for a single test course: the keyword arguments used to
# create the course plus one active participation for each of the
# "instructor", "ta" and "student" roles.
SINGLE_COURSE_SETUP_LIST = [
    {
        "course": {
            "identifier": "test-course",
            "name": "Test Course",
            "number": "CS123",
            "time_period": "Fall 2016",
            "hidden": False,
            "listed": True,
            "accepts_enrollment": True,
            "git_source": "git://github.com/inducer/relate-sample",
            "course_file": "course.yml",
            "events_file": "events.yml",
            "enrollment_approval_required": False,
            "enrollment_required_email_suffix": "",
            "preapproval_require_verified_inst_id": True,
            "from_email": "inform@tiker.net",
            "notify_email": "inform@tiker.net"},
        "participations": [
            {
                "role_identifier": "instructor",
                "user": {
                    "username": "test_instructor",
                    "password": "test_instructor",
                    "email": "test_instructor@example.com",
                    "first_name": "Test",
                    "last_name": "Instructor"},
                "status": participation_status.active
            },
            {
                "role_identifier": "ta",
                "user": {
                    "username": "test_ta",
                    "password": "test",
                    "email": "test_ta@example.com",
                    "first_name": "Test",
                    "last_name": "TA"},
                "status": participation_status.active
            },
            {
                "role_identifier": "student",
                # NOTE(review): unlike the other users, no "first_name" is
                # given for the student -- confirm that this is intended.
                "user": {
                    "username": "test_student",
                    "password": "test",
                    "email": "test_student@example.com",
                    "last_name": "Student"},
                "status": participation_status.active
            }
        ],
    }
]
# Two independent deep copies of the single-course setup; the copies differ
# only in their course identifiers.
TWO_COURSE_SETUP_LIST = (
    deepcopy(SINGLE_COURSE_SETUP_LIST) + deepcopy(SINGLE_COURSE_SETUP_LIST))
TWO_COURSE_SETUP_LIST[0]["course"]["identifier"] = "test-course1"
TWO_COURSE_SETUP_LIST[1]["course"]["identifier"] = "test-course2"
# Keyword arguments for users that are created without any participation.
# The four entries cover the combinations of verified/unverified
# institutional id and active/unconfirmed user status.
NONE_PARTICIPATION_USER_CREATE_KWARG_LIST = [
    {
        "username": "test_user1",
        "password": "test_user1",
        "email": "test_user1@suffix.com",
        "first_name": "Test",
        "last_name": "User1",
        "institutional_id": "test_user1_institutional_id",
        "institutional_id_verified": True,
        "status": user_status.active
    },
    {
        "username": "test_user2",
        "password": "test_user2",
        "email": "test_user2@nosuffix.com",
        "first_name": "Test",
        "last_name": "User2",
        "institutional_id": "test_user2_institutional_id",
        "institutional_id_verified": False,
        "status": user_status.active
    },
    {
        "username": "test_user3",
        "password": "test_user3",
        "email": "test_user3@suffix.com",
        "first_name": "Test",
        "last_name": "User3",
        "institutional_id": "test_user3_institutional_id",
        "institutional_id_verified": True,
        "status": user_status.unconfirmed
    },
    {
        "username": "test_user4",
        "password": "test_user4",
        "email": "test_user4@no_suffix.com",
        "first_name": "Test",
        "last_name": "User4",
        "institutional_id": "test_user4_institutional_id",
        "institutional_id_verified": False,
        "status": user_status.unconfirmed
    },
]
# Memcached client used to remember, across test classes, where an already
# fetched course repository lives (see ``git_source_url_to_cache_keys``).
try:
    mc = memcache.Client(['127.0.0.1:11211'])
except Exception:
    # Keep the name bound even when no client could be created, so that
    # users of ``mc`` never hit a NameError at first use.
    mc = None

# Captures the value of the ``data-field_id`` attribute in rendered HTML.
SELECT2_HTML_FIELD_ID_SEARCH_PATTERN = re.compile(r'data-field_id="([^"]+)"')
def git_source_url_to_cache_keys(url):
    """Return the pair of cache keys derived from a git source URL.

    Both keys end in the MD5 hex digest of the UTF-8 encoded *url*; the
    first is prefixed with ``test_course:``, the second with ``test_sha:``.
    """
    digest = hashlib.md5(url.encode("utf-8")).hexdigest()
    course_key = "test_course:%s" % digest
    sha_key = "test_sha:%s" % digest
    return (course_key, sha_key)
class CourseCreateFailure(Exception):
    """Raised when posting the course creation form did not add a course."""
class ResponseContextMixin(object):
"""
Response context refers to "the template Context instance that was used
to render the template that produced the response content".
Ref: https://docs.djangoproject.com/en/dev/topics/testing/tools/#django.test.Response.context # noqa
"""
def get_response_context_value_by_name(self, response, context_name):
    """Return the item named *context_name* from ``response.context``.

    Fails the test (via ``assertIsNotNone``) when the stored value is None.
    """
    lookup = response.context.__getitem__
    found = lookup(context_name)
    failure_message = "%s does not exist in given response" % context_name
    self.assertIsNotNone(found, msg=failure_message)
    return found
def assertResponseContextIsNone(self, resp, context_name):  # noqa
    """Assert that the context item *context_name* of *resp* is None.

    An AssertionError raised by the lookup helper counts as success;
    any value it does return is checked with ``assertIsNone``.
    """
    try:
        found = self.get_response_context_value_by_name(resp, context_name)
    except AssertionError:
        # the lookup helper rejected the value, nothing left to check
        return
    self.assertIsNone(found)
def assertResponseContextIsNotNone(self, resp, context_name, msg=""):  # noqa
    """Assert that the context item *context_name* of *resp* is not None."""
    found = self.get_response_context_value_by_name(resp, context_name)
    self.assertIsNotNone(found, msg)
def assertResponseContextEqual(self, resp, context_name, expected_value):  # noqa
    """Assert that the context item *context_name* equals *expected_value*."""
    actual = self.get_response_context_value_by_name(resp, context_name)
    self.assertEqual(actual, expected_value)
def assertResponseContextContains(self, resp,  # noqa
                                  context_name, expected_value, html=False):
    """Assert that *expected_value* occurs in the named context item.

    With ``html=True`` the comparison is delegated to ``assertInHTML``,
    otherwise to ``assertIn``.
    """
    container = self.get_response_context_value_by_name(resp, context_name)
    if html:
        self.assertInHTML(expected_value, container)
        return
    self.assertIn(expected_value, container)
def assertResponseContextRegex(  # noqa
        self, resp,  # noqa
        context_name, expected_value_regex):
    """Assert that the named context item matches *expected_value_regex*."""
    text = self.get_response_context_value_by_name(resp, context_name)
    six.assertRegex(self, text, expected_value_regex)
def get_response_context_answer_feedback(self, response):
    """Return the ``feedback`` item of the response context."""
    answer_feedback = self.get_response_context_value_by_name(
        response, "feedback")
    return answer_feedback
def assertResponseContextAnswerFeedbackContainsFeedback(  # noqa
        self, response, expected_feedback,
        include_bulk_feedback=True):
    """Assert that the answer feedback text contains *expected_feedback*.

    The text searched is the ``feedback`` attribute of the response's
    answer feedback, followed by its ``bulk_feedback`` when
    *include_bulk_feedback* is true and a bulk feedback is present.
    """
    answer_feedback = self.get_response_context_answer_feedback(response)

    # Check for the attribute before reading it, so that a missing attribute
    # is reported as a test failure rather than an AttributeError.
    self.assertTrue(hasattr(answer_feedback, "feedback"))
    feedback_str = answer_feedback.feedback

    if include_bulk_feedback:
        bulk_feedback = getattr(answer_feedback, "bulk_feedback", None)
        if bulk_feedback:
            feedback_str += bulk_feedback

    # The actual containment check; without it *expected_feedback* would
    # never be looked at.
    self.assertIn(expected_feedback, feedback_str)
def assertResponseContextAnswerFeedbackNotContainsFeedback(  # noqa
        self, response, expected_feedback):
    """Assert that the answer feedback text lacks *expected_feedback*."""
    feedback_obj = self.get_response_context_answer_feedback(response)
    has_feedback_attr = hasattr(feedback_obj, "feedback")
    self.assertTrue(has_feedback_attr)
    self.assertNotIn(expected_feedback, feedback_obj.feedback)
def assertResponseContextAnswerFeedbackCorrectnessEquals(  # noqa
        self, response, expected_correctness):
    """Assert that the answer feedback's correctness is as expected.

    With ``expected_correctness=None`` the feedback must either lack a
    ``correctness`` attribute or carry None there.  Otherwise the returned
    correctness must be within ``ATOL`` of the expected value.
    """
    answer_feedback = self.get_response_context_answer_feedback(response)

    if expected_correctness is None:
        try:
            self.assertTrue(hasattr(answer_feedback, "correctness"))
        except AssertionError:
            # no correctness attribute at all is accepted
            return
        self.assertIsNone(answer_feedback.correctness)
        return

    if answer_feedback.correctness is None:
        return self.fail("The returned correctness is None, not %s"
                         % expected_correctness)

    difference = abs(float(answer_feedback.correctness)
                     - float(str(expected_correctness)))
    self.assertTrue(
        difference < ATOL,
        "%s does not equal %s"
        % (str(answer_feedback.correctness)[:5],
           str(expected_correctness)[:5]))
def get_response_body(self, response):
    """Return the ``body`` item of the response context."""
    body = self.get_response_context_value_by_name(response, "body")
    return body
def get_page_response_correct_answer(self, response):
    """Return the ``correct_answer`` item of the response context."""
    correct_answer = self.get_response_context_value_by_name(
        response, "correct_answer")
    return correct_answer
def get_page_response_feedback(self, response):
    """Return the ``feedback`` item of the response context."""
    feedback = self.get_response_context_value_by_name(response, "feedback")
    return feedback
def debug_print_response_context_value(self, resp, context_name):
    """Print the named context item of *resp* for debugging.

    Lists and tuples are printed item by item (``ValidationWarning``
    items by their ``text``, everything else by ``repr``).  When the lookup
    helper raises AssertionError a "no value" banner is printed instead.
    """
    try:
        found = self.get_response_context_value_by_name(resp, context_name)
        print("\n-----------context %s-------------"
              % context_name)
        if not isinstance(found, (list, tuple)):
            print(found)
        else:
            from course.validation import ValidationWarning
            for item in found:
                if isinstance(item, ValidationWarning):
                    print(item.text)
                else:
                    print(repr(item))
        print("-----------context end-------------\n")
    except AssertionError:
        print("\n-------no value for context %s----------" % context_name)
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
def get_select2_field_id_from_response(self, response,
                                       form_context_name="form"):
    """Extract the select2 ``data-field_id`` value from a rendered form.

    The form is taken from ``response.context[form_context_name]`` and
    searched with ``SELECT2_HTML_FIELD_ID_SEARCH_PATTERN``.
    """
    missing_context_message = (
        "The response doesn't contain a context named '%s'"
        % form_context_name)
    self.assertResponseContextIsNotNone(
        response, form_context_name, missing_context_message)

    rendered_form = str(response.context[form_context_name])
    match = SELECT2_HTML_FIELD_ID_SEARCH_PATTERN.search(rendered_form)
    assert match, "pattern not found in %s" % rendered_form
    return match.group(1)
def select2_get_request(self, field_id, term=None,
                        select2_urlname='django_select2-json'):
    """Issue the XMLHttpRequest-style GET that select2 uses for lookups.

    ``term`` is stripped and only sent when it is non-empty afterwards.
    """
    query = {"field_id": field_id}
    if term is not None:
        assert isinstance(term, six.string_types)
        stripped_term = term.strip()
        if stripped_term:
            query["term"] = stripped_term

    target_url = reverse(select2_urlname)
    return self.c.get(target_url, query,
                      HTTP_X_REQUESTED_WITH='XMLHttpRequest')
def get_select2_response_data(self, response, key="results"):
    """Decode the JSON body of *response* and return its *key* entry."""
    import json
    payload = json.loads(response.content.decode('utf-8'))
    return payload[key]
# Keyword arguments that ``create_superuser`` passes to
# ``objects.create_superuser``.
create_superuser_kwargs = CREATE_SUPERUSER_KWARGS
@classmethod
def setUpTestData(cls):  # noqa
    """Create the superuser, the test client and a temporary GIT_ROOT."""
    # Without a superuser we cannot create users, courses or
    # participations.
    cls.superuser = cls.create_superuser()
    cls.c = Client()

    # Point GIT_ROOT at a fresh temporary directory for this test class.
    git_root_override = override_settings(GIT_ROOT=tempfile.mkdtemp())
    cls.settings_git_root_override = git_root_override
    git_root_override.enable()

    super(SuperuserCreateMixin, cls).setUpTestData()
@classmethod
def tearDownClass(cls):  # noqa
    """Tear down the class and undo the GIT_ROOT settings override."""
    try:
        super(SuperuserCreateMixin, cls).tearDownClass()
    finally:
        # setUpTestData enables an ``override_settings(GIT_ROOT=...)``;
        # disable it here so the override does not leak into test classes
        # that run afterwards.  The attribute is absent when setUpTestData
        # failed before reaching that point.
        override = getattr(cls, "settings_git_root_override", None)
        if override is not None:
            override.disable()
@classmethod
def create_superuser(cls):
    """Create and return a superuser from ``cls.create_superuser_kwargs``."""
    user_manager = get_user_model().objects
    return user_manager.create_superuser(**cls.create_superuser_kwargs)
def get_sign_up_view_url(self):
    """Return the URL named ``relate-sign_up``."""
    sign_up_url = reverse("relate-sign_up")
    return sign_up_url
def get_sign_up(self, follow=True):
    """GET the sign-up view with the test client."""
    url = self.get_sign_up_view_url()
    return self.c.get(url, follow=follow)
def post_sign_up(self, data, follow=True):
    """POST *data* to the sign-up view with the test client."""
    url = self.get_sign_up_view_url()
    return self.c.post(url, data, follow=follow)
def get_profile_view_url(self):
    """Return the URL named ``relate-user_profile``."""
    profile_url = reverse("relate-user_profile")
    return profile_url
def get_profile(self, follow=True):
    """GET the user profile view with the test client."""
    url = self.get_profile_view_url()
    return self.c.get(url, follow=follow)
def post_profile(self, data, follow=True):
    """POST *data* to the user profile view with the test client."""
    url = self.get_profile_view_url()
    return self.c.post(url, data, follow=follow)
def post_signout(self, data, follow=True):
    """POST *data* to the URL returned by ``get_sign_up_view_url``.

    NOTE(review): despite its name this posts to the sign-up URL, not to a
    sign-out URL -- confirm whether that is intended.
    """
    url = self.get_sign_up_view_url()
    return self.c.post(url, data, follow=follow)
def get_impersonate_view_url(self):
    """Return the URL named ``relate-impersonate``."""
    impersonate_url = reverse("relate-impersonate")
    return impersonate_url
def get_stop_impersonate_view_url(self):
    """Return the URL named ``relate-stop_impersonating``."""
    stop_url = reverse("relate-stop_impersonating")
    return stop_url
def get_impersonate(self):
    """GET the impersonation view with the test client."""
    url = self.get_impersonate_view_url()
    return self.c.get(url)
def post_impersonate(self, impersonatee, follow=True):
    """POST the impersonation form for the user *impersonatee*."""
    form_data = {
        "add_impersonation_header": ["on"],
        "submit": [''],
        "user": [str(impersonatee.pk)],
    }
    url = self.get_impersonate_view_url()
    return self.c.post(url, form_data, follow=follow)
def get_stop_impersonate(self, follow=True):
    """GET the stop-impersonating view with the test client."""
    url = self.get_stop_impersonate_view_url()
    return self.c.get(url, follow=follow)
def post_stop_impersonate(self, follow=True):
    """POST an empty ``submit`` to the stop-impersonating view."""
    url = self.get_stop_impersonate_view_url()
    return self.c.post(url, {"submit": ['']}, follow=follow)
def get_confirm_stop_impersonate_view_url(self):
    """Return the URL named ``relate-confirm_stop_impersonating``."""
    confirm_url = reverse("relate-confirm_stop_impersonating")
    return confirm_url
def get_confirm_stop_impersonate(self, follow=True):
    """GET the confirm-stop-impersonating view with the test client."""
    url = self.get_confirm_stop_impersonate_view_url()
    return self.c.get(url, follow=follow)
def post_confirm_stop_impersonate(self, follow=True):
    """POST an empty form to the confirm-stop-impersonating view."""
    url = self.get_confirm_stop_impersonate_view_url()
    return self.c.post(url, {}, follow=follow)
@classmethod
def get_reset_password_url(cls, use_instid=False):
    """Return the reset-password URL.

    With ``use_instid=True`` the URL is reversed with ``field="instid"``.
    """
    url_kwargs = {"field": "instid"} if use_instid else {}
    return reverse("relate-reset_password", kwargs=url_kwargs)
@classmethod
def get_reset_password(cls, use_instid=False):
    """GET the reset-password view with the class-level test client."""
    url = cls.get_reset_password_url(use_instid)
    return cls.c.get(url)
@classmethod
def post_reset_password(cls, data, use_instid=False):
    """POST *data* to the reset-password view with the class-level client."""
    url = cls.get_reset_password_url(use_instid)
    return cls.c.post(url, data=data)
def get_reset_password_stage2_url(self, user_id, sign_in_key, **kwargs):
    """Return the stage-2 reset-password URL for a user and sign-in key.

    An optional ``querystring`` keyword (a dict) is appended as
    ``?k=v&k=v``; keys and values are inserted as-is, without URL-encoding.
    """
    base_url = reverse(
        "relate-reset_password_stage2", args=(user_id, sign_in_key))
    querystring = kwargs.pop("querystring", None)
    if querystring is None:
        return base_url

    assert isinstance(querystring, dict)
    pairs = ["%s=%s" % (k, v) for (k, v) in six.iteritems(querystring)]
    return base_url + ("?%s" % "&".join(pairs))
def get_reset_password_stage2(self, user_id, sign_in_key, **kwargs):
    """GET the stage-2 reset-password view with the test client."""
    url = self.get_reset_password_stage2_url(
        user_id=user_id, sign_in_key=sign_in_key, **kwargs)
    return self.c.get(url)
def post_reset_password_stage2(self, user_id, sign_in_key, data, **kwargs):
    """POST *data* to the stage-2 reset-password view."""
    url = self.get_reset_password_stage2_url(
        user_id=user_id, sign_in_key=sign_in_key, **kwargs)
    return self.c.post(url, data=data)
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
def get_fake_time_url(self):
    """Return the URL named ``relate-set_fake_time``."""
    fake_time_url = reverse("relate-set_fake_time")
    return fake_time_url
def get_set_fake_time(self):
    """GET the set-fake-time view with the test client."""
    url = self.get_fake_time_url()
    return self.c.get(url)
def post_set_fake_time(self, data, follow=True):
    """POST *data* to the set-fake-time view with the test client."""
    url = self.get_fake_time_url()
    return self.c.post(url, data, follow=follow)
def assertSessionFakeTimeEqual(self, session, expected_date_time):  # noqa
    """Assert that the fake time stored in *session* is as expected.

    The session's ``relate_fake_time`` timestamp is converted with
    ``datetime.datetime.fromtimestamp`` before the comparison.  A missing
    timestamp is only accepted when *expected_date_time* is None.
    """
    timestamp = session.get("relate_fake_time", None)
    if timestamp is not None:
        faked_time = datetime.datetime.fromtimestamp(timestamp)
    elif expected_date_time is None:
        faked_time = None
    else:
        raise AssertionError(
            "the session doesn't have 'relate_fake_time' attribute")
    self.assertEqual(faked_time, expected_date_time)
def assertSessionFakeTimeIsNone(self, session):  # noqa
    """Assert that *session* carries no fake time."""
    expected = None
    self.assertSessionFakeTimeEqual(session, expected)
def get_set_pretend_facilities_url(self):
    """Return the URL named ``relate-set_pretend_facilities``."""
    facilities_url = reverse("relate-set_pretend_facilities")
    return facilities_url
def get_set_pretend_facilities(self):
    """GET the set-pretend-facilities view with the test client."""
    url = self.get_set_pretend_facilities_url()
    return self.c.get(url)
def post_set_pretend_facilities(self, data, follow=True):
    """POST *data* to the set-pretend-facilities view."""
    url = self.get_set_pretend_facilities_url()
    return self.c.post(url, data, follow=follow)
def force_remove_all_course_dir(self):
    """Force-remove the repository path of every existing course."""
    # This is only necessary for courses which are created test wise,
    # not class wise.
    from course.content import get_course_repo_path
    from relate.utils import force_remove_path

    for course in Course.objects.all():
        repo_path = get_course_repo_path(course)
        force_remove_path(repo_path)
def assertSessionPretendFacilitiesContains(self, session, expected_facilities):  # noqa
    """Assert that the session's pretended facilities include the expected.

    *expected_facilities* may be None (no facilities must be stored), a
    list/tuple (all of them must be stored) or a single facility.
    """
    pretended = session.get("relate_pretend_facilities", None)
    if expected_facilities is None:
        return self.assertIsNone(pretended)

    if pretended is None:
        raise AssertionError(
            "the session doesn't have "
            "'relate_pretend_facilities' attribute")

    if isinstance(expected_facilities, (list, tuple)):
        contained = set(expected_facilities).issubset(set(pretended))
    else:
        contained = expected_facilities in pretended
    self.assertTrue(contained)
def assertSessionPretendFacilitiesIsNone(self, session):  # noqa
    """Assert that *session* stores no pretended facilities."""
    self.assertIsNone(session.get("relate_pretend_facilities", None))
def assertFormErrorLoose(self, response, error, form_name="form"):  # noqa
    """Assert that *error* is among the errors of a form in the context.

    The form is taken from ``response.context[form_name]``; the error
    lists of all of its fields are flattened before the check.  When the
    errors cannot be collected (TypeError, e.g. the response has no
    context), ``error=None`` passes and any other *error* fails the test.
    """
    import itertools
    try:
        form_errors = list(
            itertools.chain(*response.context[form_name].errors.values()))
    except TypeError:
        form_errors = None

    if form_errors is None:
        if error is None:
            return
        # Interpolate the form name; the message used to be emitted with
        # the raw "%(form_name)s" placeholder.
        return self.fail(
            "%(form_name)s have no errors" % {"form_name": form_name})

    self.assertIn(str(error), form_errors)
# {{{ defined here so that they can be used by both classmethods and instance methods
def get_flow_page_ordinal_from_page_id(flow_session_id, page_id):
    """Return the page ordinal of the page *page_id* in a flow session."""
    lookup = {
        "flow_session__id": flow_session_id,
        "page_id": page_id,
    }
    return FlowPageData.objects.get(**lookup).page_ordinal
def get_flow_page_id_from_page_ordinal(flow_session_id, page_ordinal):
    """Return the page id of the page at *page_ordinal* in a flow session."""
    # Filter on the ordinal as well: looking up by session alone matches
    # every page of the session and ignores *page_ordinal* entirely.
    flow_page_data = FlowPageData.objects.get(
        flow_session__id=flow_session_id,
        page_ordinal=page_ordinal
    )
    return flow_page_data.page_id
# }}}
class CoursesTestMixinBase(SuperuserCreateMixin):
# Base mixin whose setUpTestData creates courses, participations and extra
# users from the class attributes below.
# A list of Dicts, each of which contain a course dict and a list of
# participations. See SINGLE_COURSE_SETUP_LIST for the setup for one course.
courses_setup_list = []
# Keyword-argument dicts for users that are created without a participation.
none_participation_user_create_kwarg_list = []
# Optional list of dicts, one per entry of courses_setup_list, merged into
# the corresponding course's creation kwargs.
courses_attributes_extra_list = None
# Settings overridden while the course creation form is posted.
override_settings_at_post_create_course = {}
@classmethod
def setUpTestData(cls):  # noqa
    """Create the courses, participations and extra users for the class.

    For every entry of ``cls.courses_setup_list`` that has a ``"course"``
    dict, a course is created (optionally extended by the matching entry
    of ``cls.courses_attributes_extra_list``) together with its configured
    participations.  Afterwards all participations of the superuser are
    removed and the users from
    ``cls.none_participation_user_create_kwarg_list`` are created.

    Sets ``cls.n_courses``, ``cls.default_flow_params``,
    ``cls.non_participation_users`` and ``cls.course_qset``.

    Raises ValueError when ``courses_attributes_extra_list`` is given but
    differs in length from ``courses_setup_list``.
    """
    super(CoursesTestMixinBase, cls).setUpTestData()
    cls.default_flow_params = None
    # Counts the courses created by this setup.
    cls.n_courses = 0

    extra_attrs_list = cls.courses_attributes_extra_list
    if extra_attrs_list is not None:
        if len(extra_attrs_list) != len(cls.courses_setup_list):
            raise ValueError(
                "'courses_attributes_extra_list' must has equal length "
                "with courses")

    for i, course_setup in enumerate(cls.courses_setup_list):
        if "course" not in course_setup:
            continue

        cls.n_courses += 1
        course_identifier = course_setup["course"]["identifier"]
        # Work on a copy: the setup dicts are shared between test classes
        # (and with the module-level setup lists) and must not pick up the
        # extra attributes of this class.
        course_setup_kwargs = dict(course_setup["course"])
        if extra_attrs_list:
            extra_attrs = extra_attrs_list[i]
            assert isinstance(extra_attrs, dict)
            course_setup_kwargs.update(extra_attrs)

        cls.create_course(course_setup_kwargs)
        course = Course.objects.get(identifier=course_identifier)

        for participation in course_setup.get("participations", ()):
            create_user_kwargs = participation.get("user")
            if not create_user_kwargs:
                continue
            role_identifier = participation.get("role_identifier")
            if not role_identifier:
                continue
            cls.create_participation(
                course=course,
                user_or_create_user_kwargs=create_user_kwargs,
                role_identifier=role_identifier,
                status=participation.get("status",
                                         participation_status.active)
            )

        # Remove superuser from participation for further test
        # such as impersonate in auth module
        for sp in Participation.objects.filter(user=cls.superuser):
            sp.delete()

    cls.non_participation_users = get_user_model().objects.none()
    if cls.none_participation_user_create_kwarg_list:
        pks = [
            cls.create_user(create_user_kwargs).pk
            for create_user_kwargs
            in cls.none_participation_user_create_kwarg_list]
        cls.non_participation_users = (
            get_user_model().objects.filter(pk__in=pks))

    cls.course_qset = Course.objects.all()
@classmethod
def tearDownClass(cls):
    """Log the class-level test client out, then tear down the class."""
    client = cls.c
    client.logout()
    super(CoursesTestMixinBase, cls).tearDownClass()
@classmethod
def create_user(cls, create_user_kwargs):
    """Return the user with the given email, creating it when necessary.

    The lookup is a case-insensitive match on ``create_user_kwargs["email"]``;
    the whole dict is used as ``defaults`` for a newly created user.
    """
    user, created = get_user_model().objects.get_or_create(
        email__iexact=create_user_kwargs["email"], defaults=create_user_kwargs)

    password = create_user_kwargs.get("password")
    if created and password is not None:
        # ``defaults`` stored the raw password string on the new user;
        # replace it with a properly hashed password.
        user.set_password(password)
        user.save()
    return user
@classmethod
def create_participation(
        cls, course, user_or_create_user_kwargs,
        role_identifier=None, status=None):
    """Return the participation of a user in *course*, creating it if needed.

    *user_or_create_user_kwargs* is either a user instance or a dict that is
    passed to ``create_user``.  ``status`` defaults to active and
    ``role_identifier`` to ``"student"``; roles are only assigned when the
    participation is newly created.
    """
    if not isinstance(user_or_create_user_kwargs, get_user_model()):
        assert isinstance(user_or_create_user_kwargs, dict)
        user = cls.create_user(user_or_create_user_kwargs)
    else:
        user = user_or_create_user_kwargs

    if status is None:
        status = participation_status.active
    participation, newly_created = Participation.objects.get_or_create(
        user=user,
        course=course,
        status=status
    )

    if role_identifier is None:
        role_identifier = "student"
    if newly_created:
        matching_roles = ParticipationRole.objects.filter(
            course=course, identifier=role_identifier)
        participation.roles.set(matching_roles)
    return participation
@classmethod
def post_create_course(cls, create_course_kwargs, raise_error=True,
                       login_superuser=True):
    """Create a course by posting the set-up-new-course form.

    On success the repository path and active commit sha of the new course
    are stored in memcached under the keys derived from its git source, and
    the response is returned.

    When no course was created, the form errors are collected into a
    message; with ``raise_error`` true (the default) a CourseCreateFailure
    carrying that message is raised, otherwise the response is returned so
    that the caller can inspect it.
    """
    # To speed up, use create_course instead, this is better used for tests
    if login_superuser:
        cls.c.force_login(cls.superuser)
    existing_course_count = Course.objects.count()
    with override_settings(**cls.override_settings_at_post_create_course):
        resp = cls.c.post(cls.get_set_up_new_course_url(),
                          data=create_course_kwargs)

    all_courses = Course.objects.all()
    if not all_courses.count() == existing_course_count + 1:
        error_string = None
        # most probably the reason course creation form error
        form_context = resp.context.__getitem__("form")
        assert form_context
        error_list = []
        if form_context.errors:
            error_list = [
                "%s: %s"
                % (field,
                   "\n".join(["%s:%s" % (type(e).__name__, str(e))
                              for e in errs]))
                for field, errs
                in six.iteritems(form_context.errors.as_data())]
        non_field_errors = form_context.non_field_errors()
        if non_field_errors:
            error_list.append(repr(non_field_errors))
        if error_list:
            error_string = "\n".join(error_list)
        if not error_string:
            error_string = ("course creation failed for unknown errors")
        if raise_error:
            raise CourseCreateFailure(error_string)
        return resp

    # the course is created successfully
    last_course = all_courses.last()
    assert last_course

    url_cache_key, commit_sha_cach_key = (
        git_source_url_to_cache_keys(last_course.git_source))
    mc.set_multi({url_cache_key: get_course_repo_path(last_course),
                  commit_sha_cach_key: last_course.active_git_commit_sha})
    return resp
@classmethod
def create_course(cls, create_course_kwargs, raise_error=True):
    """Create a course, reusing a cached repository when one is available.

    The memcached entries for the course's git source are looked up; when
    they name an existing directory and a commit sha, that directory is
    copied to ``GIT_ROOT/<identifier>`` and the course row is created
    directly.  Otherwise this falls back to ``post_create_course`` and
    returns its result.
    """
    repo_cache_key, commit_sha_cach_key = (
        git_source_url_to_cache_keys(create_course_kwargs["git_source"]))
    try:
        exist_course_repo_path = mc.get(repo_cache_key)
        exist_commit_sha = mc.get(commit_sha_cach_key)
        has_cached_repo = bool(
            exist_course_repo_path and exist_commit_sha
            and os.path.isdir(exist_course_repo_path))
    except Exception:
        # The cache is only an optimisation: any failure while consulting
        # it means "not cached".
        has_cached_repo = False

    if not has_cached_repo:
        # fall back to post create
        return cls.post_create_course(
            create_course_kwargs, raise_error=raise_error)

    existing_course_count = Course.objects.count()
    new_course_repo_path = os.path.join(settings.GIT_ROOT,
                                        create_course_kwargs["identifier"])
    shutil.copytree(exist_course_repo_path, new_course_repo_path)
    create_kwargs = deepcopy(create_course_kwargs)
    create_kwargs["active_git_commit_sha"] = exist_commit_sha
    Course.objects.create(**create_kwargs)
    assert Course.objects.count() == existing_course_count + 1
@classmethod
def get_course_view_url(cls, view_name, course_identifier=None):
    """Reverse *view_name* with the course identifier as its only argument.

    A falsy *course_identifier* is replaced by the default course identifier.
    """
    if not course_identifier:
        course_identifier = cls.get_default_course_identifier()
    return reverse(view_name, args=[course_identifier])
@classmethod
def get_set_up_new_course_url(cls):
    """Return the URL named ``relate-set_up_new_course``."""
    set_up_url = reverse("relate-set_up_new_course")
    return set_up_url
@classmethod
def get_set_up_new_course(cls):
    """GET the set-up-new-course view with the class-level test client."""
    # Request the actual URL string; the previous code handed an uncalled
    # (and nonexistent) ``get_update_course_url`` attribute to the client.
    return cls.c.get(cls.get_set_up_new_course_url())
@classmethod
def get_edit_course_url(cls, course_identifier=None):
    """Return the URL of the ``relate-edit_course`` view for a course."""
    view_name = "relate-edit_course"
    return cls.get_course_view_url(view_name, course_identifier)
@classmethod
def post_edit_course(cls, data, course=None):
    """POST *data* to the edit view of *course* (default course if falsy)."""
    if not course:
        course = cls.get_default_course()
    url = cls.get_edit_course_url(course.identifier)
    return cls.c.post(url, data)
@classmethod
def get_edit_course(cls, course=None):
    """GET the edit view of *course* (default course if falsy)."""
    if not course:
        course = cls.get_default_course()
    url = cls.get_edit_course_url(course.identifier)
    return cls.c.get(url)
@classmethod
def get_course_page_url(cls, course_identifier=None):
    """Return the URL of the ``relate-course_page`` view for a course."""
    view_name = "relate-course_page"
    return cls.get_course_view_url(view_name, course_identifier)
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
def get_logged_in_user(self):
    """Return the user logged in on the test client, or None.

    The user id is read from the ``_auth_user_id`` key of the client
    session; a KeyError is taken to mean that nobody is logged in.
    """
    try:
        user_pk = self.c.session['_auth_user_id']
        from django.contrib.auth import get_user_model
        return get_user_model().objects.get(pk=int(user_pk))
    except KeyError:
        return None
def temporarily_switch_to_user(self, switch_to):
    """Return a context manager / decorator that switches the client user.

    On entry the test client is logged in as *switch_to* (or logged out
    when it is None); on exit the user that was logged in when this method
    was called is restored.  Nothing is done when both users are equal.
    """
    from functools import wraps

    test_case = self

    class ClientUserSwitcher(object):
        def __init__(self, switch_to):
            self.client = test_case.c
            self.switch_to = switch_to
            # remembered now, so that it can be restored on exit
            self.logged_in_user = test_case.get_logged_in_user()

        def _become(self, user):
            # None stands for "nobody logged in"
            if user is None:
                self.client.logout()
                return
            self.client.force_login(user)

        def __enter__(self):
            if self.logged_in_user == self.switch_to:
                return
            self._become(self.switch_to)

        def __exit__(self, exc_type, exc_val, exc_tb):
            if self.logged_in_user == self.switch_to:
                return
            self._become(self.logged_in_user)

        def __call__(self, func):
            @wraps(func)
            def wrapper(*args, **kw):
                with self:
                    return func(*args, **kw)
            return wrapper

    return ClientUserSwitcher(switch_to)
@classmethod
def get_default_course(cls):
    """Return the default course; not implemented in this base class.

    Raises AttributeError when more than one course exists, otherwise
    NotImplementedError.
    """
    has_several_courses = Course.objects.count() > 1
    if has_several_courses:
        raise AttributeError(
            "'course' arg can not be omitted for "
            "testcases with more than one courses")
    raise NotImplementedError
@classmethod
def get_default_course_identifier(cls):
    """Return the default course identifier; not implemented here.

    Declared as a classmethod because it takes ``cls`` and is invoked on
    the class by other classmethods (e.g. ``get_course_view_url``).

    Raises AttributeError when more than one course exists, otherwise
    NotImplementedError.
    """
    if Course.objects.count() > 1:
        raise AttributeError(
            "'course_identifier' arg can not be omitted for "
            "testcases with more than one courses")
    raise NotImplementedError
def get_latest_session_id(self, course_identifier):
    """Return the id of the course's flow session with the highest pk.

    Returns None when the course has no flow session.
    """
    newest_first = FlowSession.objects.filter(
        course__identifier=course_identifier).order_by('-pk')[:1]
    if not newest_first:
        return None
    return newest_first[0].id
def get_default_flow_session_id(self, course_identifier):
    """Return the default flow session id; not implemented in this base."""
    raise NotImplementedError
@classmethod
def update_default_flow_session_id(cls, course_identifier):
    """Update the default flow session id; not implemented in this base.

    Declared as a classmethod because it takes ``cls`` and is invoked on
    the class by ``start_flow``.
    """
    raise NotImplementedError
def get_default_instructor_user(self, course_identifier):
    """Return the user of the first active instructor participation."""
    instructor_participations = Participation.objects.filter(
        course__identifier=course_identifier,
        roles__identifier="instructor",
        status=participation_status.active
    )
    first_participation = instructor_participations.first()
    return first_participation.user
@classmethod
def update_course_attribute(cls, attrs, course=None):
# Write the items of the dict ``attrs`` directly into the ``__dict__`` of
# ``course`` (``cls.get_default_course()`` when ``course`` is falsy), save
# the course and reload it from the database.
# NOTE(review): the indentation of this block was lost; whether save() and
# refresh_from_db() run only for non-empty ``attrs`` cannot be told from
# the text as it stands -- confirm against the original file.
# course instead of course_identifier because we need to do
# refresh_from_db
assert isinstance(attrs, dict)
course = course or cls.get_default_course()
if attrs:
course.__dict__.update(attrs)
course.save()
course.refresh_from_db()
@classmethod
def start_flow(cls, flow_id, course_identifier=None):
    """
    Start a flow by posting to the start-flow view and return the response.

    Notice: this is a classmethod, so this will change the data
    created in setUpTestData, so don't do this in individual tests, or
    testdata will be different between tests.

    The URL kwargs resolved from the response's redirect target are stored
    in ``cls.default_flow_params``; afterwards
    ``cls.update_default_flow_session_id`` is called for the course.
    """
    course_identifier = course_identifier or cls.get_default_course_identifier()
    existing_session_count = FlowSession.objects.all().count()
    params = {"course_identifier": course_identifier,
              "flow_id": flow_id}
    resp = cls.c.post(reverse("relate-view_start_flow", kwargs=params))

    # exactly one new session must have been created by the post
    new_session_count = FlowSession.objects.all().count()
    assert new_session_count == existing_session_count + 1

    _, _, params = resolve(resp.url)
    cls.default_flow_params = params
    cls.update_default_flow_session_id(course_identifier)
    return resp
@classmethod
def end_flow(cls, course_identifier=None, flow_session_id=None):
    """
    Post to the finish-flow-session view and return the response.

    Be cautious that this is a classmethod

    The URL kwargs start from a copy of ``cls.default_flow_params``; truthy
    *course_identifier* / *flow_session_id* arguments override its entries.
    Raises RuntimeError when no flow has been started.
    """
    if cls.default_flow_params is None:
        raise RuntimeError("There's no started flow_sessions.")

    url_kwargs = deepcopy(cls.default_flow_params)
    overrides = (
        ("course_identifier", course_identifier),
        ("flow_session_id", flow_session_id),
    )
    for key, override in overrides:
        if override:
            url_kwargs[key] = override

    finish_url = reverse("relate-finish_flow_session_view", kwargs=url_kwargs)
    return cls.c.post(finish_url, {'submit': ['']})
def get_flow_params(self, course_identifier=None, flow_session_id=None):
    """Return the URL kwargs that identify a flow session.

    A falsy *course_identifier* and a *flow_session_id* of None are
    replaced by the respective defaults.
    """
    if not course_identifier:
        course_identifier = self.get_default_course_identifier()
    if flow_session_id is None:
        flow_session_id = self.get_default_flow_session_id(course_identifier)
    return dict(
        course_identifier=course_identifier,
        flow_session_id=flow_session_id)
def get_page_params(self, course_identifier=None, flow_session_id=None,
                    page_ordinal=None):
    """Return the URL kwargs that identify a page of a flow session.

    These are the kwargs of ``get_flow_params`` plus ``page_ordinal``,
    which defaults to 0 when None.
    """
    page_params = self.get_flow_params(course_identifier, flow_session_id)
    if page_ordinal is None:
        page_ordinal = 0
    page_params.update({"page_ordinal": page_ordinal})
    return page_params
def get_page_ordinal_via_page_id(
        self, page_id, course_identifier=None, flow_session_id=None):
    """Return the ordinal of the page *page_id* in the flow session."""
    flow_params = self.get_flow_params(course_identifier, flow_session_id)
    session_id = flow_params["flow_session_id"]
    return get_flow_page_ordinal_from_page_id(session_id, page_id)
def get_page_view_url_by_ordinal(
        self, viewname, page_ordinal, course_identifier=None,
        flow_session_id=None):
    """Reverse *viewname* with the kwargs of the page at *page_ordinal*."""
    url_kwargs = self.get_page_params(
        course_identifier, flow_session_id, page_ordinal)
    return reverse(viewname, kwargs=url_kwargs)
def get_page_view_url_by_page_id(
        self, viewname, page_id, course_identifier=None, flow_session_id=None):
    """Reverse *viewname* for the page identified by *page_id*."""
    ordinal = self.get_page_ordinal_via_page_id(
        page_id, course_identifier, flow_session_id)
    return self.get_page_view_url_by_ordinal(
        viewname, ordinal, course_identifier, flow_session_id)
def get_page_url_by_ordinal(
        self, page_ordinal, course_identifier=None, flow_session_id=None):
    """Return the ``relate-view_flow_page`` URL of the page at an ordinal."""
    viewname = "relate-view_flow_page"
    return self.get_page_view_url_by_ordinal(
        viewname, page_ordinal, course_identifier, flow_session_id)
def get_page_url_by_page_id(
        self, page_id, course_identifier=None, flow_session_id=None):
    """Return the flow page URL of the page identified by *page_id*."""
    ordinal = self.get_page_ordinal_via_page_id(
        page_id, course_identifier, flow_session_id)
    return self.get_page_url_by_ordinal(
        ordinal, course_identifier, flow_session_id)
def get_page_grading_url_by_ordinal(
        self, page_ordinal, course_identifier=None, flow_session_id=None):
    """Return the ``relate-grade_flow_page`` URL of the page at an ordinal."""
    viewname = "relate-grade_flow_page"
    return self.get_page_view_url_by_ordinal(
        viewname, page_ordinal, course_identifier, flow_session_id)
def get_page_grading_url_by_page_id(
        self, page_id, course_identifier=None, flow_session_id=None):
    """Return the grading URL of the page identified by *page_id*."""
    ordinal = self.get_page_ordinal_via_page_id(
        page_id, course_identifier, flow_session_id)
    return self.get_page_grading_url_by_ordinal(
        ordinal, course_identifier, flow_session_id)
def post_answer_by_ordinal(
        self, page_ordinal, answer_data,
        course_identifier=None, flow_session_id=None):
    """Submit *answer_data* as the final answer for the page at an ordinal.

    The posted data is *answer_data* plus a ``submit`` entry; the caller's
    dict is left untouched.  Returns the response of the post.
    """
    # Copy first: adding the "submit" entry to the caller's dict would leak
    # into whatever the caller does with it afterwards.
    submit_data = dict(answer_data)
    submit_data.update({"submit": ["Submit final answer"]})
    resp = self.c.post(
        self.get_page_url_by_ordinal(
            page_ordinal, course_identifier, flow_session_id),
        submit_data)
    return resp
def post_answer_by_page_id(self, page_id, answer_data,
                           course_identifier=None, flow_session_id=None):
    """Submit *answer_data* for the page identified by *page_id*."""
    ordinal = self.get_page_ordinal_via_page_id(
        page_id, course_identifier, flow_session_id)
    return self.post_answer_by_ordinal(
        ordinal, answer_data, course_identifier, flow_session_id)
@classmethod
def post_answer_by_ordinal_class(cls, page_ordinal, answer_data,
                                 course_identifier, flow_session_id):
    """Classmethod variant of ``post_answer_by_ordinal``.

    Submits *answer_data* plus a ``submit`` entry to the flow page view of
    the page at *page_ordinal*; the caller's dict is left untouched.
    Returns the response of the post.
    """
    submit_data = dict(answer_data)
    submit_data.update({"submit": ["Submit final answer"]})
    # The page ordinal is part of the URL kwargs of the flow page view
    # (compare ``get_page_params``); without it *page_ordinal* is ignored.
    page_params = {
        "course_identifier": course_identifier,
        "flow_session_id": flow_session_id,
        "page_ordinal": page_ordinal,
    }
    page_url = reverse("relate-view_flow_page", kwargs=page_params)
    resp = cls.c.post(page_url, submit_data)
    return resp
@classmethod
def post_answer_by_page_id_class(cls, page_id, answer_data,
                                 course_identifier, flow_session_id):
    """Classmethod variant of ``post_answer_by_page_id``."""
    ordinal = get_flow_page_ordinal_from_page_id(flow_session_id, page_id)
    return cls.post_answer_by_ordinal_class(
        ordinal, answer_data, course_identifier, flow_session_id)
def post_grade_by_ordinal(self, page_ordinal, grade_data,
course_identifier=None, flow_session_id=None,
force_login_instructor=True):
post_data = {"submit": [""]}
post_data.update(grade_data)
page_params = self.get_page_params(
course_identifier, flow_session_id, page_ordinal)