Skip to content
test_views.py 99.5 KiB
Newer Older
        with self.temporarily_switch_to_user(u):
            return self.client.get(
                self.get_test_flow_url(course_identifier))

    def post_test_flow_view(
            self, data, course_identifier=None, force_login_instructor=True):
        course_identifier or self.get_default_course_identifier()

        if not force_login_instructor:
            u = self.get_logged_in_user()
        else:
            u = self.instructor_participation.user
        with self.temporarily_switch_to_user(u):
            return self.client.post(
                self.get_test_flow_url(course_identifier),
                data=data)

    def get_default_post_data(self, action="test", **kwargs):
        data = {
            "flow_id": self.flow_id,
Andreas Klöckner's avatar
Andreas Klöckner committed
            action: ""
        }
        data.update(kwargs)
        return data

    def test_anonymous(self):
        with self.temporarily_switch_to_user(None):
            resp = self.get_test_flow_view(
                force_login_instructor=False)
            self.assertEqual(resp.status_code, 403)

            resp = self.post_test_flow_view(
                data=self.get_default_post_data(), force_login_instructor=False)
            self.assertEqual(resp.status_code, 403)

    def test_student(self):
        with self.temporarily_switch_to_user(self.student_participation.user):
            resp = self.get_test_flow_view(
                force_login_instructor=False)
            self.assertEqual(resp.status_code, 403)

            resp = self.post_test_flow_view(
                data=self.get_default_post_data(), force_login_instructor=False)
            self.assertEqual(resp.status_code, 403)

    def test_success(self):
        resp = self.get_test_flow_view()
        self.assertEqual(resp.status_code, 200)

        resp = self.post_test_flow_view(data=self.get_default_post_data())
        self.assertFormErrorLoose(resp, None)
        self.assertRedirects(resp, self.get_view_start_flow_url(self.flow_id),
                             fetch_redirect_response=False)

    def test_form_invalid(self):
        with mock.patch(
                "course.views.FlowTestForm.is_valid") as mock_is_valid:
            mock_is_valid.return_value = False

            with self.temporarily_switch_to_user(self.instructor_participation.user):
                resp = self.post_test_flow_view(
                    data=self.get_default_post_data())
                self.assertEqual(resp.status_code, 200)

    def test_invalid_operation(self):
        resp = self.post_test_flow_view(
            data=self.get_default_post_data(action="unknown"))
        self.assertEqual(resp.status_code, 400)


class GrantExceptionTestMixin(MockAddMessageMixing, SingleCoursePageTestMixin):

    @classmethod
    def setUpTestData(cls):  # noqa
        super().setUpTestData()
        cls.fs = factories.FlowSessionFactory(
            course=cls.course, participation=cls.student_participation,
            flow_id=cls.flow_id, in_progress=False)

    def setUp(self):
        self.fs.refresh_from_db()

    def get_grant_exception_url(self, course_identifier=None):
        return reverse(
            "relate-grant_exception",
            kwargs={"course_identifier":
                        course_identifier or self.get_default_course_identifier()})

    def get_grant_exception_view(
            self, course_identifier=None, force_login_instructor=True):

        if not force_login_instructor:
            u = self.get_logged_in_user()
        else:
            u = self.instructor_participation.user
        with self.temporarily_switch_to_user(u):
            return self.client.get(
                self.get_grant_exception_url(course_identifier))

    def post_grant_exception_view(
            self, data, course_identifier=None, force_login_instructor=True):

        if not force_login_instructor:
            u = self.get_logged_in_user()
        else:
            u = self.instructor_participation.user
        with self.temporarily_switch_to_user(u):
            return self.client.post(
                self.get_grant_exception_url(course_identifier),
                data=data)

    def get_grant_exception_stage_2_url(
            self, participation_id=None, flow_id=None, course_identifier=None):
        participation_id = participation_id or self.student_participation.id
        flow_id = flow_id or self.flow_id
        return reverse(
            "relate-grant_exception_stage_2",
            kwargs={"course_identifier":
                        course_identifier or self.get_default_course_identifier(),
                    "participation_id": participation_id,
                    "flow_id": flow_id})

    def get_grant_exception_stage_2_view(
            self, participation_id=None, flow_id=None, course_identifier=None,
            force_login_instructor=True):
        participation_id = participation_id or self.student_participation.id
        flow_id = flow_id or self.flow_id

        if not force_login_instructor:
            u = self.get_logged_in_user()
        else:
            u = self.instructor_participation.user
        with self.temporarily_switch_to_user(u):
            return self.client.get(
                self.get_grant_exception_stage_2_url(
                    participation_id, flow_id, course_identifier))

    def post_grant_exception_stage_2_view(
            self, data, participation_id=None, flow_id=None,
            course_identifier=None,
            force_login_instructor=True):

        participation_id = participation_id or self.student_participation.id
        flow_id = flow_id or self.flow_id
        if not force_login_instructor:
            u = self.get_logged_in_user()
        else:
            u = self.instructor_participation.user
        with self.temporarily_switch_to_user(u):
            return self.client.post(
                self.get_grant_exception_stage_2_url(
                    participation_id, flow_id, course_identifier),
                data=data)

    def get_grant_exception_stage_3_url(
            self, session_id=None, participation_id=None, flow_id=None,
            course_identifier=None):

        session_id = session_id or self.fs.pk
        participation_id = participation_id or self.student_participation.id
        flow_id = flow_id or self.flow_id

        return reverse(
            "relate-grant_exception_stage_3",
            kwargs={"course_identifier":
                        course_identifier or self.get_default_course_identifier(),
                    "participation_id": participation_id,
                    "flow_id": flow_id,
                    "session_id": session_id})

    def get_grant_exception_stage_3_view(
            self, session_id=None, participation_id=None, flow_id=None,
            course_identifier=None,
            force_login_instructor=True):
        session_id = session_id or self.fs.pk
        participation_id = participation_id or self.student_participation.id
        flow_id = flow_id or self.flow_id

        if not force_login_instructor:
            u = self.get_logged_in_user()
        else:
            u = self.instructor_participation.user
        with self.temporarily_switch_to_user(u):
            return self.client.get(
                self.get_grant_exception_stage_3_url(
                    session_id, participation_id, flow_id, course_identifier))

    def post_grant_exception_stage_3_view(
            self, data, session_id=None, participation_id=None, flow_id=None,
            course_identifier=None,
            force_login_instructor=True):
        session_id = session_id or self.fs.pk
        participation_id = participation_id or self.student_participation.id
        flow_id = flow_id or self.flow_id

        if not force_login_instructor:
            u = self.get_logged_in_user()
        else:
            u = self.instructor_participation.user
        with self.temporarily_switch_to_user(u):
            return self.client.post(
                self.get_grant_exception_stage_3_url(
                    session_id, participation_id, flow_id, course_identifier),
                data=data)


