Skip to content
base_test_mixins.py 89.9 KiB
Newer Older
Dong Zhuang's avatar
Dong Zhuang committed

        for page_tuple in TEST_PAGE_TUPLE:
            if page_id == page_tuple.page_id:
                group_id = page_tuple.group_id
                if ensure_grading_ui_get_before_grading:
                    cls.ensure_grading_ui_get(page_id)

                if ensure_analytic_page_get_before_grading:
                    cls.ensure_analytic_page_get(group_id, page_id)

                if ensure_download_before_grading:
                    cls.ensure_download_submission(group_id, page_id)

                if not page_tuple.need_human_grade:
                    break

                assign_full_grades = True

                if grade_data is not None:
                    assert isinstance(grade_data, dict)
                    assign_full_grades = False
                else:
                    grade_data = page_tuple.grade_data.copy()

                if assign_full_grades:
                    expected_grades = page_tuple.full_points

                if grade_data_extra_kwargs:
                    assert isinstance(grade_data_extra_kwargs, dict)
                    grade_data.update(grade_data_extra_kwargs)

                post_grade_response = cls.post_grade_by_page_id(
                    page_id, grade_data,
                    force_login_instructor=force_login_instructor)

                assert (post_grade_response.status_code
                        == expected_post_grading_status_code)

                if post_grade_response.status_code == 200:
Dong Zhuang's avatar
Dong Zhuang committed
                    if do_session_score_equal_assersion:
                        cls.assertSessionScoreEqual(expected_grades)
Dong Zhuang's avatar
Dong Zhuang committed

                if ensure_download_after_grading:
                    cls.ensure_download_submission(group_id, page_id)

                if ensure_analytic_page_get_after_grading:
                    cls.ensure_analytic_page_get(group_id, page_id)

                if ensure_grading_ui_get_after_grading:
                    cls.ensure_grading_ui_get(page_id)

        return post_grade_response


class FallBackStorageMessageTestMixin(object):
    """
    Helpers for asserting on :mod:`django.contrib.messages` messages found in
    the context of a test response.

    :meth:`setUp` overrides the ``MESSAGE_STORAGE`` setting with
    :attr:`storage` for the duration of each test. The helpers read the
    ``messages`` context value through
    ``self.get_response_context_value_by_name``, which is not defined here and
    must be provided by the test case this mixin is combined with.
    """
    # In case other message storages are used, the following is the default
    # storage used by django and RELATE. Tests which concern the message
    # should not include this mixin.
    storage = 'django.contrib.messages.storage.fallback.FallbackStorage'

    def setUp(self):  # noqa
        super(FallBackStorageMessageTestMixin, self).setUp()
        self.msg_settings_override = override_settings(MESSAGE_STORAGE=self.storage)
        self.msg_settings_override.enable()
        # Undo the settings override even when the test fails.
        self.addCleanup(self.msg_settings_override.disable)

    def get_listed_storage_from_response(self, response):
        """
        Return the ``messages`` value of the response context as a list.
        """
        return list(self.get_response_context_value_by_name(response, 'messages'))

    def clear_message_response_storage(self, response):
        """
        Empty the message storage found in the response context and assert
        that it is empty afterwards. Does nothing when the response context
        has no ``messages`` value.

        :param response: response
        """
        # this should only be used for debug, because we are using private
        # attributes which might change
        try:
            storage = self.get_response_context_value_by_name(response, 'messages')
        except AssertionError:
            # message doesn't exist in response context
            return

        if hasattr(storage, '_loaded_data'):
            storage._loaded_data = []
        elif hasattr(storage, '_loaded_messages'):
            # The check must name the attribute that is assigned below;
            # otherwise this branch can never run.
            try:
                storage._loaded_messages = []
            except AttributeError:
                # NOTE(review): presumably some storages expose this name as
                # a read-only property -- confirm. In that case nothing is
                # cleared here and the length check below reports leftovers.
                pass

        if hasattr(storage, '_queued_messages'):
            storage._queued_messages = []

        self.assertEqual(len(storage), 0)

    def assertResponseMessagesCount(self, response, expected_count):  # noqa
        """
        Assert that the response context carries ``expected_count`` messages.
        """
        storage = self.get_listed_storage_from_response(response)
        self.assertEqual(len(storage), expected_count)

    def assertResponseMessagesEqual(self, response, expected_messages):  # noqa
        """
        Assert that the message texts in the response context equal
        ``expected_messages``, in order.

        :param response: response
        :param expected_messages: a list of expected messages; any non-list
            value is treated as a single expected message.
        """
        storage = self.get_listed_storage_from_response(response)
        if not isinstance(expected_messages, list):
            expected_messages = [expected_messages]
        self.assertEqual(len(storage), len(expected_messages))
        self.assertEqual([m.message for m in storage], expected_messages)

    def assertResponseMessagesEqualRegex(self, response, expected_message_regexs):  # noqa
        """
        Assert that each message text in the response context matches the
        regex at the same position in ``expected_message_regexs``.

        :param response: response
        :param expected_message_regexs: a list of regexes; any non-list value
            is treated as a single regex.
        """
        storage = self.get_listed_storage_from_response(response)
        if not isinstance(expected_message_regexs, list):
            expected_message_regexs = [expected_message_regexs]
        # The length assertion guarantees the pairing below covers everything.
        self.assertEqual(len(storage), len(expected_message_regexs))
        for msg, regex in zip(storage, expected_message_regexs):
            six.assertRegex(self, msg.message, regex)

    def assertResponseMessagesContains(self, response, expected_messages,  # noqa
                                       loose=False):
        """
        Assert that every expected message is among the response messages.

        :param response: response
        :param expected_messages: a string or an iterable of strings.
        :param loose: when true, the message texts are joined with spaces
            into one string and each expected message only needs to be a
            substring of it; otherwise each expected message must be equal
            to one of the message texts.
        """
        storage = self.get_listed_storage_from_response(response)
        # six.string_types so that a single text string is also wrapped on
        # PY2, instead of being iterated character by character.
        if isinstance(expected_messages, six.string_types):
            expected_messages = [expected_messages]
        messages = [m.message for m in storage]
        if loose:
            from django.utils.encoding import force_text
            messages = " ".join([force_text(m) for m in messages])
        for em in expected_messages:
            self.assertIn(em, messages)

    def assertResponseMessageLevelsEqual(self, response, expected_levels):  # noqa
        """
        Assert that the levels of the response messages equal
        ``expected_levels``, in order.
        """
        storage = self.get_listed_storage_from_response(response)
        self.assertEqual([m.level for m in storage], expected_levels)

    def debug_print_response_messages(self, response):
        """
        For debugging :class:`django.contrib.messages` objects in post response
        :param response: response
        """
        try:
            storage = self.get_listed_storage_from_response(response)
        except (KeyError, AssertionError):
            # clear_message_response_storage treats AssertionError from the
            # context lookup as "no messages in the context"; accept it here
            # too, so this debug helper never fails the test by itself.
            print("\n-------no message----------")
        else:
            print("\n-----------message start (%i total)-------------"
                  % len(storage))
            for m in storage:
                print(m.message)
            print("-----------message end-------------\n")
