Skip to content
base_test_mixins.py 66.8 KiB
Newer Older
Dong Zhuang's avatar
Dong Zhuang committed
from __future__ import division

__copyright__ = "Copyright (C) 2017 Dong Zhuang, Andreas Kloeckner, Zesheng Wang"

__license__ = """
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""

Dong Zhuang's avatar
Dong Zhuang committed
import six
import tempfile
Dong Zhuang's avatar
Dong Zhuang committed
import os
import shutil
import hashlib
Dong Zhuang's avatar
Dong Zhuang committed
import datetime
from copy import deepcopy
Dong Zhuang's avatar
Dong Zhuang committed
from django.test import Client, override_settings
Dong Zhuang's avatar
Dong Zhuang committed
from django.urls import reverse, resolve
from django.conf import settings
Dong Zhuang's avatar
Dong Zhuang committed
from django.contrib.auth import get_user_model
from django.core.exceptions import ImproperlyConfigured

Dong Zhuang's avatar
Dong Zhuang committed
from tests.utils import mock
Dong Zhuang's avatar
Dong Zhuang committed
from course.models import (
    Course, Participation, ParticipationRole, FlowSession, FlowPageData,
    FlowPageVisit)
from course.constants import participation_status, user_status
from course.content import get_course_repo_path

ATOL = 1e-05
Dong Zhuang's avatar
Dong Zhuang committed

CREATE_SUPERUSER_KWARGS = {
    "username": "test_admin",
    "password": "test_admin",
    "email": "test_admin@example.com",
    "first_name": "Test",
    "last_name": "Admin"}

SINGLE_COURSE_SETUP_LIST = [
    {
        "course": {
            "identifier": "test-course",
            "name": "Test Course",
            "number": "CS123",
            "time_period": "Fall 2016",
            "hidden": False,
            "listed": True,
            "accepts_enrollment": True,
            "git_source": "git://github.com/inducer/relate-sample",
            "course_file": "course.yml",
            "events_file": "events.yml",
            "enrollment_approval_required": False,
            "enrollment_required_email_suffix": "",
            "preapproval_require_verified_inst_id": True,
Dong Zhuang's avatar
Dong Zhuang committed
            "from_email": "inform@tiker.net",
            "notify_email": "inform@tiker.net"},
        "participations": [
            {
                "role_identifier": "instructor",
                "user": {
                    "username": "test_instructor",
                    "password": "test_instructor",
                    "email": "test_instructor@example.com",
                    "first_name": "Test",
                    "last_name": "Instructor"},
                "status": participation_status.active
            },
            {
                "role_identifier": "ta",
                "user": {
                    "username": "test_ta",
                    "password": "test",
                    "email": "test_ta@example.com",
                    "first_name": "Test",
                    "last_name": "TA"},
                "status": participation_status.active
            },
            {
                "role_identifier": "student",
                "user": {
                    "username": "test_student",
                    "password": "test",
                    "email": "test_student@example.com",
Dong Zhuang's avatar
Dong Zhuang committed
                    "first_name": "Test",
Dong Zhuang's avatar
Dong Zhuang committed
                    "last_name": "Student"},
                "status": participation_status.active
            }
TWO_COURSE_SETUP_LIST = deepcopy(SINGLE_COURSE_SETUP_LIST)
TWO_COURSE_SETUP_LIST[0]["course"]["identifier"] = "test-course1"
TWO_COURSE_SETUP_LIST += deepcopy(SINGLE_COURSE_SETUP_LIST)
TWO_COURSE_SETUP_LIST[1]["course"]["identifier"] = "test-course2"

NONE_PARTICIPATION_USER_CREATE_KWARG_LIST = [
    {
        "username": "test_user1",
        "password": "test_user1",
        "email": "test_user1@suffix.com",
        "first_name": "Test",
        "last_name": "User1",
        "institutional_id": "test_user1_institutional_id",
        "institutional_id_verified": True,
        "status": user_status.active
    },
    {
        "username": "test_user2",
        "password": "test_user2",
        "email": "test_user2@nosuffix.com",
        "first_name": "Test",
        "last_name": "User2",
        "institutional_id": "test_user2_institutional_id",
        "institutional_id_verified": False,
        "status": user_status.active
    },
    {
        "username": "test_user3",
        "password": "test_user3",
        "email": "test_user3@suffix.com",
        "first_name": "Test",
        "last_name": "User3",
        "institutional_id": "test_user3_institutional_id",
        "institutional_id_verified": True,
        "status": user_status.unconfirmed
    },
    {
        "username": "test_user4",
        "password": "test_user4",
        "email": "test_user4@no_suffix.com",
        "first_name": "Test",
        "last_name": "User4",
        "institutional_id": "test_user4_institutional_id",
        "institutional_id_verified": False,
        "status": user_status.unconfirmed
try:
    mc = memcache.Client(['127.0.0.1:11211'])
except Exception:
    pass


SELECT2_HTML_FIELD_ID_SEARCH_PATTERN = re.compile(r'data-field_id="([^"]+)"')


def git_source_url_to_cache_keys(url):
    url_hash = hashlib.md5(url.encode("utf-8")).hexdigest()
    return (
        "test_course:%s" % url_hash,
        "test_sha:%s" % url_hash
    )

Dong Zhuang's avatar
Dong Zhuang committed

class CourseCreateFailure(Exception):
    pass


Dong Zhuang's avatar
Dong Zhuang committed
class ResponseContextMixin(object):
    """
    Response context refers to "the template Context instance that was used
    to render the template that produced the response content".
    Ref: https://docs.djangoproject.com/en/dev/topics/testing/tools/#django.test.Response.context  # noqa
    """
    def get_response_context_value_by_name(self, response, context_name):
        value = response.context.__getitem__(context_name)
        self.assertIsNotNone(
            value,
            msg="%s does not exist in given response" % context_name)
        return value

    def assertResponseContextIsNone(self, resp, context_name):  # noqa
        try:
            value = self.get_response_context_value_by_name(resp, context_name)
        except AssertionError:
            # the context item doesn't exist
            pass
        else:
            self.assertIsNone(value)

    def assertResponseContextIsNotNone(self, resp, context_name, msg=""):  # noqa
Dong Zhuang's avatar
Dong Zhuang committed
        value = self.get_response_context_value_by_name(resp, context_name)
        self.assertIsNotNone(value, msg)
Dong Zhuang's avatar
Dong Zhuang committed

    def assertResponseContextEqual(self, resp, context_name, expected_value):  # noqa
        value = self.get_response_context_value_by_name(resp, context_name)
        self.assertEqual(value, expected_value)

    def assertResponseContextContains(self, resp,  # noqa
                                      context_name, expected_value, html=False):
        value = self.get_response_context_value_by_name(resp, context_name)
        if not html:
            self.assertIn(expected_value, value)
        else:
            self.assertInHTML(expected_value, value)

    def assertResponseContextRegex(  # noqa
            self, resp,  # noqa
            context_name, expected_value_regex):
        value = self.get_response_context_value_by_name(resp, context_name)
        six.assertRegex(self, value, expected_value_regex)

    def get_response_context_answer_feedback(self, response):
        return self.get_response_context_value_by_name(response, "feedback")

    def get_response_context_answer_feedback_string(self, response,
                                             include_bulk_feedback=True):
        answer_feedback = self.get_response_context_value_by_name(
            response, "feedback")

        self.assertTrue(hasattr(answer_feedback, "feedback"))
        if not include_bulk_feedback:
            return answer_feedback.feedback

        if answer_feedback.bulk_feedback is None:
            return answer_feedback.feedback
        else:
            if answer_feedback.feedback is None:
                return answer_feedback.bulk_feedback
            return answer_feedback.feedback + answer_feedback.bulk_feedback

    def assertResponseContextAnswerFeedbackContainsFeedback(  # noqa
Dong Zhuang's avatar
Dong Zhuang committed
            self, response, expected_feedback,
            include_bulk_feedback=True, html=False):
        feedback_str = self.get_response_context_answer_feedback_string(
            response, include_bulk_feedback)
        if not html:
            self.assertIn(expected_feedback, feedback_str)
        else:
            self.assertInHTML(expected_feedback, feedback_str)
    def assertResponseContextAnswerFeedbackNotContainsFeedback(  # noqa
            self, response, expected_feedback,
            include_bulk_feedback=True,
            html=False):
        feedback_str = self.get_response_context_answer_feedback_string(
            response, include_bulk_feedback)

        if not html:
            self.assertNotIn(expected_feedback, feedback_str)
            self.assertInHTML(expected_feedback, feedback_str, count=0)
    def assertResponseContextAnswerFeedbackCorrectnessEquals(  # noqa
                                        self, response, expected_correctness):
        answer_feedback = self.get_response_context_answer_feedback(response)
        if expected_correctness is None:
            try:
                self.assertTrue(hasattr(answer_feedback, "correctness"))
            except AssertionError:
                pass
            else:
                self.assertIsNone(answer_feedback.correctness)
        else:
Dong Zhuang's avatar
Dong Zhuang committed
            if answer_feedback.correctness is None:
                return self.fail("The returned correctness is None, not %s"
                          % expected_correctness)
            self.assertTrue(
                abs(float(answer_feedback.correctness)
                    - float(str(expected_correctness))) < ATOL,
                "%s does not equal %s"
                % (str(answer_feedback.correctness)[:5],
                   str(expected_correctness)[:5]))

    def get_response_body(self, response):
        return self.get_response_context_value_by_name(response, "body")

    def get_page_response_correct_answer(self, response):
        return self.get_response_context_value_by_name(response, "correct_answer")

    def get_page_response_feedback(self, response):
        return self.get_response_context_value_by_name(response, "feedback")

Dong Zhuang's avatar
Dong Zhuang committed
    def debug_print_response_context_value(self, resp, context_name):
        try:
            value = self.get_response_context_value_by_name(resp, context_name)
            print("\n-----------context %s-------------"
                  % context_name)
            if isinstance(value, (list, tuple)):
                from course.validation import ValidationWarning
                for v in value:
                    if isinstance(v, ValidationWarning):
                        print(v.text)
                    else:
                        print(repr(v))
            else:
                print(value)
            print("-----------context end-------------\n")
        except AssertionError:
            print("\n-------no value for context %s----------" % context_name)

    def get_select2_field_id_from_response(self, response,
                                           form_context_name="form"):
        self.assertResponseContextIsNotNone(
            response, form_context_name,
            "The response doesn't contain a context named '%s'"
            % form_context_name)
        form_str = str(response.context[form_context_name])
        m = SELECT2_HTML_FIELD_ID_SEARCH_PATTERN.search(form_str)
        assert m, "pattern not found in %s" % form_str
        return m.group(1)

    def select2_get_request(self, field_id, term=None,
                            select2_urlname='django_select2-json'):

        select2_url = reverse(select2_urlname)
        params = {"field_id": field_id}
        if term is not None:
            assert isinstance(term, six.string_types)
            term = term.strip()
            if term:
                params["term"] = term

        return self.c.get(select2_url, params,
                          HTTP_X_REQUESTED_WITH='XMLHttpRequest')

    def get_select2_response_data(self, response, key="results"):
        import json
        return json.loads(response.content.decode('utf-8'))[key]

Dong Zhuang's avatar
Dong Zhuang committed

class SuperuserCreateMixin(ResponseContextMixin):
Dong Zhuang's avatar
Dong Zhuang committed
    create_superuser_kwargs = CREATE_SUPERUSER_KWARGS

    @classmethod
    def setUpTestData(cls):  # noqa
        # Create superuser, without this, we cannot
        # create user, course and participation.
        cls.superuser = cls.create_superuser()
        cls.c = Client()
        cls.settings_git_root_override = (
            override_settings(GIT_ROOT=tempfile.mkdtemp()))
        cls.settings_git_root_override.enable()
Dong Zhuang's avatar
Dong Zhuang committed
        super(SuperuserCreateMixin, cls).setUpTestData()

    @classmethod
    def create_superuser(cls):
        return get_user_model().objects.create_superuser(
                                                **cls.create_superuser_kwargs)

    @classmethod
    def get_sign_up_view_url(cls):
    @classmethod
    def get_sign_up(cls, follow=True):
        return cls.c.get(cls.get_sign_up_view_url(), follow=follow)
    @classmethod
    def post_sign_up(cls, data, follow=True):
        return cls.c.post(cls.get_sign_up_view_url(), data, follow=follow)
    @classmethod
    def get_profile_view_url(cls):
        return reverse("relate-user_profile")

    @classmethod
    def get_profile(cls, follow=True):
        return cls.c.get(cls.get_profile_view_url(), follow=follow)
    @classmethod
    def post_profile(cls, data, follow=True):
Dong Zhuang's avatar
Dong Zhuang committed
        data.update({"submit_user": [""]})
        return cls.c.post(cls.get_profile_view_url(), data, follow=follow)
    @classmethod
    def post_signout(cls, data, follow=True):
        return cls.c.post(cls.get_sign_up_view_url(), data, follow=follow)
    @classmethod
    def get_impersonate_view_url(cls):
        return reverse("relate-impersonate")

    @classmethod
    def get_stop_impersonate_view_url(cls):
        return reverse("relate-stop_impersonating")

    @classmethod
    def get_impersonate(cls):
        return cls.c.get(cls.get_impersonate_view_url())
    @classmethod
    def post_impersonate(cls, impersonatee, follow=True):
        data = {"add_impersonation_header": ["on"],
                "submit": [''],
                }
        data["user"] = [str(impersonatee.pk)]
        return cls.c.post(cls.get_impersonate_view_url(), data, follow=follow)
    @classmethod
    def get_stop_impersonate(cls, follow=True):
        return cls.c.get(cls.get_stop_impersonate_view_url(), follow=follow)
    @classmethod
    def post_stop_impersonate(cls, follow=True):
        return cls.c.post(
            cls.get_stop_impersonate_view_url(), data, follow=follow)
    @classmethod
    def get_confirm_stop_impersonate_view_url(cls):
Dong Zhuang's avatar
Dong Zhuang committed
        return reverse("relate-confirm_stop_impersonating")

    @classmethod
    def get_confirm_stop_impersonate(cls, follow=True):
        return cls.c.get(
            cls.get_confirm_stop_impersonate_view_url(), follow=follow)
    @classmethod
    def post_confirm_stop_impersonate(cls, follow=True):
        return cls.c.post(
            cls.get_confirm_stop_impersonate_view_url(), {}, follow=follow)
Dong Zhuang's avatar
Dong Zhuang committed
    @classmethod
    def get_reset_password_url(cls, use_instid=False):
        kwargs = {}
        if use_instid:
            kwargs["field"] = "instid"
        return reverse("relate-reset_password", kwargs=kwargs)

Dong Zhuang's avatar
Dong Zhuang committed
    @classmethod
    def get_reset_password(cls, use_instid=False):
        return cls.c.get(cls.get_reset_password_url(use_instid))
Dong Zhuang's avatar
Dong Zhuang committed
    @classmethod
    def post_reset_password(cls, data, use_instid=False):
        return cls.c.post(cls.get_reset_password_url(use_instid),
                          data=data)

    def get_reset_password_stage2_url(self, user_id, sign_in_key, **kwargs):
        url = reverse("relate-reset_password_stage2", args=(user_id, sign_in_key))
        querystring = kwargs.pop("querystring", None)
        if querystring is not None:
            assert isinstance(querystring, dict)
            url += ("?%s"
                    % "&".join(
                        ["%s=%s" % (k, v)
                         for (k, v) in six.iteritems(querystring)]))
        return url

    def get_reset_password_stage2(self, user_id, sign_in_key, **kwargs):
        return self.c.get(self.get_reset_password_stage2_url(
            user_id=user_id, sign_in_key=sign_in_key, **kwargs))

    def post_reset_password_stage2(self, user_id, sign_in_key, data, **kwargs):
        return self.c.post(self.get_reset_password_stage2_url(
            user_id=user_id, sign_in_key=sign_in_key, **kwargs), data=data)
    @classmethod
    def get_fake_time_url(cls):
Dong Zhuang's avatar
Dong Zhuang committed
        return reverse("relate-set_fake_time")

    @classmethod
    def get_set_fake_time(cls):
        return cls.c.get(cls.get_fake_time_url())
Dong Zhuang's avatar
Dong Zhuang committed

    @classmethod
    def post_set_fake_time(cls, data, follow=True):
        return cls.c.post(cls.get_fake_time_url(), data, follow=follow)
Dong Zhuang's avatar
Dong Zhuang committed

    def assertSessionFakeTimeEqual(self, session, expected_date_time):  # noqa
        fake_time_timestamp = session.get("relate_fake_time", None)
        if fake_time_timestamp is None:
            faked_time = None
            if expected_date_time is not None:
                raise AssertionError(
                    "the session doesn't have 'relate_fake_time' attribute")
        else:
            faked_time = datetime.datetime.fromtimestamp(fake_time_timestamp)
        self.assertEqual(faked_time, expected_date_time)

    def assertSessionFakeTimeIsNone(self, session):  # noqa
        self.assertSessionFakeTimeEqual(session, None)

    @classmethod
    def get_set_pretend_facilities_url(cls):
Dong Zhuang's avatar
Dong Zhuang committed
        return reverse("relate-set_pretend_facilities")

    @classmethod
    def get_set_pretend_facilities(cls):
        return cls.c.get(cls.get_set_pretend_facilities_url())
Dong Zhuang's avatar
Dong Zhuang committed

    @classmethod
    def post_set_pretend_facilities(cls, data, follow=True):
        return cls.c.post(cls.get_set_pretend_facilities_url(), data,
                          follow=follow)
Dong Zhuang's avatar
Dong Zhuang committed

    @classmethod
    def force_remove_all_course_dir(cls):
        # This is only necessary for courses which are created test wise,
        # not class wise.
        from relate.utils import force_remove_path
        from course.content import get_course_repo_path
        for c in Course.objects.all():
            force_remove_path(get_course_repo_path(c))

Dong Zhuang's avatar
Dong Zhuang committed
    def assertSessionPretendFacilitiesContains(self, session, expected_facilities):  # noqa
        pretended = session.get("relate_pretend_facilities", None)
        if expected_facilities is None:
            return self.assertIsNone(pretended)
        if pretended is None:
            raise AssertionError(
                "the session doesn't have "
                "'relate_pretend_facilities' attribute")

        if isinstance(expected_facilities, (list, tuple)):
            self.assertTrue(set(expected_facilities).issubset(set(pretended)))
        else:
            self.assertTrue(expected_facilities in pretended)

    def assertSessionPretendFacilitiesIsNone(self, session):  # noqa
        pretended = session.get("relate_pretend_facilities", None)
        self.assertIsNone(pretended)

    def assertFormErrorLoose(self, response, errors, form_name="form"):  # noqa
        """Assert that errors is found in response.context['form'] errors"""
        import itertools
        if errors is None:
            errors = []
        if not isinstance(errors, (list, tuple)):
            errors = [errors]
Dong Zhuang's avatar
Dong Zhuang committed
        try:
            form_errors = list(
                itertools.chain(*response.context[form_name].errors.values()))
        except TypeError:
            form_errors = None

        if form_errors is None or not form_errors:
            if errors:
Dong Zhuang's avatar
Dong Zhuang committed
                self.fail("%s has no error" % form_name)
Dong Zhuang's avatar
Dong Zhuang committed

        if form_errors:
            if not errors:
                self.fail("%s unexpectedly has following errors: %s"
                          % (form_name, repr(form_errors)))

        for err in errors:
            self.assertIn(err, form_errors)
Dong Zhuang's avatar
Dong Zhuang committed

# {{{ defined here so that they can be used by in classmethod and instance method

def get_flow_page_ordinal_from_page_id(flow_session_id, page_id):
    flow_page_data = FlowPageData.objects.get(
        flow_session__id=flow_session_id,
        page_id=page_id
    )
    return flow_page_data.page_ordinal
def get_flow_page_id_from_page_ordinal(flow_session_id, page_ordinal,
                                       with_group_id=False):
    flow_page_data = FlowPageData.objects.get(
        flow_session__id=flow_session_id,
        page_ordinal=page_ordinal
    if with_group_id:
        return flow_page_data.page_id, flow_page_data.group_id
Dong Zhuang's avatar
Dong Zhuang committed
class CoursesTestMixinBase(SuperuserCreateMixin):

    # A list of Dicts, each of which contain a course dict and a list of
    # participations. See SINGLE_COURSE_SETUP_LIST for the setup for one course.
    courses_setup_list = []
    none_participation_user_create_kwarg_list = []
    courses_attributes_extra_list = None
    override_settings_at_post_create_course = {}
Dong Zhuang's avatar
Dong Zhuang committed

    @classmethod
    def setUpTestData(cls):  # noqa
        super(CoursesTestMixinBase, cls).setUpTestData()
        cls.default_flow_params = None
Dong Zhuang's avatar
Dong Zhuang committed
        cls.n_courses = 0
        if cls.courses_attributes_extra_list is not None:
            if (len(cls.courses_attributes_extra_list)
                    != len(cls.courses_setup_list)):
                raise ValueError(
                    "'courses_attributes_extra_list' must has equal length "
                    "with courses")

        for i, course_setup in enumerate(cls.courses_setup_list):
Dong Zhuang's avatar
Dong Zhuang committed
            if "course" not in course_setup:
                continue

            cls.n_courses += 1
            course_identifier = course_setup["course"]["identifier"]
            course_setup_kwargs = course_setup["course"]
            if cls.courses_attributes_extra_list:
                extra_attrs = cls.courses_attributes_extra_list[i]
                assert isinstance(extra_attrs, dict)
                course_setup_kwargs.update(extra_attrs)
                cls.create_course(course_setup_kwargs)
            except Exception:
                raise

Dong Zhuang's avatar
Dong Zhuang committed
            course = Course.objects.get(identifier=course_identifier)
            if "participations" in course_setup:
                for participation in course_setup["participations"]:
                    create_user_kwargs = participation.get("user")
                    if not create_user_kwargs:
                        continue
                    role_identifier = participation.get("role_identifier")
                    if not role_identifier:
                        continue
                    cls.create_participation(
                        course=course,
                        user_or_create_user_kwargs=create_user_kwargs,
Dong Zhuang's avatar
Dong Zhuang committed
                        role_identifier=role_identifier,
                        status=participation.get("status",
                                                 participation_status.active)
                    )

            # Remove superuser from participation for further test
            # such as impersonate in auth module
            try:
                superuser_participations = (
                    Participation.objects.filter(user=cls.superuser))
                for sp in superuser_participations:
                    Participation.delete(sp)
            except Participation.DoesNotExist:
                pass

            cls.non_participation_users = get_user_model().objects.none()
            if cls.none_participation_user_create_kwarg_list:
                pks = []
                for create_user_kwargs in (
                        cls.none_participation_user_create_kwarg_list):
                    user = cls.create_user(create_user_kwargs)
                    pks.append(user.pk)
                cls.non_participation_users = (
                    get_user_model().objects.filter(pk__in=pks))

Dong Zhuang's avatar
Dong Zhuang committed
        cls.course_qset = Course.objects.all()

    @classmethod
    def create_user(cls, create_user_kwargs):
        user, created = get_user_model().objects.get_or_create(
            email__iexact=create_user_kwargs["email"], defaults=create_user_kwargs)
Dong Zhuang's avatar
Dong Zhuang committed
        if created:
            try:
                # TODO: why pop failed here?
                password = create_user_kwargs["password"]
Andreas Klöckner's avatar
Andreas Klöckner committed
            except Exception:
Dong Zhuang's avatar
Dong Zhuang committed
            user.set_password(password)
            user.save()
        return user

    @classmethod
    def create_participation(
            cls, course, user_or_create_user_kwargs,
            role_identifier=None, status=None):
        if isinstance(user_or_create_user_kwargs, get_user_model()):
            user = user_or_create_user_kwargs
        else:
            assert isinstance(user_or_create_user_kwargs, dict)
            user = cls.create_user(user_or_create_user_kwargs)
        if status is None:
            status = participation_status.active
Dong Zhuang's avatar
Dong Zhuang committed
        participation, p_created = Participation.objects.get_or_create(
            user=user,
            course=course,
            status=status
        )
        if role_identifier is None:
            role_identifier = "student"
Dong Zhuang's avatar
Dong Zhuang committed
        if p_created:
            role = ParticipationRole.objects.filter(
                course=course, identifier=role_identifier)
            participation.roles.set(role)
        return participation

    @classmethod
    def post_create_course(cls, create_course_kwargs, raise_error=True,
                           login_superuser=True):
        # To speed up, use create_course instead, this is better used for tests
        if login_superuser:
            cls.c.force_login(cls.superuser)
        existing_course_count = Course.objects.count()
        with override_settings(**cls.override_settings_at_post_create_course):
            resp = cls.c.post(cls.get_set_up_new_course_url(),
                              data=create_course_kwargs)
        if raise_error:
            all_courses = Course.objects.all()
            if not all_courses.count() == existing_course_count + 1:
                error_string = None
                # most probably the reason course creation form error
                form_context = resp.context.__getitem__("form")
                assert form_context
                error_list = []
                if form_context.errors:
                    error_list = [
                        "%s: %s"
                        % (field,
                           "\n".join(["%s:%s" % (type(e).__name__, str(e))
                                      for e in errs]))
                        for field, errs
                        in six.iteritems(form_context.errors.as_data())]
                non_field_errors = form_context.non_field_errors()
                if non_field_errors:
                    error_list.append(repr(non_field_errors))
                if error_list:
                    error_string = "\n".join(error_list)
                if not error_string:
                    error_string = ("course creation failed for unknown errors")
                raise CourseCreateFailure(error_string)
            # the course is created successfully
            last_course = all_courses.last()
            assert last_course
            url_cache_key, commit_sha_cach_key = (
                git_source_url_to_cache_keys(last_course.git_source))
            mc.set_multi({url_cache_key: get_course_repo_path(last_course),
                          commit_sha_cach_key: last_course.active_git_commit_sha},
Dong Zhuang's avatar
Dong Zhuang committed
                         time=120000
Dong Zhuang's avatar
Dong Zhuang committed

    def create_course(cls, create_course_kwargs, raise_error=True):
        has_cached_repo = False
        repo_cache_key, commit_sha_cach_key = (
            git_source_url_to_cache_keys(create_course_kwargs["git_source"]))
        try:
            exist_course_repo_path = mc.get(repo_cache_key)
            exist_commit_sha = mc.get(commit_sha_cach_key)
            if os.path.isdir(exist_course_repo_path):
                has_cached_repo = bool(exist_course_repo_path and exist_commit_sha)
            else:
                has_cached_repo = False
        except Exception:
            pass

        if not has_cached_repo:
            # fall back to post create
            return cls.post_create_course(
                create_course_kwargs, raise_error=raise_error)
        existing_course_count = Course.objects.count()
        new_course_repo_path = os.path.join(settings.GIT_ROOT,
                                        create_course_kwargs["identifier"])
        shutil.copytree(exist_course_repo_path, new_course_repo_path)
        create_kwargs = deepcopy(create_course_kwargs)
        create_kwargs["active_git_commit_sha"] = exist_commit_sha
        Course.objects.create(**create_kwargs)
        assert Course.objects.count() == existing_course_count + 1

    @classmethod
    def get_course_view_url(cls, view_name, course_identifier=None):
        course_identifier = (
            course_identifier or cls.get_default_course_identifier())
        return reverse(view_name, args=[course_identifier])

    @classmethod
    def get_set_up_new_course_url(cls):
        return reverse("relate-set_up_new_course")

    @classmethod
    def get_set_up_new_course(cls):
        return cls.c.get(cls.get_update_course_url)

    @classmethod
    def get_edit_course_url(cls, course_identifier=None):
        return cls.get_course_view_url("relate-edit_course", course_identifier)

    @classmethod
    def post_edit_course(cls, data, course=None):
        course = course or cls.get_default_course()
        edit_course_url = cls.get_edit_course_url(course.identifier)
        return cls.c.post(edit_course_url, data)

    @classmethod
    def get_edit_course(cls, course=None):
        course = course or cls.get_default_course()
        return cls.c.get(cls.get_edit_course_url(course.identifier))
    def get_course_page_url(cls, course_identifier=None):
        return cls.get_course_view_url("relate-course_page", course_identifier)
Dong Zhuang's avatar
Dong Zhuang committed

    @classmethod
    def get_logged_in_user(cls):
Dong Zhuang's avatar
Dong Zhuang committed
        try:
            logged_in_user_id = cls.c.session['_auth_user_id']
Dong Zhuang's avatar
Dong Zhuang committed
            from django.contrib.auth import get_user_model
            logged_in_user = get_user_model().objects.get(
                pk=int(logged_in_user_id))
        except KeyError:
            logged_in_user = None
        return logged_in_user

    @classmethod
    def temporarily_switch_to_user(cls, switch_to):
Dong Zhuang's avatar
Dong Zhuang committed

        from functools import wraps

        class ClientUserSwitcher(object):
            def __init__(self, switch_to):
                self.client = cls.c
Dong Zhuang's avatar
Dong Zhuang committed
                self.switch_to = switch_to
                self.logged_in_user = cls.get_logged_in_user()
Dong Zhuang's avatar
Dong Zhuang committed

            def __enter__(self):
                if self.logged_in_user == self.switch_to:
                    return
                if self.switch_to is None:
                    self.client.logout()
                    return
                self.client.force_login(self.switch_to)

            def __exit__(self, exc_type, exc_val, exc_tb):
                if self.logged_in_user == self.switch_to:
                    return
                if self.logged_in_user is None:
                    self.client.logout()
                    return
                self.client.force_login(self.logged_in_user)

            def __call__(self, func):
                @wraps(func)
                def wrapper(*args, **kw):
                    with self:
                        return func(*args, **kw)
                return wrapper

        return ClientUserSwitcher(switch_to)

    @classmethod
    def get_default_course(cls):
        if Course.objects.count() > 1:
            raise AttributeError(
                "'course' arg can not be omitted for "
                "testcases with more than one courses")
        raise NotImplementedError

Dong Zhuang's avatar
Dong Zhuang committed
    @classmethod
    def get_default_course_identifier(cls):
        if Course.objects.count() > 1:
            raise AttributeError(
                "'course_identifier' arg can not be omitted for "
                "testcases with more than one courses")
        raise NotImplementedError
Dong Zhuang's avatar
Dong Zhuang committed

    @classmethod
    def get_latest_session_id(cls, course_identifier):
        flow_session_qset = FlowSession.objects.filter(
            course__identifier=course_identifier).order_by('-pk')[:1]
        if flow_session_qset:
            return flow_session_qset[0].id
        else:
            return None
Dong Zhuang's avatar
Dong Zhuang committed

    @classmethod
    def get_default_flow_session_id(cls, course_identifier):

    @classmethod
    def update_default_flow_session_id(cls, course_identifier):
        raise NotImplementedError
    @classmethod
    def get_default_instructor_user(cls, course_identifier):
        return Participation.objects.filter(
            course__identifier=course_identifier,
            roles__identifier="instructor",
            status=participation_status.active
        ).first().user
    @classmethod
    def update_course_attribute(cls, attrs, course=None):
        # course instead of course_identifier because we need to do
        # refresh_from_db
        assert isinstance(attrs, dict)
        course = course or cls.get_default_course()
        if attrs:
            course.__dict__.update(attrs)
            course.save()
            course.refresh_from_db()

Dong Zhuang's avatar
Dong Zhuang committed
    @classmethod
    def start_flow(cls, flow_id, course_identifier=None):
        """
        Notice: this is a classmethod, so this will change the data
        created in setUpTestData, so don't do this in individual tests, or
        testdata will be different between tests.
        """
        course_identifier = course_identifier or cls.get_default_course_identifier()
        existing_session_count = FlowSession.objects.all().count()
        params = {"course_identifier": course_identifier,
Dong Zhuang's avatar
Dong Zhuang committed
                  "flow_id": flow_id}
        resp = cls.c.post(reverse("relate-view_start_flow", kwargs=params))
Dong Zhuang's avatar
Dong Zhuang committed
        assert resp.status_code == 302
        new_session_count = FlowSession.objects.all().count()
        assert new_session_count == existing_session_count + 1
        _, _, params = resolve(resp.url)
        del params["page_ordinal"]
        cls.default_flow_params = params
        cls.update_default_flow_session_id(course_identifier)
Dong Zhuang's avatar
Dong Zhuang committed
        return resp

    def end_flow(cls, course_identifier=None, flow_session_id=None):
        """
        Be cautious that this is a classmethod
        """
        if cls.default_flow_params is None:
            raise RuntimeError("There's no started flow_sessions.")
        params = deepcopy(cls.default_flow_params)
        if course_identifier:
            params["course_identifier"] = course_identifier
        if flow_session_id:
            params["flow_session_id"] = flow_session_id
        resp = cls.c.post(reverse("relate-finish_flow_session_view",
                                  kwargs=params), {'submit': ['']})
        return resp
    @classmethod
    def get_flow_params(cls, course_identifier=None, flow_session_id=None):
                course_identifier or cls.get_default_course_identifier())
        if flow_session_id is None:
            flow_session_id = cls.get_default_flow_session_id(course_identifier)
        return {
            "course_identifier": course_identifier,
            "flow_session_id": flow_session_id
        }

    @classmethod
    def get_page_params(cls, course_identifier=None, flow_session_id=None,
                        page_ordinal=None):
        page_params = cls.get_flow_params(course_identifier, flow_session_id)
        if page_ordinal is None:
            page_ordinal = 0
        page_params.update({"page_ordinal": page_ordinal})
Dong Zhuang's avatar
Dong Zhuang committed

    def get_page_ordinal_via_page_id(
            cls, page_id, course_identifier=None, flow_session_id=None):
        flow_params = cls.get_flow_params(course_identifier, flow_session_id)
        return (
            get_flow_page_ordinal_from_page_id(
                flow_params["flow_session_id"], page_id))

    def get_page_id_via_page_oridnal(
            cls, page_ordinal, course_identifier=None, flow_session_id=None,
            with_group_id=False):
        flow_params = cls.get_flow_params(course_identifier, flow_session_id)
        return (
            get_flow_page_id_from_page_ordinal(
                flow_params["flow_session_id"], page_ordinal,
                with_group_id=with_group_id))

    def get_page_view_url_by_ordinal(
            cls, viewname, page_ordinal, course_identifier=None,
            flow_session_id=None):
        page_params = cls.get_page_params(
            course_identifier, flow_session_id, page_ordinal)
        return reverse(viewname, kwargs=page_params)

    def get_page_view_url_by_page_id(
            cls, viewname, page_id, course_identifier=None, flow_session_id=None):
        page_ordinal = cls.get_page_ordinal_via_page_id(
            page_id, course_identifier, flow_session_id)
        return cls.get_page_view_url_by_ordinal(
            viewname, page_ordinal, course_identifier, flow_session_id)
    def get_page_url_by_ordinal(
            cls, page_ordinal, course_identifier=None, flow_session_id=None):
        return cls.get_page_view_url_by_ordinal(