class GrantExceptionStage1Test(GrantExceptionTestMixin, TestCase):
    # test views.grant_exception

    def get_default_post_data(self, action="next", **kwargs):
        data = {
            "participation": self.student_participation.pk,
            "flow_id": self.flow_id,
Andreas Klöckner's avatar
Andreas Klöckner committed
            action: ""
        }
        data.update(kwargs)
        return data

    def test_anonymous(self):
        with self.temporarily_switch_to_user(None):
            resp = self.get_grant_exception_view(
                force_login_instructor=False)
            self.assertEqual(resp.status_code, 403)

            resp = self.post_grant_exception_view(
                data=self.get_default_post_data(), force_login_instructor=False)
            self.assertEqual(resp.status_code, 403)

    def test_student(self):
        with self.temporarily_switch_to_user(self.student_participation.user):
            resp = self.get_grant_exception_view(
                force_login_instructor=False)
            self.assertEqual(resp.status_code, 403)

            resp = self.post_grant_exception_view(
                data=self.get_default_post_data(), force_login_instructor=False)
            self.assertEqual(resp.status_code, 403)

    def test_success(self):
        resp = self.get_grant_exception_view()
        self.assertEqual(resp.status_code, 200)

        resp = self.post_grant_exception_view(data=self.get_default_post_data())
        self.assertFormErrorLoose(resp, None)
        self.assertRedirects(
            resp, self.get_grant_exception_stage_2_url(),
            fetch_redirect_response=False)

    def test_form_invalid(self):
        with mock.patch(
                "course.views.ExceptionStage1Form.is_valid") as mock_is_valid:
            mock_is_valid.return_value = False

            with self.temporarily_switch_to_user(self.instructor_participation.user):
                resp = self.post_grant_exception_view(
                    data=self.get_default_post_data())
                self.assertEqual(resp.status_code, 200)


