Newer
Older
__copyright__ = "Copyright (C) 2017 Dong Zhuang, Andreas Kloeckner, Zesheng Wang"
__license__ = """
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import sys
import tempfile
from collections import OrderedDict
from copy import deepcopy
from functools import partial
import memcache
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from django.test import Client, RequestFactory, override_settings
from django.urls import resolve, reverse
flow_permission as fperm,
grade_aggregation_strategy as g_strategy,
participation_status,
user_status,
from course.content import get_course_repo_path, get_repo_blob
from course.flow import GradeInfo
from course.models import (
Course,
FlowPageData,
FlowPageVisit,
FlowSession,
GradingOpportunity,
Participation,
ParticipationRole,
from tests.constants import (
COMMIT_SHA_MAP,
FAKED_YAML_PATH,
QUIZ_FLOW_ID,
TEST_PAGE_TUPLE,
CREATE_SUPERUSER_KWARGS = {
"username": "test_admin",
"password": "test_admin",
"email": "test_admin@example.com",
"first_name": "Test",
"last_name": "Admin"}
SINGLE_COURSE_SETUP_LIST = [
{
"course": {
"identifier": "test-course",
"name": "Test Course",
"number": "CS123",
"time_period": "Fall 2016",
"hidden": False,
"listed": True,
"accepts_enrollment": True,
"git_source": "https://github.com/inducer/relate-sample.git",
"course_file": "course.yml",
"events_file": "events.yml",
"enrollment_approval_required": False,
"enrollment_required_email_suffix": "",
"preapproval_require_verified_inst_id": True,
"notify_email": "inform@tiker.net",
},
"participations": [
{
"role_identifier": "instructor",
"user": {
"username": "test_instructor",
"password": "test_instructor",
"email": "test_instructor@example.com",
"last_name": "Instructor"},
"status": participation_status.active
},
{
"role_identifier": "ta",
"user": {
"username": "test_ta",
"password": "test",
"email": "test_ta@example.com",
"last_name": "TA"},
"status": participation_status.active
},
{
"role_identifier": "student",
"user": {
"username": "test_student",
"password": "test",
"email": "test_student@example.com",
"last_name": "Student"},
"status": participation_status.active
}
],
}
]
TWO_COURSE_SETUP_LIST = deepcopy(SINGLE_COURSE_SETUP_LIST)
TWO_COURSE_SETUP_LIST[0]["course"]["identifier"] = "test-course1"
TWO_COURSE_SETUP_LIST += deepcopy(SINGLE_COURSE_SETUP_LIST)
TWO_COURSE_SETUP_LIST[1]["course"]["identifier"] = "test-course2"
NONE_PARTICIPATION_USER_CREATE_KWARG_LIST = [
{
"username": "test_user1",
"password": "test_user1",
"email": "test_user1@suffix.com",
"first_name": "Test",
"last_name": "User1",
"institutional_id": "test_user1_institutional_id",
"institutional_id_verified": True,
"status": user_status.active
},
{
"username": "test_user2",
"password": "test_user2",
"email": "test_user2@nosuffix.com",
"first_name": "Test",
"last_name": "User2",
"institutional_id": "test_user2_institutional_id",
"institutional_id_verified": False,
"status": user_status.active
},
{
"username": "test_user3",
"password": "test_user3",
"email": "test_user3@suffix.com",
"first_name": "Test",
"last_name": "User3",
"institutional_id": "test_user3_institutional_id",
"institutional_id_verified": True,
"status": user_status.unconfirmed
},
{
"username": "test_user4",
"password": "test_user4",
"email": "test_user4@no_suffix.com",
"first_name": "Test",
"last_name": "User4",
"institutional_id": "test_user4_institutional_id",
"institutional_id_verified": False,
"status": user_status.unconfirmed
mc = memcache.Client(["127.0.0.1:11211"])
except Exception:
pass
SELECT2_HTML_FIELD_ID_SEARCH_PATTERN = re.compile(r'data-field_id="([^"]+)"')
def git_source_url_to_cache_keys(url):
url_hash = hashlib.md5(url.encode("utf-8")).hexdigest()
return (
"test_course:%s" % url_hash,
"test_sha:%s" % url_hash
)
class CourseCreateFailure(Exception):
pass
class classmethod_with_client: # noqa: N801
"""This acts like Python's built-in ``classmethod``, with one change:
When called on an instance (i.e. not a class), it automatically supplies
``self.client`` as the first argument.
.. note::
This isn't immensely logical, but it helped avoid an expensive
refactor of the test code to explicitly always pass the client.
(The prior state was much worse: a class-global client was being
used. Almost fortunately, Django 3.2 broke this usage.)
"""
def __init__(self, f):
self.f = f
def __get__(self, obj, cls=None):
if obj is None:
return MethodType(self.f, cls)
else:
return partial(MethodType(self.f, type(obj)), obj.client)
"""
Response context refers to "the template Context instance that was used
to render the template that produced the response content".
Ref: https://docs.djangoproject.com/en/dev/topics/testing/tools/#django.test.Response.context
"""
def get_response_context_value_by_name(self, response, context_name):
try:
value = response.context[context_name]
except KeyError:
self.fail("%s does not exist in given response" % context_name)
else:
return value
def assertResponseHasNoContext(self, response, context_name): # noqa
has_context = True
try:
response.context[context_name]
except KeyError:
has_context = False
if has_context:
self.fail("%s unexpectedly exist in given response" % context_name)
def assertResponseContextIsNone(self, resp, context_name): # noqa
try:
value = self.get_response_context_value_by_name(resp, context_name)
except AssertionError:
# the context item doesn't exist
pass
else:
self.assertIsNone(value)
def assertResponseContextIsNotNone(self, resp, context_name, msg=""): # noqa
value = self.get_response_context_value_by_name(resp, context_name)
self.assertIsNotNone(value, msg)
def assertResponseContextEqual(self, resp, context_name, expected_value): # noqa
value = self.get_response_context_value_by_name(resp, context_name)
try:
self.assertTrue(float(value) - float(expected_value) <= 1e-04)
return
except Exception:
self.assertEqual(value, expected_value)
context_name, expected_value, html=False,
in_bulk=False):
value = self.get_response_context_value_by_name(resp, context_name)
if in_bulk:
if not isinstance(expected_value, list):
expected_value = [expected_value]
for v in expected_value:
if not html:
self.assertIn(v, value)
else:
self.assertInHTML(v, value)
if not html:
self.assertIn(expected_value, value)
else:
self.assertInHTML(expected_value, value)
context_name, expected_value_regex):
value = self.get_response_context_value_by_name(resp, context_name)
self.assertRegex(value, expected_value_regex)
def get_response_context_answer_feedback(self, response):
return self.get_response_context_value_by_name(response, "feedback")
def get_response_context_answer_feedback_string(self, response,
include_bulk_feedback=True):
answer_feedback = self.get_response_context_value_by_name(
response, "feedback")
self.assertTrue(hasattr(answer_feedback, "feedback"))
if not include_bulk_feedback:
return answer_feedback.feedback
if answer_feedback.bulk_feedback is None:
return answer_feedback.feedback
else:
if answer_feedback.feedback is None:
return answer_feedback.bulk_feedback
return answer_feedback.feedback + answer_feedback.bulk_feedback
def assertResponseContextAnswerFeedbackContainsFeedback( # noqa
include_bulk_feedback=True, html=False):
feedback_str = self.get_response_context_answer_feedback_string(
response, include_bulk_feedback)
if not html:
self.assertIn(expected_feedback, feedback_str)
else:
self.assertInHTML(expected_feedback, feedback_str)
def assertResponseContextAnswerFeedbackNotContainsFeedback( # noqa
self, response, expected_feedback,
include_bulk_feedback=True,
html=False):
feedback_str = self.get_response_context_answer_feedback_string(
response, include_bulk_feedback)
self.assertNotIn(expected_feedback, feedback_str)
self.assertInHTML(expected_feedback, feedback_str, count=0)
def assertResponseContextAnswerFeedbackCorrectnessEquals( # noqa
self, response, expected_correctness):
answer_feedback = self.get_response_context_answer_feedback(response)
if expected_correctness is None:
try:
self.assertTrue(hasattr(answer_feedback, "correctness"))
except AssertionError:
pass
else:
self.assertIsNone(answer_feedback.correctness)
else:
if answer_feedback.correctness is None:
return self.fail("The returned correctness is None, not %s"
% expected_correctness)
self.assertTrue(
abs(float(answer_feedback.correctness)
"%s does not equal %s"
% (str(answer_feedback.correctness)[:5],
str(expected_correctness)[:5]))
def get_response_body(self, response):
return self.get_response_context_value_by_name(response, "body")
def get_page_response_correct_answer(self, response):
return self.get_response_context_value_by_name(response, "correct_answer")
def get_page_response_feedback(self, response):
return self.get_response_context_value_by_name(response, "feedback")
def debug_print_response_context_value(self, resp, context_name):
try:
value = self.get_response_context_value_by_name(resp, context_name)
print("\n-----------context %s-------------"
% context_name)
if isinstance(value, (list, tuple)):
from course.validation import ValidationWarning
for v in value:
if isinstance(v, ValidationWarning):
print(v.text)
else:
print(repr(v))
else:
print(value)
print("-----------context end-------------\n")
except AssertionError:
print("\n-------no value for context %s----------" % context_name)
def get_select2_field_id_from_response(self, response,
form_context_name="form"):
self.assertResponseContextIsNotNone(
response, form_context_name,
"The response doesn't contain a context named '%s'"
% form_context_name)
form_str = str(response.context[form_context_name])
m = SELECT2_HTML_FIELD_ID_SEARCH_PATTERN.search(form_str)
assert m, "pattern not found in %s" % form_str
return m.group(1)
def select2_get_request(self, field_id, term=None,
select2_urlname="django_select2:auto-json"):
select2_url = reverse(select2_urlname)
params = {"field_id": field_id}
if term is not None:
term = term.strip()
if term:
params["term"] = term
return self.client.get(select2_url, params,
HTTP_X_REQUESTED_WITH="XMLHttpRequest")
def get_select2_response_data(self, response, key="results"):
import json
return json.loads(response.content.decode("utf-8"))[key]
create_superuser_kwargs = CREATE_SUPERUSER_KWARGS
@classmethod
# Create superuser, without this, we cannot
# create user, course and participation.
cls.superuser = cls.create_superuser()
cls.settings_git_root_override = (
override_settings(GIT_ROOT=tempfile.mkdtemp()))
cls.settings_git_root_override.enable()
@classmethod
def add_user_permission(cls, user, perm, model=Course):
from django.contrib.contenttypes.models import ContentType
content_type = ContentType.objects.get_for_model(model)
from django.contrib.auth.models import Permission
permission = Permission.objects.get(
codename=perm, content_type=content_type)
user.user_permissions.add(permission)
@classmethod
def create_superuser(cls):
return get_user_model().objects.create_superuser(
@classmethod
def get_sign_up_view_url(cls):
return reverse("relate-sign_up")
@classmethod_with_client
def get_sign_up(cls, client, *, follow=True): # noqa: N805
return client.get(cls.get_sign_up_view_url(), follow=follow)
@classmethod_with_client
def post_sign_up(cls, client, data, *, follow=True): # noqa: N805
return client.post(cls.get_sign_up_view_url(), data, follow=follow)
@classmethod
def get_profile_view_url(cls):
return reverse("relate-user_profile")
@classmethod_with_client
def get_profile(cls, client, *, follow=True): # noqa: N805
return client.get(cls.get_profile_view_url(), follow=follow)
@classmethod_with_client
def post_profile(cls, client, data, *, follow=True): # noqa: N805
return client.post(cls.get_profile_view_url(), data, follow=follow)
def post_signout(cls, client, data, *, follow=True):
return client.post(cls.get_sign_up_view_url(), data, follow=follow)
@classmethod
def get_impersonate_view_url(cls):
return reverse("relate-impersonate")
@classmethod
def get_stop_impersonate_view_url(cls):
return reverse("relate-stop_impersonating")
@classmethod_with_client
def get_impersonate_view(cls, client): # noqa: N805
return client.get(cls.get_impersonate_view_url())
@classmethod_with_client
def post_impersonate_view(cls, client, # noqa: N805
impersonatee, *, follow=True):
data = {"add_impersonation_header": ["on"],
}
data["user"] = [str(impersonatee.pk)]
return client.post(cls.get_impersonate_view_url(), data, follow=follow)
@classmethod_with_client
def get_stop_impersonate(cls, client, *, follow=True): # noqa: N805
return client.get(cls.get_stop_impersonate_view_url(), follow=follow)
@classmethod_with_client
def post_stop_impersonate(cls, client, *, # noqa: N805
if not data:
data = {"stop_impersonating": ""}
return client.post(
cls.get_stop_impersonate_view_url(), data, follow=follow)
@classmethod
def get_confirm_stop_impersonate_view_url(cls):
return reverse("relate-confirm_stop_impersonating")
def get_confirm_stop_impersonate(cls, client, *, follow=True):
return client.get(
cls.get_confirm_stop_impersonate_view_url(), follow=follow)
def post_confirm_stop_impersonate(cls, client, *, follow=True):
return client.post(
cls.get_confirm_stop_impersonate_view_url(), {}, follow=follow)
@classmethod
def get_reset_password_url(cls, use_instid=False):
kwargs = {}
if use_instid:
kwargs["field"] = "instid"
return reverse("relate-reset_password", kwargs=kwargs)
@classmethod_with_client
def get_reset_password(cls, client, *, use_instid=False): # noqa: N805
return client.get(cls.get_reset_password_url(use_instid))
@classmethod_with_client
def post_reset_password(cls, client, data, *, use_instid=False): # noqa: N805
return client.post(cls.get_reset_password_url(use_instid),
data=data)
def get_reset_password_stage2_url(self, user_id, sign_in_key, **kwargs):
url = reverse("relate-reset_password_stage2", args=(user_id, sign_in_key))
querystring = kwargs.pop("querystring", None)
if querystring is not None:
assert isinstance(querystring, dict)
url += ("?%s"
% "&".join(
for (k, v) in querystring.items()]))
return url
def get_reset_password_stage2(self, user_id, sign_in_key, **kwargs):
return self.client.get(self.get_reset_password_stage2_url(
user_id=user_id, sign_in_key=sign_in_key, **kwargs))
def post_reset_password_stage2(self, user_id, sign_in_key, data, **kwargs):
return self.client.post(self.get_reset_password_stage2_url(
user_id=user_id, sign_in_key=sign_in_key, **kwargs), data=data)
@classmethod
def get_fake_time_url(cls):
@classmethod_with_client
def get_set_fake_time(cls, client): # noqa: N805
return client.get(cls.get_fake_time_url())
@classmethod_with_client
def post_set_fake_time(cls, client, data, *, follow=True): # noqa: N805
return client.post(cls.get_fake_time_url(), data, follow=follow)
def assertSessionFakeTimeEqual(self, session, expected_date_time): # noqa
fake_time_timestamp = session.get("relate_fake_time", None)
if fake_time_timestamp is None:
faked_time = None
if expected_date_time is not None:
raise AssertionError(
"the session doesn't have 'relate_fake_time' attribute")
else:
faked_time = datetime.datetime.fromtimestamp(fake_time_timestamp)
self.assertEqual(faked_time, expected_date_time)
def assertSessionFakeTimeIsNone(self, session): # noqa
self.assertSessionFakeTimeEqual(session, None)
@classmethod
def get_set_pretend_facilities_url(cls):
@classmethod_with_client
def get_set_pretend_facilities(cls, client): # noqa: N805
return client.get(cls.get_set_pretend_facilities_url())
@classmethod_with_client
def post_set_pretend_facilities(cls, client, data, *, # noqa: N805
follow=True):
return client.post(cls.get_set_pretend_facilities_url(), data,
@classmethod
def force_remove_all_course_dir(cls):
# This is only necessary for courses which are created test wise,
# not class wise.
from course.content import get_course_repo_path
for c in Course.objects.all():
force_remove_path(get_course_repo_path(c))
def assertSessionPretendFacilitiesContains(self, session, expected_facilities): # noqa
pretended = session.get("relate_pretend_facilities", None)
if expected_facilities is None:
return self.assertIsNone(pretended)
if pretended is None:
raise AssertionError(
"the session doesn't have "
"'relate_pretend_facilities' attribute")
if isinstance(expected_facilities, (list, tuple)):
self.assertTrue(set(expected_facilities).issubset(set(pretended)))
else:
self.assertTrue(expected_facilities in pretended)
def assertSessionPretendFacilitiesIsNone(self, session): # noqa
pretended = session.get("relate_pretend_facilities", None)
self.assertIsNone(pretended)
def assertFormErrorLoose(self, response, errors, form_name="form"): # noqa
"""Assert that errors is found in response.context['form'] errors"""
if errors is None:
errors = []
if not isinstance(errors, (list, tuple)):
errors = [errors]
form_errors = ". ".join(list(
itertools.chain(*response.context[form_name].errors.values())))
if form_errors is None or not form_errors:
if errors:
if form_errors:
if not errors:
self.fail("%s unexpectedly has following errors: %s"
% (form_name, repr(form_errors)))
for err in errors:
self.assertIn(err, form_errors)
# {{{ get_flow_page_ordinal_from_page_id, get_flow_page_id_from_page_ordinal
def get_flow_page_ordinal_from_page_id(flow_session_id, page_id,
with_group_id=False):
flow_page_data = FlowPageData.objects.get(
flow_session__id=flow_session_id,
page_id=page_id
)
if with_group_id:
return flow_page_data.page_ordinal, flow_page_data.group_id
return flow_page_data.page_ordinal
def get_flow_page_id_from_page_ordinal(flow_session_id, page_ordinal,
with_group_id=False):
flow_page_data = FlowPageData.objects.get(
flow_session__id=flow_session_id,
if with_group_id:
return flow_page_data.page_id, flow_page_data.group_id
return flow_page_data.page_id
# }}}
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
class _ClientUserSwitcher:
def __init__(self, client, logged_in_user, switch_to):
self.client = client
self.logged_in_user = logged_in_user
self.switch_to = switch_to
def __enter__(self):
if self.logged_in_user == self.switch_to:
return
if self.switch_to is None:
self.client.logout()
return
self.client.force_login(self.switch_to)
def __exit__(self, exc_type, exc_val, exc_tb):
if self.logged_in_user == self.switch_to:
return
if self.logged_in_user is None:
self.client.logout()
return
self.client.force_login(self.logged_in_user)
def __call__(self, func):
from functools import wraps
@wraps(func)
def wrapper(*args, **kw):
with self:
return func(*args, **kw)
return wrapper
class CoursesTestMixinBase(SuperuserCreateMixin):
# A list of Dicts, each of which contain a course dict and a list of
# participations. See SINGLE_COURSE_SETUP_LIST for the setup for one course.
courses_setup_list = []
none_participation_user_create_kwarg_list = []
courses_attributes_extra_list = None
override_settings_at_post_create_course = {}
client = Client()
client.force_login(cls.superuser)
cls.default_flow_params = None
if cls.courses_attributes_extra_list is not None:
if (len(cls.courses_attributes_extra_list)
!= len(cls.courses_setup_list)):
raise ValueError(
"'courses_attributes_extra_list' must has equal length "
"with courses")
for i, course_setup in enumerate(cls.courses_setup_list):
if "course" not in course_setup:
continue
cls.n_courses += 1
course_identifier = course_setup["course"]["identifier"]
course_setup_kwargs = course_setup["course"]
if cls.courses_attributes_extra_list:
extra_attrs = cls.courses_attributes_extra_list[i]
assert isinstance(extra_attrs, dict)
course_setup_kwargs.update(extra_attrs)
cls.create_course(client, course_setup_kwargs)
course = Course.objects.get(identifier=course_identifier)
if "participations" in course_setup:
for participation in course_setup["participations"]:
create_user_kwargs = participation.get("user")
if not create_user_kwargs:
continue
role_identifier = participation.get("role_identifier")
if not role_identifier:
continue
cls.create_participation(
course=course,
user_or_create_user_kwargs=create_user_kwargs,
role_identifier=role_identifier,
status=participation.get("status",
participation_status.active)
)
# Remove superuser from participation for further test
# such as impersonate in auth module
try:
superuser_participations = (
Participation.objects.filter(user=cls.superuser))
for sp in superuser_participations:
Participation.delete(sp)
except Participation.DoesNotExist:
pass
cls.non_participation_users = get_user_model().objects.none()
if cls.none_participation_user_create_kwarg_list:
pks = []
for create_user_kwargs in (
cls.none_participation_user_create_kwarg_list):
user = cls.create_user(create_user_kwargs)
pks.append(user.pk)
cls.non_participation_users = (
get_user_model().objects.filter(pk__in=pks))
cls.course_qset = Course.objects.all()
@classmethod
def create_user(cls, create_user_kwargs):
user, created = get_user_model().objects.get_or_create(
email__iexact=create_user_kwargs["email"], defaults=create_user_kwargs)
try:
# TODO: why pop failed here?
password = create_user_kwargs["password"]
return user
@classmethod
def create_participation(
cls, course, user_or_create_user_kwargs,
role_identifier=None, status=None):
if isinstance(user_or_create_user_kwargs, get_user_model()):
user = user_or_create_user_kwargs
else:
assert isinstance(user_or_create_user_kwargs, dict)
user = cls.create_user(user_or_create_user_kwargs)
if status is None:
status = participation_status.active
participation, p_created = Participation.objects.get_or_create(
user=user,
course=course,
status=status
)
if role_identifier is None:
role_identifier = "student"
if p_created:
role = ParticipationRole.objects.filter(
course=course, identifier=role_identifier)
participation.roles.set(role)
return participation
@classmethod_with_client
def post_create_course(cls, client, create_course_kwargs, *, # noqa: N805
raise_error=True, login_superuser=True):
# To speed up, use create_course instead, this is better used for tests
if login_superuser:
client.force_login(cls.superuser)
existing_course_count = Course.objects.count()
with override_settings(**cls.override_settings_at_post_create_course):
resp = client.post(cls.get_set_up_new_course_url(),
data=create_course_kwargs)
all_courses = Course.objects.all()
if not all_courses.count() == existing_course_count + 1:
error_string = None
# most probably the reason course creation form error
form_context = resp.context.__getitem__("form")
assert form_context
error_list = []
if form_context.errors:
error_list = [
"%s: %s"
% (field,
"\n".join(["{}:{}".format(type(e).__name__, str(e))
for e in errs]))
for field, errs
in form_context.errors.as_data().items()]
non_field_errors = form_context.non_field_errors()
if non_field_errors:
error_list.append(repr(non_field_errors))
if error_list:
error_string = "\n".join(error_list)
if not error_string:
error_string = ("course creation failed for unknown errors")
raise CourseCreateFailure(error_string)
# the course is created successfully
last_course = all_courses.last()
assert last_course
if "trusted_for_markup" in create_course_kwargs:
# This attribute is not settable via POST by the user, so set it
# manually here.
last_course.trusted_for_markup = \
create_course_kwargs["trusted_for_markup"]
last_course.save()
git_source_url_to_cache_keys(last_course.git_source))
mc.set_multi({url_cache_key: get_course_repo_path(last_course),
commit_sha_cache_key: last_course.active_git_commit_sha},
@classmethod_with_client
def create_course(cls, client, create_course_kwargs, *, # noqa: N805
raise_error=True):
has_cached_repo = False
git_source_url_to_cache_keys(create_course_kwargs["git_source"]))
try:
exist_course_repo_path = mc.get(repo_cache_key)
exist_commit_sha = mc.get(commit_sha_cache_key)
if os.path.isdir(exist_course_repo_path):
has_cached_repo = bool(exist_course_repo_path and exist_commit_sha)
else:
has_cached_repo = False
except Exception:
pass
if not has_cached_repo:
# fall back to post create
return cls.post_create_course(
client, create_course_kwargs, raise_error=raise_error)
existing_course_count = Course.objects.count()
new_course_repo_path = os.path.join(settings.GIT_ROOT,
create_course_kwargs["identifier"])
shutil.copytree(exist_course_repo_path, new_course_repo_path)
create_kwargs = deepcopy(create_course_kwargs)
create_kwargs["active_git_commit_sha"] = exist_commit_sha
Course.objects.create(**create_kwargs)
assert Course.objects.count() == existing_course_count + 1
@classmethod
def get_course_view_url(cls, view_name, course_identifier=None):
course_identifier = (
course_identifier or cls.get_default_course_identifier())
return reverse(view_name, args=[course_identifier])
@classmethod
def get_course_calender_url(cls, course_identifier=None):
return cls.get_course_view_url(
"relate-view_calendar", course_identifier)
@classmethod
def get_set_up_new_course_url(cls):
return reverse("relate-set_up_new_course")
@classmethod_with_client
def get_set_up_new_course(cls, client): # noqa: N805
return client.get(cls.get_update_course_url)
def get_edit_course_url(cls, course_identifier=None):
course_identifier = (
course_identifier or cls.get_default_course_identifier())
return cls.get_course_view_url("relate-edit_course", course_identifier)
@classmethod_with_client
def post_edit_course(cls, client, data, *, course=None): # noqa: N805
course = course or cls.get_default_course()
edit_course_url = cls.get_edit_course_url(course.identifier)
return client.post(edit_course_url, data)
@classmethod_with_client
def get_edit_course(cls, client, *, course=None): # noqa: N805
course = course or cls.get_default_course()
return client.get(cls.get_edit_course_url(course.identifier))
@classmethod
def get_course_page_url(cls, course_identifier=None):
return cls.get_course_view_url("relate-course_page", course_identifier)
@classmethod
def get_finish_flow_session_view_url(cls, course_identifier=None,
flow_session_id=None):
course_identifier = (
course_identifier or cls.get_default_course_identifier())
if flow_session_id is None:
flow_session_id = cls.get_default_flow_session_id(course_identifier)
kwargs = {"course_identifier": course_identifier,
"flow_session_id": flow_session_id}
return reverse("relate-finish_flow_session_view", kwargs=kwargs)
@classmethod
def _get_grades_url(cls, args=None, kwargs=None):
return reverse("relate-view_participant_grades",
args=args, kwargs=kwargs)
@classmethod
def get_my_grades_url(cls, course_identifier=None):
course_identifier = (
course_identifier or cls.get_default_course_identifier())
return cls._get_grades_url(args=[course_identifier])
@classmethod_with_client
def get_my_grades_view(cls, client, *, course_identifier=None): # noqa: N805
return client.get(cls.get_my_grades_url(course_identifier))
@classmethod
def get_participant_grades_url(cls, participation_id, course_identifier=None):
course_identifier = (
course_identifier or cls.get_default_course_identifier())
return cls._get_grades_url(
kwargs={"course_identifier": course_identifier,
"participation_id": participation_id})
@classmethod_with_client
def get_participant_grades_view(
cls, client, participation_id, *, # noqa: N805
course_identifier=None, force_login_instructor=True):
course_identifier = (
course_identifier or cls.get_default_course_identifier())
if force_login_instructor:
switch_to = cls.get_default_instructor_user(course_identifier)
else:
switch_to = cls.get_logged_in_user()
with cls.temporarily_switch_to_user(switch_to):
cls.get_participant_grades_url(participation_id, course_identifier))
@classmethod
def get_gradebook_url_by_opp_id(cls, opp_id, course_identifier=None):