# base_test_mixins.py
from __future__ import annotations


Dong Zhuang's avatar
Dong Zhuang committed
__copyright__ = "Copyright (C) 2017 Dong Zhuang, Andreas Kloeckner, Zesheng Wang"

__license__ = """
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""

Andreas Klöckner's avatar
Andreas Klöckner committed
import datetime
import hashlib
Dong Zhuang's avatar
Dong Zhuang committed
import os
Andreas Klöckner's avatar
Andreas Klöckner committed
import re
Andreas Klöckner's avatar
Andreas Klöckner committed
import sys
import tempfile
from collections import OrderedDict
from copy import deepcopy
from functools import partial
Andreas Klöckner's avatar
Andreas Klöckner committed
from types import MethodType
import memcache
from django.conf import settings
Dong Zhuang's avatar
Dong Zhuang committed
from django.contrib.auth import get_user_model
from django.core.exceptions import ImproperlyConfigured
Andreas Klöckner's avatar
Andreas Klöckner committed
from django.test import Client, RequestFactory, override_settings
from django.urls import resolve, reverse
Dong Zhuang's avatar
Dong Zhuang committed
from course.constants import (
    flow_permission as fperm,
    grade_aggregation_strategy as g_strategy,
    participation_status,
    user_status,
Andreas Klöckner's avatar
Andreas Klöckner committed
)
from course.content import get_course_repo_path, get_repo_blob
Andreas Klöckner's avatar
Andreas Klöckner committed
from course.flow import GradeInfo
from course.models import (
    Course,
    FlowPageData,
    FlowPageVisit,
    FlowSession,
    GradingOpportunity,
    Participation,
    ParticipationRole,
Andreas Klöckner's avatar
Andreas Klöckner committed
)
from tests.constants import (
    COMMIT_SHA_MAP,
    FAKED_YAML_PATH,
    QUIZ_FLOW_ID,
    TEST_PAGE_TUPLE,
Andreas Klöckner's avatar
Andreas Klöckner committed
)
Josh Asplund's avatar
Josh Asplund committed
from tests.utils import mock
Dong Zhuang's avatar
Dong Zhuang committed

Andreas Klöckner's avatar
Andreas Klöckner committed

Dong Zhuang's avatar
Dong Zhuang committed
# Absolute tolerance applied when comparing a page's reported correctness
# against the expected correctness in the assertion helpers below.
CORRECTNESS_ATOL = 1e-5
Dong Zhuang's avatar
Dong Zhuang committed

# Keyword arguments used as the default ``create_superuser_kwargs`` of
# SuperuserCreateMixin, i.e. what gets passed to ``create_superuser()``.
CREATE_SUPERUSER_KWARGS = dict(
    username="test_admin",
    password="test_admin",
    email="test_admin@example.com",
    first_name="Test",
    last_name="Admin",
)

# Setup description for a single test course: one dict holding the course's
# creation attributes ("course") and the users to enroll ("participations").
# Each participation names a role identifier, the kwargs of the user to
# create, and the participation status.
SINGLE_COURSE_SETUP_LIST = [
    {
        "course": {
            "identifier": "test-course",
            "name": "Test Course",
            "number": "CS123",
            "time_period": "Fall 2016",
            "hidden": False,
            "listed": True,
            "accepts_enrollment": True,
            "git_source": "https://github.com/inducer/relate-sample.git",
            "course_file": "course.yml",
            "events_file": "events.yml",
            "enrollment_approval_required": False,
            "enrollment_required_email_suffix": "",
            "preapproval_require_verified_inst_id": True,
            "from_email": "inform@tiker.net",
            "notify_email": "inform@tiker.net",
            },
        "participations": [
            {
                "role_identifier": "instructor",
                "user": {
                    "username": "test_instructor",
                    "password": "test_instructor",
                    "email": "test_instructor@example.com",
                    "first_name": "Test_ins",
                    "last_name": "Instructor"},
                "status": participation_status.active
            },
            {
                "role_identifier": "ta",
                "user": {
                    "username": "test_ta",
                    "password": "test",
                    "email": "test_ta@example.com",
                    "first_name": "Test_ta",
                    "last_name": "TA"},
                "status": participation_status.active
            },
            {
                "role_identifier": "student",
                "user": {
                    "username": "test_student",
                    "password": "test",
                    "email": "test_student@example.com",
                    "first_name": "Test_stu",
                    "last_name": "Student"},
                "status": participation_status.active
            }
        ],
    }
]

# Two independent deep copies of the single-course setup, told apart only by
# their course identifiers.
TWO_COURSE_SETUP_LIST = [
    *deepcopy(SINGLE_COURSE_SETUP_LIST),
    *deepcopy(SINGLE_COURSE_SETUP_LIST),
]
TWO_COURSE_SETUP_LIST[0]["course"]["identifier"] = "test-course1"
TWO_COURSE_SETUP_LIST[1]["course"]["identifier"] = "test-course2"

# Creation kwargs for users that are not enrolled in any course.  The four
# entries cover every combination of verified/unverified institutional id
# and active/unconfirmed user status.
NONE_PARTICIPATION_USER_CREATE_KWARG_LIST = [
    {
        "username": "test_user1",
        "password": "test_user1",
        "email": "test_user1@suffix.com",
        "first_name": "Test",
        "last_name": "User1",
        "institutional_id": "test_user1_institutional_id",
        "institutional_id_verified": True,
        "status": user_status.active
    },
    {
        "username": "test_user2",
        "password": "test_user2",
        "email": "test_user2@nosuffix.com",
        "first_name": "Test",
        "last_name": "User2",
        "institutional_id": "test_user2_institutional_id",
        "institutional_id_verified": False,
        "status": user_status.active
    },
    {
        "username": "test_user3",
        "password": "test_user3",
        "email": "test_user3@suffix.com",
        "first_name": "Test",
        "last_name": "User3",
        "institutional_id": "test_user3_institutional_id",
        "institutional_id_verified": True,
        "status": user_status.unconfirmed
    },
    {
        "username": "test_user4",
        "password": "test_user4",
        "email": "test_user4@no_suffix.com",
        "first_name": "Test",
        "last_name": "User4",
        "institutional_id": "test_user4_institutional_id",
        "institutional_id_verified": False,
        "status": user_status.unconfirmed
    },
]

# Memcached client pointed at a local server.
# NOTE(review): this assignment was indented in the original, which suggests
# it sat inside a compound statement (e.g. a ``try:``) whose header is not
# visible here -- confirm against the upstream file.
mc = memcache.Client(["127.0.0.1:11211"])

# Captures the quoted value of the first ``data-field_id`` attribute found in
# a piece of rendered HTML.
SELECT2_HTML_FIELD_ID_SEARCH_PATTERN = re.compile(
    r'data-field_id="([^"]+)"'
)

def git_source_url_to_cache_keys(url):
    """Return the ``(course_key, sha_key)`` cache-key pair for *url*.

    Both keys end in the hexadecimal MD5 digest of the UTF-8 encoded URL and
    differ only in their prefix (``test_course:`` and ``test_sha:``).
    """
    digest = hashlib.md5(url.encode("utf-8")).hexdigest()
    course_key, sha_key = (
        prefix + ":" + digest for prefix in ("test_course", "test_sha"))
    return course_key, sha_key

Dong Zhuang's avatar
Dong Zhuang committed

class CourseCreateFailure(Exception):
    """Exception type for a failed course creation.

    NOTE(review): the raise sites are outside this part of the file; the
    description above is taken from the class name.
    """


class classmethod_with_client:  # noqa: N801
    """A ``classmethod`` look-alike that can inject the test client.

    Accessed on a class, the wrapped function is bound to that class and the
    caller has to pass the client explicitly.  Accessed on an instance, the
    function is bound to the instance's class and ``instance.client`` is
    pre-filled as the first argument after ``cls``.

    .. note::

        This isn't immensely logical, but it helped avoid an expensive
        refactor of the test code to explicitly always pass the client.
        (The prior state was much worse: a class-global client was being
        used. Almost fortunately, Django 3.2 broke this usage.)
    """

    def __init__(self, f):
        self.f = f

    def __get__(self, obj, cls=None):
        if obj is not None:
            # Instance access: bind to the instance's class, then supply
            # its client.
            bound_to_class = MethodType(self.f, type(obj))
            return partial(bound_to_class, obj.client)

        # Class access: plain classmethod behaviour.
        return MethodType(self.f, cls)


# {{{ ResponseContextMixin

class ResponseContextMixin:
    """
    Lookup and assertion helpers for the template context of a test response.

    Response context refers to "the template Context instance that was used
    to render the template that produced the response content".
    Ref: https://docs.djangoproject.com/en/dev/topics/testing/tools/#django.test.Response.context

    The helpers call ``self.fail`` and ``self.assert*`` (and, for the select2
    request helper, ``self.client``), so the mixin has to be combined with a
    class that provides them.
    """

    def get_response_context_value_by_name(self, response, context_name):
        """Return ``response.context[context_name]``.

        Fails the test (via ``self.fail``) if the name is not in the context.
        """
        try:
            return response.context[context_name]
        except KeyError:
            self.fail("%s does not exist in given response" % context_name)

    def assertResponseHasNoContext(self, response, context_name):  # noqa
        """Fail if *context_name* is present in the response context."""
        try:
            response.context[context_name]
        except KeyError:
            return
        self.fail("%s unexpectedly exist in given response" % context_name)

    def assertResponseContextIsNone(self, resp, context_name):  # noqa
        """Assert the context value is ``None``; a missing name also passes."""
        try:
            value = self.get_response_context_value_by_name(resp, context_name)
        except AssertionError:
            # the context item doesn't exist
            pass
        else:
            self.assertIsNone(value)

    def assertResponseContextIsNotNone(self, resp, context_name, msg=""):  # noqa
        """Assert the context value exists and is not ``None``."""
        value = self.get_response_context_value_by_name(resp, context_name)
        self.assertIsNotNone(value, msg)

    def assertResponseContextEqual(self, resp, context_name, expected_value):  # noqa
        """Assert the context value equals *expected_value*.

        If both values convert to ``float`` they are considered equal when
        they differ by at most ``1e-04`` in absolute terms.  In every other
        case (not numeric, or outside the tolerance) the check falls through
        to ``assertEqual``, which also produces the failure message.
        """
        value = self.get_response_context_value_by_name(resp, context_name)
        try:
            # abs() is essential: without it any value *smaller* than the
            # expected one yields a negative difference and passes.
            close_enough = (
                abs(float(value) - float(expected_value)) <= 1e-04)
        except (TypeError, ValueError, OverflowError):
            # Not comparable as floats; compare the raw values instead.
            close_enough = False

        if close_enough:
            return
        self.assertEqual(value, expected_value)

    def assertResponseContextContains(self, resp,  # noqa
                                      context_name, expected_value, html=False,
                                      in_bulk=False):
        """Assert the context value contains *expected_value*.

        :arg html: compare with ``assertInHTML`` instead of ``assertIn``.
        :arg in_bulk: if true and *expected_value* is a list, every item of
            the list has to be contained.  Otherwise *expected_value* is
            checked as a single item.
        """
        value = self.get_response_context_value_by_name(resp, context_name)
        if in_bulk and isinstance(expected_value, list):
            expected_items = expected_value
        else:
            expected_items = [expected_value]

        check = self.assertInHTML if html else self.assertIn
        for item in expected_items:
            check(item, value)

    def assertResponseContextRegex(  # noqa
            self, resp,
            context_name, expected_value_regex):
        """Assert the context value matches *expected_value_regex*."""
        value = self.get_response_context_value_by_name(resp, context_name)
        self.assertRegex(value, expected_value_regex)

    def get_response_context_answer_feedback(self, response):
        """Return the ``"feedback"`` entry of the response context."""
        return self.get_response_context_value_by_name(response, "feedback")

    def get_response_context_answer_feedback_string(self, response,
                                             include_bulk_feedback=True):
        """Return the feedback text of the ``"feedback"`` context entry.

        With *include_bulk_feedback*, the entry's ``bulk_feedback`` is
        appended to its ``feedback``; if either of the two is ``None`` the
        other one is returned on its own.
        """
        answer_feedback = self.get_response_context_value_by_name(
            response, "feedback")

        self.assertTrue(hasattr(answer_feedback, "feedback"))
        if not include_bulk_feedback:
            return answer_feedback.feedback

        if answer_feedback.bulk_feedback is None:
            return answer_feedback.feedback
        if answer_feedback.feedback is None:
            return answer_feedback.bulk_feedback
        return answer_feedback.feedback + answer_feedback.bulk_feedback

    def assertResponseContextAnswerFeedbackContainsFeedback(  # noqa
            self, response, expected_feedback,
            include_bulk_feedback=True, html=False):
        """Assert the feedback text contains *expected_feedback*."""
        feedback_str = self.get_response_context_answer_feedback_string(
            response, include_bulk_feedback)
        if not html:
            self.assertIn(expected_feedback, feedback_str)
        else:
            self.assertInHTML(expected_feedback, feedback_str)

    def assertResponseContextAnswerFeedbackNotContainsFeedback(  # noqa
            self, response, expected_feedback,
            include_bulk_feedback=True,
            html=False):
        """Assert the feedback text does not contain *expected_feedback*."""
        feedback_str = self.get_response_context_answer_feedback_string(
            response, include_bulk_feedback)

        if not html:
            self.assertNotIn(expected_feedback, feedback_str)
        else:
            # count=0: the fragment must not occur at all.
            self.assertInHTML(expected_feedback, feedback_str, count=0)

    def assertResponseContextAnswerFeedbackCorrectnessEquals(  # noqa
                                        self, response, expected_correctness):
        """Assert the feedback's correctness equals *expected_correctness*.

        If *expected_correctness* is ``None``, the feedback may either lack
        a ``correctness`` attribute or have it set to ``None``.  Otherwise
        the values are compared as floats within ``CORRECTNESS_ATOL``.
        """
        answer_feedback = self.get_response_context_answer_feedback(response)
        if expected_correctness is None:
            if hasattr(answer_feedback, "correctness"):
                self.assertIsNone(answer_feedback.correctness)
            return

        if answer_feedback.correctness is None:
            return self.fail("The returned correctness is None, not %s"
                      % expected_correctness)
        self.assertTrue(
            abs(float(answer_feedback.correctness)
                - float(str(expected_correctness))) < CORRECTNESS_ATOL,
            "%s does not equal %s"
            % (str(answer_feedback.correctness)[:5],
               str(expected_correctness)[:5]))

    def get_response_body(self, response):
        """Return the ``"body"`` entry of the response context."""
        return self.get_response_context_value_by_name(response, "body")

    def get_page_response_correct_answer(self, response):
        """Return the ``"correct_answer"`` entry of the response context."""
        return self.get_response_context_value_by_name(response, "correct_answer")

    def get_page_response_feedback(self, response):
        """Return the ``"feedback"`` entry of the response context."""
        return self.get_response_context_value_by_name(response, "feedback")

    def debug_print_response_context_value(self, resp, context_name):
        """Print a context value for debugging.

        Lists and tuples are printed item by item, using the ``text`` of
        ``ValidationWarning`` items and ``repr`` of everything else.
        """
        try:
            value = self.get_response_context_value_by_name(resp, context_name)
            print("\n-----------context %s-------------"
                  % context_name)
            if isinstance(value, list | tuple):
                from course.validation import ValidationWarning
                for v in value:
                    if isinstance(v, ValidationWarning):
                        print(v.text)
                    else:
                        print(repr(v))
            else:
                print(value)
            print("-----------context end-------------\n")
        except AssertionError:
            print("\n-------no value for context %s----------" % context_name)

    def get_select2_field_id_from_response(self, response,
                                           form_context_name="form"):
        """Return the select2 ``data-field_id`` found in the rendered form.

        Fails the test if the form is missing from the context or if its
        string rendering has no ``data-field_id`` attribute.
        """
        self.assertResponseContextIsNotNone(
            response, form_context_name,
            "The response doesn't contain a context named '%s'"
            % form_context_name)
        form_str = str(response.context[form_context_name])
        m = SELECT2_HTML_FIELD_ID_SEARCH_PATTERN.search(form_str)
        if m is None:
            # self.fail instead of a bare assert, so the check is not
            # stripped when running with -O.
            self.fail("pattern not found in %s" % form_str)
        return m.group(1)

    def select2_get_request(self, field_id, term=None,
                            select2_urlname="django_select2:auto-json"):
        """Issue the AJAX GET request a select2 widget would send.

        :arg term: optional search term; it is stripped and only sent when
            non-empty.
        """
        select2_url = reverse(select2_urlname)
        params = {"field_id": field_id}
        if term is not None:
            assert isinstance(term, str)
            term = term.strip()
            if term:
                params["term"] = term

        return self.client.get(select2_url, params,
                          HTTP_X_REQUESTED_WITH="XMLHttpRequest")

    def get_select2_response_data(self, response, key="results"):
        """Decode the JSON body of *response* and return its *key* entry."""
        import json
        return json.loads(response.content.decode("utf-8"))[key]

Dong Zhuang's avatar
Dong Zhuang committed

class SuperuserCreateMixin(ResponseContextMixin):
    """Creates a superuser for the test class and wraps site-level views.

    Besides creating ``cls.superuser`` and pointing ``GIT_ROOT`` at a fresh
    temporary directory, the mixin offers URL getters plus GET/POST helpers
    for the sign-up, profile, impersonation, password-reset, fake-time and
    pretend-facilities views, and a few session/form assertions.

    Helpers decorated with ``classmethod_with_client`` take the client
    explicitly when called on the class, and use ``self.client`` when called
    on an instance.
    """
    create_superuser_kwargs = CREATE_SUPERUSER_KWARGS

    @classmethod
    def setUpTestData(cls):
        # Create superuser, without this, we cannot
        # create user, course and participation.
        cls.superuser = cls.create_superuser()
        # NOTE(review): nothing visible here disables the override again or
        # removes the temporary directory -- confirm a matching teardown
        # exists elsewhere.
        cls.settings_git_root_override = (
            override_settings(GIT_ROOT=tempfile.mkdtemp()))
        cls.settings_git_root_override.enable()
        super().setUpTestData()

    @classmethod
    def add_user_permission(cls, user, perm, model=Course):
        """Grant *user* the permission with codename *perm* on *model*."""
        from django.contrib.contenttypes.models import ContentType
        content_type = ContentType.objects.get_for_model(model)
        from django.contrib.auth.models import Permission
        permission = Permission.objects.get(
            codename=perm, content_type=content_type)
        user.user_permissions.add(permission)

    @classmethod
    def create_superuser(cls):
        """Create and return a superuser from ``create_superuser_kwargs``."""
        return get_user_model().objects.create_superuser(
            **cls.create_superuser_kwargs)

    @classmethod
    def get_sign_up_view_url(cls):
        # NOTE(review): the body of this method was missing; the URL name is
        # reconstructed from the naming of the sibling getters -- confirm.
        return reverse("relate-sign_up")

    @classmethod_with_client
    def get_sign_up(cls, client, *, follow=True):  # noqa: N805
        return client.get(cls.get_sign_up_view_url(), follow=follow)

    @classmethod_with_client
    def post_sign_up(cls, client, data, *, follow=True):  # noqa: N805
        return client.post(cls.get_sign_up_view_url(), data, follow=follow)

    @classmethod
    def get_profile_view_url(cls):
        return reverse("relate-user_profile")

    @classmethod_with_client
    def get_profile(cls, client, *, follow=True):  # noqa: N805
        return client.get(cls.get_profile_view_url(), follow=follow)

    @classmethod_with_client
    def post_profile(cls, client, data, *, follow=True):  # noqa: N805
        # Note: this adds the submit key to the caller's dict in place.
        data.update({"submit_user": [""]})
        return client.post(cls.get_profile_view_url(), data, follow=follow)

    # The decorator was missing here although the (cls, client, ...)
    # signature matches the other client-taking helpers.
    @classmethod_with_client
    def post_signout(cls, client, data, *, follow=True):  # noqa: N805
        # NOTE(review): this posts to the sign-up URL, not a sign-out URL --
        # kept as found; confirm that is intended.
        return client.post(cls.get_sign_up_view_url(), data, follow=follow)

    @classmethod
    def get_impersonate_view_url(cls):
        return reverse("relate-impersonate")

    @classmethod
    def get_stop_impersonate_view_url(cls):
        return reverse("relate-stop_impersonating")

    @classmethod_with_client
    def get_impersonate_view(cls, client):  # noqa: N805
        return client.get(cls.get_impersonate_view_url())

    @classmethod_with_client
    def post_impersonate_view(cls, client,  # noqa: N805
            impersonatee, *, follow=True):
        """POST the impersonation form for the user *impersonatee*."""
        data = {"add_impersonation_header": ["on"],
                }
        data["user"] = [str(impersonatee.pk)]
        return client.post(cls.get_impersonate_view_url(), data, follow=follow)

    @classmethod_with_client
    def get_stop_impersonate(cls, client, *, follow=True):  # noqa: N805
        return client.get(cls.get_stop_impersonate_view_url(), follow=follow)

    @classmethod_with_client
    def post_stop_impersonate(cls, client, *,  # noqa: N805
            data=None, follow=True):
        """POST to the stop-impersonating view.

        If *data* is empty or ``None``, ``{"stop_impersonating": ""}`` is
        sent.
        """
        if not data:
            data = {"stop_impersonating": ""}
        return client.post(
            cls.get_stop_impersonate_view_url(), data, follow=follow)

    @classmethod
    def get_confirm_stop_impersonate_view_url(cls):
        return reverse("relate-confirm_stop_impersonating")

    @classmethod_with_client
    def get_confirm_stop_impersonate(cls, client, *, follow=True):  # noqa: N805
        return client.get(
            cls.get_confirm_stop_impersonate_view_url(), follow=follow)

    @classmethod_with_client
    def post_confirm_stop_impersonate(cls, client, *, follow=True):  # noqa: N805
        return client.post(
            cls.get_confirm_stop_impersonate_view_url(), {}, follow=follow)

    @classmethod
    def get_reset_password_url(cls, use_instid=False):
        """Return the reset-password URL.

        With *use_instid*, ``field="instid"`` is passed as a URL kwarg.
        """
        kwargs = {}
        if use_instid:
            kwargs["field"] = "instid"
        return reverse("relate-reset_password", kwargs=kwargs)

    @classmethod_with_client
    def get_reset_password(cls, client, *, use_instid=False):  # noqa: N805
        return client.get(cls.get_reset_password_url(use_instid))

    @classmethod_with_client
    def post_reset_password(cls, client, data, *, use_instid=False):  # noqa: N805
        return client.post(cls.get_reset_password_url(use_instid),
                          data=data)

    def get_reset_password_stage2_url(self, user_id, sign_in_key, **kwargs):
        """Return the stage-2 reset URL for *user_id* / *sign_in_key*.

        An optional ``querystring`` dict keyword is appended as
        ``?k1=v1&k2=v2``; keys and values are inserted without URL-encoding.
        """
        url = reverse("relate-reset_password_stage2", args=(user_id, sign_in_key))
        querystring = kwargs.pop("querystring", None)
        if querystring is not None:
            assert isinstance(querystring, dict)
            url += ("?%s"
                    % "&".join(
                        ["%s=%s" % (k, v)
                         for (k, v) in querystring.items()]))
        return url

    def get_reset_password_stage2(self, user_id, sign_in_key, **kwargs):
        return self.client.get(self.get_reset_password_stage2_url(
            user_id=user_id, sign_in_key=sign_in_key, **kwargs))

    def post_reset_password_stage2(self, user_id, sign_in_key, data, **kwargs):
        return self.client.post(self.get_reset_password_stage2_url(
            user_id=user_id, sign_in_key=sign_in_key, **kwargs), data=data)

    @classmethod
    def get_fake_time_url(cls):
        return reverse("relate-set_fake_time")

    @classmethod_with_client
    def get_set_fake_time(cls, client):  # noqa: N805
        return client.get(cls.get_fake_time_url())

    @classmethod_with_client
    def post_set_fake_time(cls, client, data, *, follow=True):  # noqa: N805
        return client.post(cls.get_fake_time_url(), data, follow=follow)

    def assertSessionFakeTimeEqual(self, session, expected_date_time):  # noqa
        """Assert the session's ``relate_fake_time`` is *expected_date_time*.

        The stored timestamp is converted with
        ``datetime.datetime.fromtimestamp`` before the comparison.
        """
        fake_time_timestamp = session.get("relate_fake_time", None)
        if fake_time_timestamp is None:
            if expected_date_time is not None:
                raise AssertionError(
                    "the session doesn't have 'relate_fake_time' attribute")
            faked_time = None
        else:
            faked_time = datetime.datetime.fromtimestamp(fake_time_timestamp)
        self.assertEqual(faked_time, expected_date_time)

    def assertSessionFakeTimeIsNone(self, session):  # noqa
        self.assertSessionFakeTimeEqual(session, None)

    @classmethod
    def get_set_pretend_facilities_url(cls):
        return reverse("relate-set_pretend_facilities")

    @classmethod_with_client
    def get_set_pretend_facilities(cls, client):  # noqa: N805
        return client.get(cls.get_set_pretend_facilities_url())

    @classmethod_with_client
    def post_set_pretend_facilities(cls, client, data, *,  # noqa: N805
            follow=True):
        return client.post(cls.get_set_pretend_facilities_url(), data,
                          follow=follow)

    @classmethod
    def force_remove_all_course_dir(cls):
        # This is only necessary for courses which are created test wise,
        # not class wise.
        from course.content import get_course_repo_path
        from relate.utils import force_remove_path
        for c in Course.objects.all():
            force_remove_path(get_course_repo_path(c))

    def assertSessionPretendFacilitiesContains(self, session, expected_facilities):  # noqa
        """Assert the session's pretend facilities include the expected ones.

        *expected_facilities* may be ``None`` (nothing may be stored), a
        list/tuple (all of its items must be present) or a single facility.
        """
        pretended = session.get("relate_pretend_facilities", None)
        if expected_facilities is None:
            return self.assertIsNone(pretended)
        if pretended is None:
            raise AssertionError(
                "the session doesn't have "
                "'relate_pretend_facilities' attribute")

        if isinstance(expected_facilities, list | tuple):
            self.assertTrue(set(expected_facilities).issubset(set(pretended)))
        else:
            self.assertIn(expected_facilities, pretended)

    def assertSessionPretendFacilitiesIsNone(self, session):  # noqa
        pretended = session.get("relate_pretend_facilities", None)
        self.assertIsNone(pretended)

    def assertFormErrorLoose(self, response, errors, form_name="form"):  # noqa
        """Assert that errors is found in response.context['form'] errors

        *errors* may be ``None``, a single message or a list/tuple of
        messages.  With no expected errors the form must be error-free;
        otherwise each expected message has to occur in the joined error
        text of the form.
        """
        import itertools
        if errors is None:
            errors = []
        if not isinstance(errors, list | tuple):
            errors = [errors]

        try:
            form_errors = ". ".join(itertools.chain.from_iterable(
                response.context[form_name].errors.values()))
        except TypeError:
            # e.g. the response carries no context at all
            form_errors = None

        if not form_errors:
            if errors:
                self.fail("%s has no error" % form_name)
        elif not errors:
            self.fail("%s unexpectedly has following errors: %s"
                      % (form_name, repr(form_errors)))

        for err in errors:
            self.assertIn(err, form_errors)

Dong Zhuang's avatar
Dong Zhuang committed

# {{{ get_flow_page_ordinal_from_page_id, get_flow_page_id_from_page_ordinal
Dong Zhuang's avatar
Dong Zhuang committed
def get_flow_page_ordinal_from_page_id(flow_session_id, page_id,
                                       with_group_id=False):
    """Return the page ordinal of the page *page_id* in a flow session.

    Looks up the single ``FlowPageData`` row of session *flow_session_id*
    with the given page id.  With *with_group_id*, a
    ``(page_ordinal, group_id)`` tuple is returned instead.
    """
    flow_page_data = FlowPageData.objects.get(
        flow_session__id=flow_session_id,
        page_id=page_id
    )
    if with_group_id:
        return flow_page_data.page_ordinal, flow_page_data.group_id
    return flow_page_data.page_ordinal


def get_flow_page_id_from_page_ordinal(flow_session_id, page_ordinal,
                                       with_group_id=False):
    """Return the page id of the page at *page_ordinal* in a flow session.

    Looks up the single ``FlowPageData`` row of session *flow_session_id*
    with the given ordinal.  With *with_group_id*, a ``(page_id, group_id)``
    tuple is returned instead.
    """
    flow_page_data = FlowPageData.objects.get(
        flow_session__id=flow_session_id,
        page_ordinal=page_ordinal
    )
    if with_group_id:
        return flow_page_data.page_id, flow_page_data.group_id
    # Mirrors get_flow_page_ordinal_from_page_id: without the group id only
    # the page id is returned.
    return flow_page_data.page_id


# {{{ CoursesTestMixinBase

class _ClientUserSwitcher:
    def __init__(self, client, logged_in_user, switch_to):
        self.client = client
        self.logged_in_user = logged_in_user
        self.switch_to = switch_to

    def __enter__(self):
        if self.logged_in_user == self.switch_to:
            return
        if self.switch_to is None:
            self.client.logout()
            return
        self.client.force_login(self.switch_to)

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.logged_in_user == self.switch_to:
            return
        if self.logged_in_user is None:
            self.client.logout()
            return
        self.client.force_login(self.logged_in_user)

    def __call__(self, func):
        from functools import wraps

        @wraps(func)
        def wrapper(*args, **kw):
            with self:
                return func(*args, **kw)
        return wrapper


Dong Zhuang's avatar
Dong Zhuang committed
class CoursesTestMixinBase(SuperuserCreateMixin):
    """Test mixin that, in ``setUpTestData``, creates the courses,
    participations and extra users described by the class attributes below,
    and that offers helpers for building course URLs and issuing requests.
    """

    # A list of dicts, each of which may contain a "course" dict and a list of
    # "participations". See SINGLE_COURSE_SETUP_LIST for the setup for one
    # course. Entries without a "course" key are skipped by setUpTestData.
    courses_setup_list = []
    # Keyword-argument dicts for users that are created without any
    # participation; setUpTestData exposes them as ``non_participation_users``.
    none_participation_user_create_kwarg_list = []
    # Optional list of dicts with extra course attributes. When given, it must
    # have the same length as ``courses_setup_list``; entry ``i`` is merged
    # into the creation kwargs of course ``i``.
    courses_attributes_extra_list = None
    # Settings overrides (passed to ``override_settings``) that are active
    # while ``post_create_course`` posts the course creation form.
    override_settings_at_post_create_course = {}
Dong Zhuang's avatar
Dong Zhuang committed

    @classmethod
    def setUpTestData(cls):
        """Create the courses, participations and extra users configured on
        the class.

        For every entry of ``courses_setup_list`` that has a ``"course"`` key
        the course is created (merged with the matching entry of
        ``courses_attributes_extra_list``, if any), followed by the
        participations listed under ``"participations"``.

        Sets ``default_flow_params``, ``n_courses`` and ``course_qset`` on the
        class. ``non_participation_users`` is set while processing a course,
        i.e. only if at least one course is configured.

        :raises ValueError: if ``courses_attributes_extra_list`` is given but
            its length differs from that of ``courses_setup_list``.
        """
        super().setUpTestData()

        client = Client()
        client.force_login(cls.superuser)
        cls.default_flow_params = None

        cls.n_courses = 0
        if cls.courses_attributes_extra_list is not None:
            if (len(cls.courses_attributes_extra_list)
                    != len(cls.courses_setup_list)):
                raise ValueError(
                    "'courses_attributes_extra_list' must has equal length "
                    "with courses")

        for i, course_setup in enumerate(cls.courses_setup_list):
            if "course" not in course_setup:
                continue

            cls.n_courses += 1
            course_identifier = course_setup["course"]["identifier"]
            # Work on a copy: the setup dicts are class-level (and typically
            # module-level) data shared between test classes, so merging the
            # extra attributes in place would leak them into other tests.
            course_setup_kwargs = dict(course_setup["course"])
            if cls.courses_attributes_extra_list:
                extra_attrs = cls.courses_attributes_extra_list[i]
                assert isinstance(extra_attrs, dict)
                course_setup_kwargs.update(extra_attrs)
            cls.create_course(client, course_setup_kwargs)

            course = Course.objects.get(identifier=course_identifier)
            for participation in course_setup.get("participations", ()):
                create_user_kwargs = participation.get("user")
                if not create_user_kwargs:
                    continue
                role_identifier = participation.get("role_identifier")
                if not role_identifier:
                    continue
                cls.create_participation(
                    course=course,
                    user_or_create_user_kwargs=create_user_kwargs,
                    role_identifier=role_identifier,
                    status=participation.get("status",
                                             participation_status.active)
                )

            # Remove superuser from participation for further test
            # such as impersonate in auth module
            for sp in Participation.objects.filter(user=cls.superuser):
                sp.delete()

            cls.non_participation_users = get_user_model().objects.none()
            if cls.none_participation_user_create_kwarg_list:
                pks = [
                    cls.create_user(create_user_kwargs).pk
                    for create_user_kwargs
                    in cls.none_participation_user_create_kwarg_list]
                cls.non_participation_users = (
                    get_user_model().objects.filter(pk__in=pks))

        cls.course_qset = Course.objects.all()

    @classmethod
    def create_user(cls, create_user_kwargs):
        """Return the user described by *create_user_kwargs*, creating it if
        necessary.

        An existing user is matched by case-insensitive email. Otherwise a new
        user is created with *create_user_kwargs* as field values.

        :arg create_user_kwargs: a dict of user field values; must contain
            ``"email"`` and, if the user has to be created, ``"password"``.
        :returns: the existing or newly created user.
        :raises KeyError: if a required key is missing.
        """
        user, created = get_user_model().objects.get_or_create(
            email__iexact=create_user_kwargs["email"], defaults=create_user_kwargs)

        if created:
            # The password was assigned as a plain field value through
            # ``defaults``; pass it through set_password() before saving.
            user.set_password(create_user_kwargs["password"])
            user.save()
        return user

    @classmethod
    def create_participation(
            cls, course, user_or_create_user_kwargs,
            role_identifier=None, status=None):
        """Return the participation of a user in *course*, creating it if
        necessary.

        :arg course: the course to participate in.
        :arg user_or_create_user_kwargs: either a user instance or a dict
            that is handed to :meth:`create_user`.
        :arg role_identifier: identifier of the role to assign to a newly
            created participation; defaults to ``"student"``.
        :arg status: participation status; defaults to
            ``participation_status.active``.
        :returns: the participation. Roles are only set when the
            participation was created by this call.
        """
        if isinstance(user_or_create_user_kwargs, get_user_model()):
            user = user_or_create_user_kwargs
        else:
            assert isinstance(user_or_create_user_kwargs, dict)
            user = cls.create_user(user_or_create_user_kwargs)
        if status is None:
            status = participation_status.active

        participation, p_created = Participation.objects.get_or_create(
            user=user,
            course=course,
            status=status
        )
        if role_identifier is None:
            role_identifier = "student"

        if p_created:
            # A queryset (possibly empty) of the matching roles of this course.
            roles = ParticipationRole.objects.filter(
                course=course, identifier=role_identifier)
            participation.roles.set(roles)
        return participation

    @classmethod_with_client
    def post_create_course(cls, client, create_course_kwargs, *,  # noqa: N805
            raise_error=True, login_superuser=True):
        """Create a course by posting to the "set up new course" view.

        :arg client: the test client used for the request.
        :arg create_course_kwargs: the form data describing the course.
        :arg raise_error: if true, verify that exactly one course was added
            and raise :class:`CourseCreateFailure` (carrying the form errors)
            otherwise. On success, also apply ``trusted_for_markup`` and
            record the repository path and commit sha in the cache so that
            :meth:`create_course` can reuse them.
        :arg login_superuser: if true, log the superuser in on *client*
            before posting.
        :returns: the response of the POST request.
        :raises CourseCreateFailure: see *raise_error*.
        """
        # To speed up, use create_course instead, this is better used for tests
        if login_superuser:
            client.force_login(cls.superuser)
        existing_course_count = Course.objects.count()
        with override_settings(**cls.override_settings_at_post_create_course):
            resp = client.post(cls.get_set_up_new_course_url(),
                               data=create_course_kwargs)
        if raise_error:
            all_courses = Course.objects.all()
            if all_courses.count() != existing_course_count + 1:
                error_string = None
                # most probably the reason is a course creation form error
                form_context = resp.context["form"]
                assert form_context
                error_list = []
                if form_context.errors:
                    error_list = [
                        "%s: %s"
                        % (field,
                           "\n".join(["{}:{}".format(type(e).__name__, str(e))
                                      for e in errs]))
                        for field, errs
                        in form_context.errors.as_data().items()]
                non_field_errors = form_context.non_field_errors()
                if non_field_errors:
                    error_list.append(repr(non_field_errors))
                if error_list:
                    error_string = "\n".join(error_list)
                if not error_string:
                    error_string = ("course creation failed for unknown errors")
                raise CourseCreateFailure(error_string)

            # the course is created successfully
            last_course = all_courses.last()
            assert last_course
            if "trusted_for_markup" in create_course_kwargs:
                # This attribute is not settable via POST by the user, so set it
                # manually here.
                last_course.trusted_for_markup = \
                        create_course_kwargs["trusted_for_markup"]
                last_course.save()

            # Remember where the repository lives and which commit is active,
            # keyed by the git source URL, for reuse by create_course().
            url_cache_key, commit_sha_cache_key = (
                git_source_url_to_cache_keys(last_course.git_source))
            mc.set_multi({url_cache_key: get_course_repo_path(last_course),
                          commit_sha_cache_key: last_course.active_git_commit_sha},
                         time=120000
                         )
        return resp

    @classmethod_with_client
    def create_course(cls, client, create_course_kwargs, *,  # noqa: N805
            raise_error=True):
        """Create a course, reusing a cached repository checkout if possible.

        If the cache holds a repository path (that exists on disk) and a
        commit sha for ``create_course_kwargs["git_source"]``, the repository
        is copied into ``settings.GIT_ROOT`` and the ``Course`` row is created
        directly. Otherwise this falls back to :meth:`post_create_course`.

        :arg client: the test client, only used by the fallback.
        :arg create_course_kwargs: course attributes; must contain
            ``"git_source"`` and ``"identifier"``.
        :arg raise_error: forwarded to :meth:`post_create_course`.
        :returns: the fallback's return value, or ``None`` when the course
            was created from the cached repository.
        """
        repo_cache_key, commit_sha_cache_key = (
            git_source_url_to_cache_keys(create_course_kwargs["git_source"]))

        # Start from "nothing cached" so that the names below are always
        # bound, even if the cache lookup raises.
        has_cached_repo = False
        exist_course_repo_path = None
        exist_commit_sha = None
        try:
            exist_course_repo_path = mc.get(repo_cache_key)
            exist_commit_sha = mc.get(commit_sha_cache_key)
            has_cached_repo = bool(
                exist_course_repo_path
                and exist_commit_sha
                and os.path.isdir(exist_course_repo_path))
        except Exception:
            # Best effort only: any problem with the cache just means the
            # course is created the slow way.
            has_cached_repo = False

        if not has_cached_repo:
            # fall back to post create
            return cls.post_create_course(
                    client, create_course_kwargs, raise_error=raise_error)

        existing_course_count = Course.objects.count()
        new_course_repo_path = os.path.join(settings.GIT_ROOT,
                                        create_course_kwargs["identifier"])
        shutil.copytree(exist_course_repo_path, new_course_repo_path)
        # Copy so that the caller's dict is not modified.
        create_kwargs = deepcopy(create_course_kwargs)
        create_kwargs["active_git_commit_sha"] = exist_commit_sha
        Course.objects.create(**create_kwargs)
        assert Course.objects.count() == existing_course_count + 1

    @classmethod
    def get_course_view_url(cls, view_name, course_identifier=None):
        """Reverse *view_name* with the course identifier as its only
        positional argument, using the default course if none is given."""
        if not course_identifier:
            course_identifier = cls.get_default_course_identifier()
        return reverse(view_name, args=[course_identifier])

    @classmethod
    def get_course_calender_url(cls, course_identifier=None):
        """Return the URL of the ``relate-view_calendar`` view of a course."""
        view_name = "relate-view_calendar"
        return cls.get_course_view_url(view_name, course_identifier)

    @classmethod
    def get_set_up_new_course_url(cls):
        """Return the URL of the ``relate-set_up_new_course`` view."""
        view_name = "relate-set_up_new_course"
        return reverse(view_name)

    @classmethod_with_client
    def get_set_up_new_course(cls, client):  # noqa: N805
        """Issue a GET request to the "set up new course" view and return the
        response."""
        # The URL helper has to be *called*; handing the method object itself
        # to client.get() does not produce a usable URL.
        return client.get(cls.get_set_up_new_course_url())
    @classmethod
    def get_edit_course_url(cls, course_identifier=None):
        """Return the URL of the ``relate-edit_course`` view of a course,
        using the default course if *course_identifier* is not given."""
        course_identifier = (
            course_identifier or cls.get_default_course_identifier())
        return cls.get_course_view_url("relate-edit_course", course_identifier)

    @classmethod_with_client
    def post_edit_course(cls, client, data, *, course=None):  # noqa: N805
        """POST *data* to the edit view of *course* (the default course if
        omitted) and return the response."""
        if not course:
            course = cls.get_default_course()
        return client.post(cls.get_edit_course_url(course.identifier), data)
    @classmethod_with_client
    def get_edit_course(cls, client, *, course=None):  # noqa: N805
        """GET the edit view of *course* (the default course if omitted) and
        return the response."""
        if not course:
            course = cls.get_default_course()
        edit_course_url = cls.get_edit_course_url(course.identifier)
        return client.get(edit_course_url)
    @classmethod
    def get_course_page_url(cls, course_identifier=None):
        """Return the URL of the ``relate-course_page`` view of a course,
        using the default course if *course_identifier* is not given."""
        return cls.get_course_view_url("relate-course_page", course_identifier)
Dong Zhuang's avatar
Dong Zhuang committed

Dong Zhuang's avatar
Dong Zhuang committed
    @classmethod
    def get_finish_flow_session_view_url(cls, course_identifier=None,
                                         flow_session_id=None):
        """Return the URL of the ``relate-finish_flow_session_view`` view.

        A missing *course_identifier* falls back to the default course, and a
        *flow_session_id* of ``None`` to that course's default flow session.
        """
        if not course_identifier:
            course_identifier = cls.get_default_course_identifier()
        if flow_session_id is None:
            flow_session_id = cls.get_default_flow_session_id(course_identifier)

        return reverse(
            "relate-finish_flow_session_view",
            kwargs={
                "course_identifier": course_identifier,
                "flow_session_id": flow_session_id,
            })

Dong Zhuang's avatar
Dong Zhuang committed
    @classmethod
    def _get_grades_url(cls, args=None, kwargs=None):
        """Reverse the ``relate-view_participant_grades`` view with the given
        positional or keyword URL arguments."""
        view_name = "relate-view_participant_grades"
        return reverse(view_name, args=args, kwargs=kwargs)

    @classmethod
    def get_my_grades_url(cls, course_identifier=None):
        """Return the grades URL that takes only a course identifier, using
        the default course if none is given."""
        if not course_identifier:
            course_identifier = cls.get_default_course_identifier()
        return cls._get_grades_url(args=[course_identifier])

    @classmethod_with_client
    def get_my_grades_view(cls, client, *, course_identifier=None):  # noqa: N805
        """GET the URL built by ``get_my_grades_url`` and return the
        response."""
        my_grades_url = cls.get_my_grades_url(course_identifier)
        return client.get(my_grades_url)
Dong Zhuang's avatar
Dong Zhuang committed

    @classmethod
    def get_participant_grades_url(cls, participation_id, course_identifier=None):
        """Return the grades URL for the participation *participation_id* in
        a course, using the default course if none is given."""
        if not course_identifier:
            course_identifier = cls.get_default_course_identifier()
        url_kwargs = {
            "course_identifier": course_identifier,
            "participation_id": participation_id,
        }
        return cls._get_grades_url(kwargs=url_kwargs)

    @classmethod_with_client
    def get_participant_grades_view(
            cls, client, participation_id, *,  # noqa: N805
            course_identifier=None, force_login_instructor=True):
        """GET the grades page of one participation and return the response.

        :arg client: the test client used for the request.
        :arg participation_id: id of the participation whose grades are
            requested.
        :arg course_identifier: the course; defaults to the default course.
        :arg force_login_instructor: if true, the request is made while
            temporarily switched to the course's default instructor;
            otherwise the switch target is the currently logged-in user.
        """
        course_identifier = (
            course_identifier or cls.get_default_course_identifier())
        if force_login_instructor:
            switch_to = cls.get_default_instructor_user(course_identifier)
        else:
            switch_to = cls.get_logged_in_user()

        with cls.temporarily_switch_to_user(switch_to):
            return client.get(
                cls.get_participant_grades_url(participation_id, course_identifier))