class GrantExceptionStage2Test(GrantExceptionTestMixin, TestCase):
    # test views.grant_exception_stage_2
    def get_default_post_data(self, action="next", **kwargs):
        data = {
            "session": self.fs.pk,
Andreas Klöckner's avatar
Andreas Klöckner committed
            action: ""
        }
        data.update(kwargs)
        return data

    def test_anonymous(self):
        with self.temporarily_switch_to_user(None):
            resp = self.get_grant_exception_stage_2_view(
                force_login_instructor=False)
            self.assertEqual(resp.status_code, 403)

            resp = self.post_grant_exception_stage_2_view(
                data=self.get_default_post_data(), force_login_instructor=False)
            self.assertEqual(resp.status_code, 403)

    def test_student(self):
        with self.temporarily_switch_to_user(self.student_participation.user):
            resp = self.get_grant_exception_stage_2_view(
                force_login_instructor=False)
            self.assertEqual(resp.status_code, 403)

            resp = self.post_grant_exception_stage_2_view(
                data=self.get_default_post_data(), force_login_instructor=False)
            self.assertEqual(resp.status_code, 403)

    def test_flow_does_not_exist(self):
        with mock.patch("course.content.get_flow_desc") as mock_get_flow_desc:
            from django.core.exceptions import ObjectDoesNotExist
            mock_get_flow_desc.side_effect = ObjectDoesNotExist()

            resp = self.get_grant_exception_stage_2_view()
            self.assertEqual(resp.status_code, 404)

            resp = self.post_grant_exception_stage_2_view(
                data=self.get_default_post_data())
            self.assertEqual(resp.status_code, 404)

    def test_flow_desc_has_no_rule(self):
        hacked_flow_desc = self.get_hacked_flow_desc(del_rules=True)
        with mock.patch("course.content.get_flow_desc") as mock_get_flow_desc:
            mock_get_flow_desc.return_value = hacked_flow_desc
            resp = self.get_grant_exception_stage_2_view()
            self.assertEqual(resp.status_code, 200)

            resp = self.post_grant_exception_stage_2_view(
                data=self.get_default_post_data())
            self.assertFormErrorLoose(resp, None)
            self.assertRedirects(
                resp, self.get_grant_exception_stage_3_url(session_id=self.fs.pk),
                fetch_redirect_response=False)

    def test_post_next_success(self):
        resp = self.get_grant_exception_stage_2_view()
        self.assertEqual(resp.status_code, 200)

        resp = self.post_grant_exception_stage_2_view(
            data=self.get_default_post_data())
        self.assertFormErrorLoose(resp, None)
        self.assertRedirects(
            resp, self.get_grant_exception_stage_3_url(session_id=self.fs.pk),
            fetch_redirect_response=False)

    def test_post_create_session_success(self):
        resp = self.get_grant_exception_stage_2_view()
        self.assertEqual(resp.status_code, 200)

        resp = self.post_grant_exception_stage_2_view(
            data=self.get_default_post_data(
                action="create_session",
                access_rules_tag_for_new_session=[views.NONE_SESSION_TAG]
            ))
        self.assertFormErrorLoose(resp, None)
        self.assertEqual(resp.status_code, 200)
        all_fs = models.FlowSession.objects.all()
        self.assertEqual(all_fs.count(), 2)
        # no access_rules_tag is save
        self.assertEqual(all_fs.filter(access_rules_tag__isnull=True).count(), 2)

    def test_start_rule_not_may_start_new_session(self):
        session_start_rule = self.get_hacked_session_start_rule(
            may_start_new_session=False)
        with mock.patch("course.utils.get_session_start_rule") as mock_get_nrule:
            mock_get_nrule.return_value = session_start_rule

            resp = self.get_grant_exception_stage_2_view()
            self.assertEqual(resp.status_code, 200)
            self.assertContains(
                resp,
                "Creating a new session is (technically) not allowed "
                "by course rules. Clicking 'Create Session' anyway will "
                "override this rule.")
            create_session_form = resp.context["forms"][1]
            names, _ = self.get_form_submit_inputs(create_session_form)
            self.assertIn("create_session", names)

            all_fs = models.FlowSession.objects.all()
            self.assertEqual(all_fs.count(), 1)
            resp = self.post_grant_exception_stage_2_view(
                data=self.get_default_post_data(
                    action="create_session",
                    access_rules_tag_for_new_session=[views.NONE_SESSION_TAG]))
            self.assertEqual(resp.status_code, 200)
            all_fs = models.FlowSession.objects.all()
            self.assertEqual(all_fs.count(), 2)
            self.assertEqual(all_fs.filter(access_rules_tag__isnull=False).count(),
                             0)
            self.assertAddMessageCallCount(1)
            self.assertAddMessageCalledWith(
                "A new session was created for '%(participation)s' "
                "for '%(flow_id)s'."
                % {"participation": self.student_participation,
                   "flow_id": self.flow_id})

    def test_exist_session_has_tags(self):
        another_fs_tag = "my_tag1"
        another_fs = factories.FlowSessionFactory(
            course=self.course, participation=self.student_participation,
            flow_id=self.flow_id, access_rules_tag=another_fs_tag)

        resp = self.get_grant_exception_stage_2_view()
        self.assertEqual(resp.status_code, 200)
        exception_form = resp.context["forms"][0]
        choices = exception_form.fields["session"].choices

        # stringified session name, the first is not tagged, the second is tagged
        self.assertNotIn("tagged", choices[0][1])
        self.assertIn("tagged '%s'" % another_fs_tag, choices[1][1])

        resp = self.post_grant_exception_stage_2_view(
            data=self.get_default_post_data(session=another_fs.pk))
        self.assertFormErrorLoose(resp, None)
        self.assertRedirects(
            resp, self.get_grant_exception_stage_3_url(session_id=another_fs.pk),
            fetch_redirect_response=False)
        self.assertAddMessageCallCount(0)

    def test_start_rule_has_tag_session(self):
        flow_desc_access_rule_tags = ["fdesc_tag1", "fdesc_tag2"]
        hacked_flow_desc = self.get_hacked_flow_desc_with_access_rule_tags(
            flow_desc_access_rule_tags)

        tag_session = "my_tag2"
        session_start_rule = self.get_hacked_session_start_rule(
            tag_session=tag_session)
        with mock.patch(
                "course.content.get_flow_desc") as mock_get_flow_desc, mock.patch(
                "course.utils.get_session_start_rule") as mock_get_nrule:
            mock_get_flow_desc.return_value = hacked_flow_desc
            mock_get_nrule.return_value = session_start_rule

            resp = self.get_grant_exception_stage_2_view()
            self.assertEqual(resp.status_code, 200)

            # because may start new session
            self.assertNotContains(
                resp,
                "Creating a new session is (technically) not allowed")

            create_session_form = resp.context["forms"][1]
            field = create_session_form.fields["access_rules_tag_for_new_session"]
            self.assertEqual(len(field.choices), 4)

            self.assertEqual(field.initial, tag_session)

            self.assertSetEqual(
                set(flow_desc_access_rule_tags
                    + [tag_session, views.NONE_SESSION_TAG]),
                set(dict(field.choices).keys()))

            resp = self.post_grant_exception_stage_2_view(
                data=self.get_default_post_data(
                    action="create_session",
                    access_rules_tag_for_new_session=tag_session))
            self.assertEqual(resp.status_code, 200)
            all_fs = models.FlowSession.objects.all()
            self.assertEqual(all_fs.count(), 2)
            self.assertEqual(all_fs.filter(access_rules_tag=tag_session).count(), 1)
            self.assertAddMessageCallCount(1)
            self.assertAddMessageCalledWith(
                "A new session tagged '%(tag)s' was created for "
                "'%(participation)s' for '%(flow_id)s'."
                % {"tag": tag_session,
                   "participation": self.student_participation,
                   "flow_id": self.flow_id})

    def test_start_rule_has_tag_session_with_in_flow_desc_arule_tags(self):
        flow_desc_access_rule_tags = ["my_tag1", "my_tag2"]
        hacked_flow_desc = self.get_hacked_flow_desc_with_access_rule_tags(
            flow_desc_access_rule_tags)

        tag_session = "my_tag2"
        session_start_rule = self.get_hacked_session_start_rule(
            tag_session=tag_session)
        with mock.patch(
                "course.content.get_flow_desc") as mock_get_flow_desc, mock.patch(
                "course.utils.get_session_start_rule") as mock_get_nrule:
            mock_get_flow_desc.return_value = hacked_flow_desc
            mock_get_nrule.return_value = session_start_rule

            resp = self.get_grant_exception_stage_2_view()
            self.assertEqual(resp.status_code, 200)

            # because may start new session
            self.assertNotContains(
                resp,
                "Creating a new session is (technically) not allowed")

            create_session_form = resp.context["forms"][1]
            field = create_session_form.fields["access_rules_tag_for_new_session"]
            self.assertEqual(len(field.choices), 3)

            self.assertEqual(field.initial, tag_session)

            self.assertSetEqual(
                set(flow_desc_access_rule_tags + [views.NONE_SESSION_TAG]),
                set(dict(field.choices).keys()))

            resp = self.post_grant_exception_stage_2_view(
                data=self.get_default_post_data(
                    action="create_session",
                    access_rules_tag_for_new_session=flow_desc_access_rule_tags[0]))
            self.assertEqual(resp.status_code, 200)
            all_fs = models.FlowSession.objects.all()
            self.assertEqual(all_fs.count(), 2)
            self.assertEqual(all_fs.filter(
                access_rules_tag=flow_desc_access_rule_tags[0]).count(), 1)

    def test_form_invalid(self):
        with mock.patch(
                "course.views.ExceptionStage2Form.is_valid") as mock_is_valid:
            mock_is_valid.return_value = False

            resp = self.post_grant_exception_stage_2_view(
                data=self.get_default_post_data())
            self.assertEqual(resp.status_code, 200)

    def test_invalid_operation(self):
        resp = self.post_grant_exception_stage_2_view(
            data=self.get_default_post_data(action="unknown"))
        self.assertEqual(resp.status_code, 400)