Dong Zhuang's avatar
Dong Zhuang committed


class SubprocessRunpyContainerMixin(object):
    """
    Fake a runpy container with a local subprocess. Only needed when the
    TestCase includes test(s) for code questions.

    :meth:`setUpClass` starts the ``runpy`` script found in the
    ``docker-image-run-py`` directory as a subprocess, and :meth:`setUp`
    patches ``course.page.code.SPAWN_CONTAINERS_FOR_RUNPY`` to ``False`` for
    each test.
    """
    @classmethod
    def setUpClass(cls):  # noqa
        if six.PY2:
            from unittest import SkipTest
            raise SkipTest("In process fake container is configured for "
                           "PY3 only, since currently runpy docker only "
                           "provide PY3 envrionment")

        super(SubprocessRunpyContainerMixin, cls).setUpClass()

        import subprocess

        # The PY_EXE environment variable, when set and non-empty, chooses
        # the interpreter; otherwise the running interpreter is used.
        interpreter = os.getenv("PY_EXE") or sys.executable

        here = os.path.dirname(__file__)
        runpy_script = os.path.abspath(
            os.path.join(here, os.pardir, "docker-image-run-py", "runpy"))

        cls.faked_container_process = subprocess.Popen(
            [interpreter, runpy_script],
            stdout=subprocess.DEVNULL,
            # because runpy prints to stderr
            stderr=subprocess.DEVNULL,
        )

    def setUp(self):
        super(SubprocessRunpyContainerMixin, self).setUp()
        patcher = mock.patch(
            "course.page.code.SPAWN_CONTAINERS_FOR_RUNPY", False)
        self.faked_container_patch = patcher
        patcher.start()
        # Restore the original value when the test is over.
        self.addCleanup(patcher.stop)

    @classmethod
    def tearDownClass(cls):  # noqa
        super(SubprocessRunpyContainerMixin, cls).tearDownClass()

        from course.page.code import SPAWN_CONTAINERS_FOR_RUNPY
        # Make sure SPAWN_CONTAINERS_FOR_RUNPY is reset to True
        assert SPAWN_CONTAINERS_FOR_RUNPY

        if not sys.platform.startswith("win"):
            # On nix platforms, killing the subprocess resulted in test
            # failure when there were more than one TestCases which were
            # using this mixin. So we don't kill the subprocess, and it
            # won't bring bad side effects to remainder tests.
            return

        # Without this, tests on Appveyor hanged when all tests finished.
        cls.faked_container_process.kill()


def improperly_configured_cache_patch():
    """
    Return a :func:`mock.patch` object for the built-in ``__import__`` which
    raises :class:`ImproperlyConfigured` when ``django.core.cache`` is
    imported, and delegates every other import to the real ``__import__``.

    The returned object can be used as a context manager or as a decorator.
    """
    if six.PY3:
        import builtins  # noqa
        import_target = "builtins.__import__"
    else:
        import __builtin__ as builtins  # noqa
        import_target = "__builtin__.__import__"

    # Grab the real import function before it gets patched, so that the
    # replacement can still delegate to it.
    real_import = builtins.__import__

    def import_without_cache(name, globals=None, locals=None, fromlist=(),
                             level=0):
        if name != "django.core.cache":
            return real_import(name, globals, locals, fromlist, level)
        raise ImproperlyConfigured()

    return mock.patch(import_target, side_effect=import_without_cache)

# {{{ admin

# Work on a deep copy so that TWO_COURSE_SETUP_LIST itself is not modified.
ADMIN_TWO_COURSE_SETUP_LIST = deepcopy(TWO_COURSE_SETUP_LIST)
# switch roles: in the second course setup, the first participation becomes
# "ta" and the second becomes "instructor"
_admin_participations = ADMIN_TWO_COURSE_SETUP_LIST[1]["participations"]
_admin_participations[0]["role_identifier"] = "ta"
_admin_participations[1]["role_identifier"] = "instructor"
del _admin_participations


class AdminTestMixin(TwoCourseTestMixin):
    """
    Two-course test mixin for the admin site.

    On top of the data set up by the parent class, :meth:`setUpTestData`
    creates two more participations in course1 and turns the instructors of
    both courses into staff users holding every permission.
    """
    courses_setup_list = ADMIN_TWO_COURSE_SETUP_LIST
    none_participation_user_create_kwarg_list = (
        NONE_PARTICIPATION_USER_CREATE_KWARG_LIST)

    @classmethod
    def setUpTestData(cls):  # noqa
        super(AdminTestMixin, cls).setUpTestData()  # noqa

        from tests.factories import ParticipationFactory
        from django.contrib.auth.models import Permission

        # create 2 participation (with new user) for course1
        for attr_name in ("course1_student_participation2",
                          "course1_student_participation3"):
            setattr(cls, attr_name,
                    ParticipationFactory.create(course=cls.course1))

        cls.instructor1 = cls.course1_instructor_participation.user
        cls.instructor2 = cls.course2_instructor_participation.user
        assert cls.instructor1 != cls.instructor2

        # grant all admin permissions to instructors
        for admin_user in (cls.instructor1, cls.instructor2):
            admin_user.is_staff = True
            admin_user.save()
            for permission in Permission.objects.all():
                admin_user.user_permissions.add(permission)

    @classmethod
    def get_admin_change_list_view_url(cls, app_name, model_name):
        """Return the url of the admin changelist view of the model."""
        view_name = "admin:%s_%s_changelist" % (app_name, model_name)
        return reverse(view_name)

    @classmethod
    def get_admin_change_view_url(cls, app_name, model_name, args=None):
        """
        Return the url of the admin change view of the model, reversed with
        ``args`` (an empty list when not given).
        """
        view_name = "admin:%s_%s_change" % (app_name, model_name)
        return reverse(view_name, args=[] if args is None else args)

    @classmethod
    def get_admin_add_view_url(cls, app_name, model_name, args=None):
        """
        Return the url of the admin add view of the model, reversed with
        ``args`` (an empty list when not given).
        """
        view_name = "admin:%s_%s_add" % (app_name, model_name)
        return reverse(view_name, args=[] if args is None else args)

    def get_admin_form_fields(self, response):
        """
        Return a list of AdminFields for the AdminForm in the response.
        """
        # The admin form iterates over fieldsets, a fieldset over field
        # lines, and a field line over fields; flatten all three levels.
        return [
            field
            for fieldset in response.context['adminform']
            for field_line in fieldset
            for field in field_line
        ]

    def get_admin_form_fields_names(self, response):
        """
        Return the names of the fields of the AdminForm in the response.
        """
        names = []
        for admin_field in self.get_admin_form_fields(response):
            names.append(admin_field.field.name)
        return names

    def get_changelist(self, request, model, modeladmin):
        """
        Build a ``ChangeList`` for ``model`` from the options of
        ``modeladmin``.
        """
        from django.contrib.admin.views.main import ChangeList

        # Positional arguments, in the order ChangeList is called with here.
        changelist_args = (
            request,
            model,
            modeladmin.list_display,
            modeladmin.list_display_links,
            modeladmin.get_list_filter(request),
            modeladmin.date_hierarchy,
            modeladmin.search_fields,
            modeladmin.list_select_related,
            modeladmin.list_per_page,
            modeladmin.list_max_show_all,
            modeladmin.list_editable,
            modeladmin,
        )
        return ChangeList(*changelist_args)

    def get_filterspec_list(self, request, changelist=None, model=None,
                            modeladmin=None):
        """
        Return a list with one tuple per filter spec of the changelist, each
        tuple holding the ``display`` values of the choices of that spec.

        When ``changelist`` is not given, it is built from ``request``,
        ``model`` and ``modeladmin``, which must all be supplied then.
        """
        if changelist is None:
            assert request and model and modeladmin
            changelist = self.get_changelist(request, model, modeladmin)

        # Only the first item returned by get_filters() is used.
        filterspecs = changelist.get_filters(request)[0]
        return [
            tuple(choice['display'] for choice in spec.choices(changelist))
            for spec in filterspecs
        ]

# }}}