class GrantExceptionStage3Test(GrantExceptionTestMixin, TestCase):
    # test views.grant_exception_stage_2

    def setUp(self):
        fake_validate_session_access_rule = mock.patch(
            "course.validation.validate_session_access_rule")
        self.mock_validate_session_access_rule = (
            fake_validate_session_access_rule.start())
        self.addCleanup(fake_validate_session_access_rule.stop)
        fake_validate_session_grading_rule = mock.patch(
            "course.validation.validate_session_grading_rule")
        self.mock_validate_session_grading_rule = (
            fake_validate_session_grading_rule.start())
        self.addCleanup(fake_validate_session_grading_rule.stop)

    def get_default_post_data(self, action="submit", **kwargs):
        data = {
            "session": self.fs.pk,
Andreas Klöckner's avatar
Andreas Klöckner committed
            action: "",
            "comment": "my_comment"
        }
        data.update(kwargs)
        return data

    def test_anonymous(self):
        with self.temporarily_switch_to_user(None):
            resp = self.get_grant_exception_stage_3_view(
                force_login_instructor=False)
            self.assertEqual(resp.status_code, 403)

            resp = self.post_grant_exception_stage_3_view(
                data=self.get_default_post_data(), force_login_instructor=False)
            self.assertEqual(resp.status_code, 403)

    def test_student(self):
        with self.temporarily_switch_to_user(self.student_participation.user):
            resp = self.get_grant_exception_stage_3_view(
                force_login_instructor=False)
            self.assertEqual(resp.status_code, 403)

            resp = self.post_grant_exception_stage_3_view(
                data=self.get_default_post_data(), force_login_instructor=False)
            self.assertEqual(resp.status_code, 403)

    def test_flow_does_not_exist(self):
        with mock.patch("course.content.get_flow_desc") as mock_get_flow_desc:
            from django.core.exceptions import ObjectDoesNotExist
            mock_get_flow_desc.side_effect = ObjectDoesNotExist()

            resp = self.get_grant_exception_stage_3_view()
            self.assertEqual(resp.status_code, 404)

            resp = self.post_grant_exception_stage_3_view(
                data=self.get_default_post_data())
            self.assertEqual(resp.status_code, 404)

    def test_success(self):
        resp = self.get_grant_exception_stage_3_view()
        self.assertEqual(resp.status_code, 200)
        form = resp.context["form"]

        # no tags in flow_desc.rule
        self.assertNotIn("set_access_rules_tag", form.fields)
        self.assertNotIn("restrict_to_same_tag", form.fields)

        resp = self.post_grant_exception_stage_3_view(
            data=self.get_default_post_data())
        self.assertFormErrorLoose(resp, None)
        self.assertRedirects(
            resp, self.get_grant_exception_url(),
            fetch_redirect_response=False)

        # The above doesn't create a FlowRuleException object
        self.assertEqual(models.FlowRuleException.objects.count(), 0)
        self.assertAddMessageCallCount(1)
        self.assertAddMessageCalledWith(
            "No exception granted to the given flow session")

    def test_flow_desc_has_no_rule(self):
        hacked_flow_desc = self.get_hacked_flow_desc(del_rules=True)
        with mock.patch("course.content.get_flow_desc") as mock_get_flow_desc:
            mock_get_flow_desc.return_value = hacked_flow_desc
            resp = self.get_grant_exception_stage_3_view()
            self.assertEqual(resp.status_code, 200)

            form = resp.context["form"]

            # no flow_desc.rule
            self.assertNotIn("set_access_rules_tag", form.fields)
            self.assertNotIn("restrict_to_same_tag", form.fields)

            resp = self.post_grant_exception_stage_3_view(
                data=self.get_default_post_data())
            self.assertFormErrorLoose(resp, None)
            self.assertRedirects(
                resp, self.get_grant_exception_url(),
                fetch_redirect_response=False)

            # The above doesn't create a FlowRuleException object
            self.assertEqual(models.FlowRuleException.objects.count(), 0)
        self.assertAddMessageCallCount(1)
        self.assertAddMessageCalledWith(
            "No exception granted to the given flow session")

    def test_flow_desc_rule_has_tags(self):
        flow_desc_access_rule_tags = ["fdesc_tag1", "fdesc_tag2"]
        hacked_flow_desc = self.get_hacked_flow_desc_with_access_rule_tags(
            flow_desc_access_rule_tags)

        fs_with_flow_desc_tag = factories.FlowSessionFactory(
            course=self.course, participation=self.student_participation,
            flow_id=self.flow_id, access_rules_tag=flow_desc_access_rule_tags[0])

        another_fs_tag = "my_tag1"
        another_fs = factories.FlowSessionFactory(
            course=self.course, participation=self.student_participation,
            flow_id=self.flow_id, access_rules_tag=another_fs_tag)

        with mock.patch("course.content.get_flow_desc") as mock_get_flow_desc:
            mock_get_flow_desc.return_value = hacked_flow_desc
            resp = self.get_grant_exception_stage_3_view()
            self.assertEqual(resp.status_code, 200)

            form = resp.context["form"]
            self.assertIn("set_access_rules_tag", form.fields)
            self.assertIn("restrict_to_same_tag", form.fields)

            self.assertEqual(
                len(form.fields["set_access_rules_tag"].choices), 3)
            # default to None tag
            self.assertEqual(
                form.fields["set_access_rules_tag"].initial, views.NONE_SESSION_TAG)

            # flow session with tags from flow_desc.rules
            resp = self.get_grant_exception_stage_3_view(
                session_id=fs_with_flow_desc_tag.pk)
            self.assertEqual(resp.status_code, 200)

            form = resp.context["form"]

            self.assertEqual(
                len(form.fields["set_access_rules_tag"].choices), 3)
            self.assertEqual(
                form.fields["set_access_rules_tag"].initial,
                fs_with_flow_desc_tag.access_rules_tag)

            # flow session with it's own tag
            resp = self.get_grant_exception_stage_3_view(session_id=another_fs.pk)
            self.assertEqual(resp.status_code, 200)

            form = resp.context["form"]

            self.assertEqual(
                len(form.fields["set_access_rules_tag"].choices), 4)
            self.assertEqual(
                form.fields["set_access_rules_tag"].initial, another_fs_tag)

    # {{{ create_access_exception
    def test_create_access_exception_not_restrict_to_same_tag(self):
        flow_desc_access_rule_tags = ["fdesc_tag1", "fdesc_tag2"]
        hacked_flow_desc = self.get_hacked_flow_desc_with_access_rule_tags(
            flow_desc_access_rule_tags)

        with mock.patch("course.content.get_flow_desc") as mock_get_flow_desc:
            mock_get_flow_desc.return_value = hacked_flow_desc

            # not restrict_to_same_tag
            resp = self.post_grant_exception_stage_3_view(
                data=self.get_default_post_data(
                    create_access_exception=True,
                    set_access_rules_tag=[flow_desc_access_rule_tags[1]]
                ))
            self.assertFormErrorLoose(resp, None)
            self.assertRedirects(
                resp, self.get_grant_exception_url(),
                fetch_redirect_response=False)

            self.assertEqual(models.FlowRuleException.objects.count(), 1)
            self.assertEqual(
                models.FlowRuleException.objects.filter(
                    kind=constants.flow_rule_kind.access).count(), 1)
            self.assertAddMessageCallCount(2)
            self.assertAddMessageCalledWith(
                ["Access rules tag of the selected session updated "
                 "to '%s'." % flow_desc_access_rule_tags[1],
                 "'Session Access' exception granted to "], reset=True)
            self.fs.refresh_from_db()
            self.assertEqual(self.fs.access_rules_tag, flow_desc_access_rule_tags[1])
            self.assertEqual(self.mock_validate_session_access_rule.call_count, 1)
            self.mock_validate_session_access_rule.reset_mock()
            exc_rule = models.FlowRuleException.objects.last().rule

            self.assertIsNone(exc_rule.get("if_has_tag"),
                              msg="if_has_tag should not be created in this "
                                  "exception rule")

            # still not restrict_to_same_tag, but with NONE_SESSION_TAG
            resp = self.post_grant_exception_stage_3_view(
                data=self.get_default_post_data(
                    create_access_exception=True,
                    set_access_rules_tag=[views.NONE_SESSION_TAG]
                ))
            self.assertFormErrorLoose(resp, None)
            self.assertRedirects(
                resp, self.get_grant_exception_url(),
                fetch_redirect_response=False)

            self.assertEqual(models.FlowRuleException.objects.count(), 2)
            self.assertEqual(
                models.FlowRuleException.objects.filter(
                    kind=constants.flow_rule_kind.access).count(), 2)
            self.assertAddMessageCallCount(2)
            self.assertAddMessageCalledWith(
                ["Removed access rules tag of the selected session.",
                 "'Session Access' exception granted to "], reset=True)
            self.fs.refresh_from_db()
            self.assertEqual(self.fs.access_rules_tag, None)
            self.assertEqual(self.mock_validate_session_access_rule.call_count, 1)

            exc_rule = models.FlowRuleException.objects.last().rule
            self.assertIsNone(exc_rule.get("if_has_tag"),
                              msg="if_has_tag should not be created in this "
                                  "exception rule")

    def test_create_access_exception_restrict_to_same_tag(self):
        # restrict_to_same_tag, while session access_rules_tag is none
        flow_desc_access_rule_tags = ["fdesc_tag1", "fdesc_tag2"]
        hacked_flow_desc = self.get_hacked_flow_desc_with_access_rule_tags(
            flow_desc_access_rule_tags)

        with mock.patch("course.content.get_flow_desc") as mock_get_flow_desc:
            mock_get_flow_desc.return_value = hacked_flow_desc

            resp = self.post_grant_exception_stage_3_view(
                data=self.get_default_post_data(
                    create_access_exception=True,
                    set_access_rules_tag=[flow_desc_access_rule_tags[1]],
                    restrict_to_same_tag=True
                ))

            self.assertFormErrorLoose(resp, None)
            self.assertRedirects(
                resp, self.get_grant_exception_url(),
                fetch_redirect_response=False)

            self.assertEqual(models.FlowRuleException.objects.count(), 1)
            self.assertEqual(
                models.FlowRuleException.objects.filter(
                    kind=constants.flow_rule_kind.access).count(), 1)

            self.fs.refresh_from_db()
            self.assertEqual(self.fs.access_rules_tag, flow_desc_access_rule_tags[1])
            exc_rule = models.FlowRuleException.objects.last().rule
            self.assertEqual(exc_rule.get("if_has_tag"), None)

    def test_create_access_exception_restrict_to_same_tag2(self):
        flow_desc_access_rule_tags = ["fdesc_tag1", "fdesc_tag2"]
        hacked_flow_desc = self.get_hacked_flow_desc_with_access_rule_tags(
            flow_desc_access_rule_tags)

        # a flow session with it's own tag
        another_fs_tag = "my_tag1"
        another_fs = factories.FlowSessionFactory(
            course=self.course, participation=self.student_participation,
            flow_id=self.flow_id, access_rules_tag=another_fs_tag)

        with mock.patch("course.content.get_flow_desc") as mock_get_flow_desc:
            mock_get_flow_desc.return_value = hacked_flow_desc

            resp = self.post_grant_exception_stage_3_view(
                session_id=another_fs.pk,
                data=self.get_default_post_data(
                    session=another_fs.pk,
                    create_access_exception=True,
                    set_access_rules_tag=[flow_desc_access_rule_tags[1]],
                    restrict_to_same_tag=True  # then the above will be ignored
                ))

            self.assertFormErrorLoose(resp, None)
            self.assertRedirects(
                resp, self.get_grant_exception_url(),
                fetch_redirect_response=False)

            self.assertEqual(models.FlowRuleException.objects.count(), 1)
            self.assertEqual(
                models.FlowRuleException.objects.filter(
                    kind=constants.flow_rule_kind.access).count(), 1)

            self.assertAddMessageCallCount(1)
            self.assertAddMessageCalledWith(
                "'Session Access' exception granted to ")
            another_fs.refresh_from_db()
            self.assertEqual(another_fs.access_rules_tag, another_fs_tag)

            exc_rule = models.FlowRuleException.objects.last().rule
            self.assertEqual(exc_rule["if_has_tag"], another_fs_tag)

    def test_access_permissions_created(self):
        # ensure all flow permission is in the form, and will be
        # saved to the FlowRuleException permissions
        all_permissions = dict(constants.FLOW_PERMISSION_CHOICES).keys()

        from itertools import combinations
        from random import shuffle

        # 15 random flow permissions combination
        comb = list(combinations(all_permissions, 3))
        shuffle(comb)
        comb = comb[:15]

        for permissions in comb:
            with self.subTest(permissions=permissions):
                kwargs = {perm: True for perm in permissions}
                resp = self.post_grant_exception_stage_3_view(
                    data=self.get_default_post_data(
                        create_access_exception=True,
                        **kwargs))
                self.assertFormErrorLoose(resp, None)
                self.assertRedirects(
                    resp, self.get_grant_exception_url(),
                    fetch_redirect_response=False)
                exc_rule = models.FlowRuleException.objects.last().rule
                self.assertSetEqual(
                    set(exc_rule["permissions"]), set(permissions))

    def test_access_expires_created(self):
        expiration_time = as_local_time(now() + timedelta(days=3))
        resp = self.post_grant_exception_stage_3_view(
            data=self.get_default_post_data(
                create_access_exception=True,
                access_expires=expiration_time.strftime(
                    DATE_TIME_PICKER_TIME_FORMAT)))
        self.assertFormErrorLoose(resp, None)
        self.assertRedirects(
            resp, self.get_grant_exception_url(),
            fetch_redirect_response=False)
        expiration = models.FlowRuleException.objects.last().expiration
        self.assertIsNotNone(expiration)
        self.assertEqual(as_local_time(expiration).date(), expiration_time.date())
        self.assertEqual(as_local_time(expiration).hour, expiration_time.hour)
        self.assertEqual(as_local_time(expiration).minute, expiration_time.minute)

    # }}}

    def test_no_exception_created_but_updated_session_access_rule_tag(self):
        flow_desc_access_rule_tags = ["fdesc_tag1", "fdesc_tag2"]
        hacked_flow_desc = self.get_hacked_flow_desc_with_access_rule_tags(
            flow_desc_access_rule_tags)

        with mock.patch("course.content.get_flow_desc") as mock_get_flow_desc:
            mock_get_flow_desc.return_value = hacked_flow_desc

            # not restrict_to_same_tag
            resp = self.post_grant_exception_stage_3_view(
                data=self.get_default_post_data(
                    set_access_rules_tag=[flow_desc_access_rule_tags[1]]
                ))
            self.assertFormErrorLoose(resp, None)
            self.assertRedirects(
                resp, self.get_grant_exception_url(),
                fetch_redirect_response=False)

            self.assertEqual(models.FlowRuleException.objects.count(), 0)
            self.assertAddMessageCallCount(2)
            self.assertAddMessageCalledWith(
                ["Access rules tag of the selected session updated "
                 "to '%s'." % flow_desc_access_rule_tags[1],
                 "No other exception granted to "], reset=True)
            self.fs.refresh_from_db()
            self.assertEqual(self.fs.access_rules_tag, flow_desc_access_rule_tags[1])

    # {{{ create_grading_exception

    def test_create_grading_exception_due_none(self):
        resp = self.post_grant_exception_stage_3_view(
            data=self.get_default_post_data(
                create_grading_exception=True))

        self.assertFormErrorLoose(resp, None)
        self.assertRedirects(
            resp, self.get_grant_exception_url(),
            fetch_redirect_response=False)

        self.assertEqual(self.mock_validate_session_grading_rule.call_count, 1)

        excs = models.FlowRuleException.objects.all()
        self.assertEqual(excs.count(), 1)
        self.assertEqual(
            excs.filter(
                kind=constants.flow_rule_kind.grading).count(), 1)
        if_completed_before = excs[0].rule.get("if_completed_before")
        self.assertIsNone(if_completed_before)

        self.assertAddMessageCallCount(1)
        self.assertAddMessageCalledWith("'Grading' exception granted to ")

    def test_create_grading_exception_due(self):
        due = as_local_time(now() + timedelta(days=5))
        resp = self.post_grant_exception_stage_3_view(
            data=self.get_default_post_data(
                create_grading_exception=True,
                due=due.strftime(DATE_TIME_PICKER_TIME_FORMAT)))

        self.assertFormErrorLoose(resp, None)
        self.assertRedirects(
            resp, self.get_grant_exception_url(),
            fetch_redirect_response=False)

        self.assertEqual(self.mock_validate_session_grading_rule.call_count, 1)

        excs = models.FlowRuleException.objects.all()
        self.assertEqual(excs.count(), 1)
        self.assertEqual(
            excs.filter(
                kind=constants.flow_rule_kind.grading).count(), 1)
        if_completed_before = excs[0].rule.get("if_completed_before")
        self.assertAddMessageCalledWith("'Grading' exception granted to ")

    def test_create_grading_exception_due_same_as_access_expiration(self):
        due = as_local_time(now() + timedelta(days=5))
        expiration_time = as_local_time(now() + timedelta(days=3))
        resp = self.post_grant_exception_stage_3_view(
            data=self.get_default_post_data(
                create_grading_exception=True,
                due_same_as_access_expiration=True,
                due=due.strftime(DATE_TIME_PICKER_TIME_FORMAT),
                access_expires=expiration_time.strftime(
                    DATE_TIME_PICKER_TIME_FORMAT)))

        self.assertFormErrorLoose(resp, None)
        self.assertRedirects(
            resp, self.get_grant_exception_url(),
            fetch_redirect_response=False)

        self.assertEqual(self.mock_validate_session_grading_rule.call_count, 1)

        excs = models.FlowRuleException.objects.all()
        self.assertEqual(excs.count(), 1)
        self.assertEqual(
            excs.filter(
                kind=constants.flow_rule_kind.grading).count(), 1)
        if_completed_before = excs[0].rule.get("if_completed_before")
        self.assertAddMessageCalledWith("'Grading' exception granted to ")

    def test_create_grading_exception_due_same_as_access_expiration_while_expiration_not_set(self):  # noqa
        due = as_local_time(now() + timedelta(days=5))
        resp = self.post_grant_exception_stage_3_view(
            data=self.get_default_post_data(
                create_grading_exception=True,
                due_same_as_access_expiration=True,
                due=due.strftime(DATE_TIME_PICKER_TIME_FORMAT)))

        self.assertEqual(resp.status_code, 200)
        self.assertFormErrorLoose(
            resp, "Must specify access expiration if 'due same "
                  "as access expiration' is set.")

        self.assertEqual(self.mock_validate_session_grading_rule.call_count, 0)

    def test_create_grading_exception_credit_percent_recorded_in_description(self):
        resp = self.post_grant_exception_stage_3_view(
            data=self.get_default_post_data(
                create_grading_exception=True,
                credit_percent=89.1))

        self.assertFormErrorLoose(resp, None)
        self.assertRedirects(resp, self.get_grant_exception_url(),
                             fetch_redirect_response=False)

        self.assertEqual(self.mock_validate_session_grading_rule.call_count, 1)

        excs = models.FlowRuleException.objects.all()
        self.assertEqual(excs.count(), 1)
        self.assertEqual(
            excs.filter(
                kind=constants.flow_rule_kind.grading).count(), 1)
        description = excs[0].rule.get("description")
        self.assertIsNotNone(description)
        self.assertIn("89.1%", description)

        self.assertAddMessageCallCount(1)
        self.assertAddMessageCalledWith("'Grading' exception granted to ")

    def test_grading_rule_generates_grade(self):
        # not generates_grade
        self.post_grant_exception_stage_3_view(