from __future__ import division

__copyright__ = "Copyright (C) 2017 Dong Zhuang"

__license__ = """
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""

import six
import unittest
from django.test import TestCase, RequestFactory
from dulwich.contrib.paramiko_vendor import ParamikoSSHVendor
from relate.utils import force_remove_path

from course.models import Course, Participation
from course import versioning
from course.validation import ValidationWarning
from course.constants import participation_permission as pperm
from tests.base_test_mixins import (
    SingleCourseTestMixin,
    CoursesTestMixinBase, SINGLE_COURSE_SETUP_LIST,
    FallBackStorageMessageTestMixin)
from tests.utils import suppress_stdout_decorator, mock
from tests import factories

# NOTE: despite its name, this fixture holds an RSA *private* key (see the PEM
# header below); ParamikoSSHVendorTest.prepare_data passes it as the course's
# ``ssh_private_key``.
# NOTE(review): presumably a throwaway key generated for the tests only --
# confirm it grants access to nothing real.
TEST_PUBLIC_KEY = """
-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEA7A1rTpbRpCek4tZZKa8QH14/pYzraN7hDnx3BKrqRxghP/0Q
uc98qeQkA5T3EYjHsConAAArLzbo6PMGwM9353dFixGUHegZe3jUmszX7G2veZx5
1xJ20pffbi8ohjv2Lj+nr799oGw7pGcEUXMr2v0b+UfToNFJAWGY3j9G+vTT7JEY
b9hq+XTOvoF+pLWbU7mIock5CxP3Q5NGS3/SjX2Qv50m6uc0dP34mGL4zDzgAMiC
99OpFms7CoO4AMWa/CfDCKl2vZHqDEF7Pd02vUP3ddb5pyGDeZmfhtK2LDLKi7ke
1pv1B4BwZ6HPmNep2CKEK6jf0+o5fIKm1uy3iQIDAQABAoIBAHCxl2FVr5BnPNju
7HJyGYhgPpKSzHCst1VrJocb8e0vH/CkqK+M1z9ko6zyGWJNosf/186wRe2skVVl
cPvsEJp43sKeCdCdVk0USqv8z7kYRIYSpjh/oCq6RvkbmoU7azR5P10wVpGYGoFK
jU01ZuKNpCVGnUpRoEEAjzLLkt+LyBjcey4ZUntp5L4h+ahm14Z8GLKbO2UCYZk6
nUnfigxiw/otXL+lN4+LjwjgTWS9JvG6Z+OQSf9b/G9DuDjpbvS5JQ0FONmUUH6m
nLwvJrePT50OPdB9mM4f+Ev03oRr9EXBXPL5t33SqRjhVn3dzH2wcuyhMUnZfGg0
MMksBAECgYEA/9VGBzaHYF2Va5OsNe3nvsOU9RI/n4/lN31MzDYwHDj2/NIDPR6S
7y42BxnGXce28t50ly/9ogut/yWfv0gyRbJ1tshl4/Lk4nqi9LwOOydL7XCd0+iM
ZiHfJafkItRNmVJjjX8OT0B0lfdmY/dJQJVH7zo6KzLlDNnzufZJSZECgYEA7DTX
kbGWrqtZl9wIvAdgfAlnagGpS/qUN8rB39TMHN5FpGH4xLV4BCyB6qacbF8YD3FB
14F4DVQh3/d6l0iybRBKFqxx3laUVeVYZNbvfaiCWvRJnnTZxI2ofQvK8nDhINiL
J4dpsRrxnTUrpoxg9xOejaLJHi9uPwOw0tjw0nkCgYBSAPvsbfcg1X6CuBgYRUTm
aezCTXIlZEt16O0H/EqZkUziJzMwkS9KCYb56bIi91RWLyYyHAjxu0qvoVC+UJcE
rjp7N2spkP769ZJsXic1oNf+qP1+Iml2h17uxA0leOXSwoz0mwhsMN3uABpK6sYJ
NJCVRxXEKREweGBeeGpvcQKBgAVkT2dr/lyOXMUyqKBiKrmqHUo2L38kgS2k2zgY
y2/9Qum1stAKtGqj+XM5ymhO42W22CHrOqpTOVK7e3jol+oVbRuHZDIHF+u+CH6E
yYK8zfz1hpivYikycp4oHsHaAcmWJ9cHKEp6qvlDtXNf0PbS49On259sxb96fhbS
DO1BAoGBAMvipNBiAbZXzNm6eTa6NMYcmiZpgV2zvx/Rbq+L2heOoBA7jyH41nl/
XHYymOvk4oiQKCfgal4+fqSpHSs6/S1MyLmHbONTLTwS8yHU9esi6ntEnse28ewc
yZ/atCI8q7Wyi7i2wr/okVekQYWdMSbnamp0A3zkYW6mhKpvibdN
-----END RSA PRIVATE KEY-----
"""
class VersioningTestMixin(CoursesTestMixinBase, FallBackStorageMessageTestMixin):
    """Shared fixture for the course-creation tests in this module.

    Provides ``cls.instructor`` (a user given the "add_course" permission),
    a per-test ``RequestFactory`` as ``self.rf`` and a helper returning fresh
    course form data.
    """

    # Left empty here -- presumably so the base mixin sets up no course
    # before a test runs (the tests below start from zero courses).
    courses_setup_list = []

    @classmethod
    def setUpTestData(cls):  # noqa
        super(VersioningTestMixin, cls).setUpTestData()
        instructor_kwargs = (
            SINGLE_COURSE_SETUP_LIST[0]["participations"][0]["user"])
        cls.instructor = cls.create_user(instructor_kwargs)
        cls.add_user_permission(cls.instructor, "add_course")

    def setUp(self):
        super(VersioningTestMixin, self).setUp()
        self.rf = RequestFactory()
        # Registered as a cleanup so it runs after every test; presumably
        # removes course directories the test left on disk.
        self.addCleanup(self.force_remove_all_course_dir)

    def get_set_up_new_course_form_data(self):
        """Return a deep copy of the first course's setup data."""
        course_setup = SINGLE_COURSE_SETUP_LIST[0]["course"]
        return deepcopy(course_setup)


class CourseCreationTest(VersioningTestMixin, TestCase):
    def test_get_set_up_new_course_view(self):
        # A GET by the instructor (who holds "add_course") must answer 200.
        with self.temporarily_switch_to_user(self.instructor):
            url = self.get_set_up_new_course_url()
            query = SINGLE_COURSE_SETUP_LIST[0]["course"]
            response = self.c.get(url, data=query)
            self.assertEqual(response.status_code, 200)

    def test_non_auth_set_up_new_course(self):
        # With no user logged in, neither viewing nor posting the form may
        # create a course.
        with self.temporarily_switch_to_user(None):
            get_response = self.get_set_up_new_course()
            # NOTE(review): the second argument of assertTrue is the failure
            # message, so this (and the one below) only checks that
            # status_code is truthy and never compares it with 403.  Confirm
            # the real status code before tightening to assertEqual.
            self.assertTrue(get_response.status_code, 403)

            course_data = SINGLE_COURSE_SETUP_LIST[0]["course"]
            post_response = self.post_create_course(
                course_data, raise_error=False, login_superuser=False)
            self.assertTrue(post_response.status_code, 403)
            self.assertEqual(Course.objects.count(), 0)

    def test_post_set_up_new_course_form_not_valid(self):
        # Posting the form without the "identifier" field must create no
        # course and report a required-field error on the response.
        data = SINGLE_COURSE_SETUP_LIST[0]["course"].copy()
        del data["identifier"]
        resp = self.post_create_course(data, raise_error=False)
        # assertEqual rather than assertTrue: assertTrue's second argument is
        # only the failure message, so the status code was never compared.
        self.assertEqual(resp.status_code, 200)
        self.assertEqual(Course.objects.count(), 0)
        self.assertFormErrorLoose(resp, "This field is required.")

    def test_set_up_new_course_no_perm(self):
        # create a user which has no perm for creating course
        ta_kwargs = SINGLE_COURSE_SETUP_LIST[0]["participations"][1]["user"]
        ta = self.create_user(ta_kwargs)
        for perm in ("course.add_course",
                     "course.change_course",
                     "course.delete_course"):
            self.assertFalse(ta.has_perm(perm))

        with self.temporarily_switch_to_user(ta):
            get_response = self.get_set_up_new_course()
            # NOTE(review): assertTrue(x, 403) treats 403 as the failure
            # message; only the truthiness of status_code is checked here and
            # below.  Confirm the real status code before using assertEqual.
            self.assertTrue(get_response.status_code, 403)

            form_data = self.get_set_up_new_course_form_data()
            post_response = self.post_create_course(
                form_data, raise_error=False, login_superuser=False)
            self.assertTrue(post_response.status_code, 403)
            self.assertEqual(Course.objects.count(), 0)

    def test_set_up_new_course(self):
        # In this test, we use client instead of request factory to simplify
        # the logic.

        with self.temporarily_switch_to_user(self.instructor):
            # the permission is cached, need to repopulated from db
            resp = self.get_set_up_new_course()
            # NOTE(review): assertTrue's second argument is the failure
            # message, so the two status checks in this test only verify
            # truthiness.  They are left untouched because the real status
            # codes (the POST may well redirect) cannot be confirmed here.
            self.assertTrue(resp.status_code, 200)

            # Git fetching, ref transfer, messaging and content validation
            # are all patched; only the creation flow itself is exercised.
            with mock.patch("dulwich.client.GitClient.fetch",
                            return_value={b"HEAD": b"some_commit_sha"}), \
                 mock.patch("course.versioning.transfer_remote_refs",
                            return_value=None), \
                 mock.patch('course.versioning.messages') as mock_messages, \
                    mock.patch("course.validation.validate_course_content",
                               return_value=None):
                data = self.get_set_up_new_course_form_data()

                resp = self.post_create_course(data, raise_error=False,
                                               login_superuser=False)
                self.assertTrue(resp.status_code, 200)
                self.assertEqual(Course.objects.count(), 1)
                self.assertEqual(Participation.objects.count(), 1)
                self.assertEqual(Participation.objects.first().user.username,
                                 "test_instructor")
                self.assertIn("Course content validated, creation succeeded.",
                              mock_messages.add_message.call_args[0])

                from course.enrollment import get_participation_role_identifiers

                # the user who setup the course has role instructor
                # (assertIn, not assertTrue: with assertTrue the string
                # "instructor" was merely the failure message, so any
                # non-empty result passed)
                self.assertIn(
                    "instructor",
                    get_participation_role_identifiers(
                        Course.objects.first(),
                        Participation.objects.first()))

    def test_set_up_new_course_form_invalid(self):
        # Leaving out any single one of these fields must invalidate the form.
        required_fields = (
            "identifier", "name", "number", "time_period",
            "git_source", "from_email", "notify_email")

        for missing_field in required_fields:
            incomplete_data = self.get_set_up_new_course_form_data()
            incomplete_data.pop(missing_field)

            request = self.rf.post(
                self.get_set_up_new_course_url(), data=incomplete_data)
            request.user = self.instructor

            creation_form = versioning.CourseCreationForm(request.POST)
            self.assertFalse(creation_form.is_valid())

    @suppress_stdout_decorator(suppress_stderr=True)
    def test_set_up_new_course_error_with_no_repo_created(self):
        # When Repo.init raises, no course may be saved and the error text
        # must be handed to messages.add_message.
        initial_response = self.get_set_up_new_course()
        # NOTE(review): assertTrue(x, N) uses N as the failure message; the
        # status checks in this test only verify truthiness.
        self.assertTrue(initial_response.status_code, 403)

        course_data = SINGLE_COURSE_SETUP_LIST[0]["course"]

        repo_init_patch = mock.patch(
            "course.versioning.Repo.init",
            side_effect=RuntimeError("Repo init error"))
        messages_patch = mock.patch('course.versioning.messages')

        with repo_init_patch, messages_patch as mock_messages:
            resp = self.post_create_course(course_data, raise_error=False)
            self.assertTrue(resp.status_code, 200)
            self.assertEqual(Course.objects.count(), 0)

            # third positional argument of the last add_message call
            reported_text = mock_messages.add_message.call_args[0][2]
            self.assertIn(
                "Course creation failed: RuntimeError: Repo init error",
                reported_text)

    @suppress_stdout_decorator(suppress_stderr=True)
    def test_set_up_new_course_failed_to_delete_repo(self):
        # Fetching fails and the subsequent removal of the repo directory
        # fails too: both problems must show up in the added messages.
        initial_response = self.get_set_up_new_course()
        # NOTE(review): assertTrue(x, N) uses N as the failure message; the
        # status checks in this test only verify truthiness.
        self.assertTrue(initial_response.status_code, 403)

        def remove_then_raise(path):
            # we need to delete the path, or tests followed will fail
            force_remove_path(path)
            raise OSError("my os error")

        course_data = SINGLE_COURSE_SETUP_LIST[0]["course"]

        fetch_patch = mock.patch(
            "dulwich.client.GitClient.fetch",
            side_effect=RuntimeError("my fetch error"))
        remove_patch = mock.patch(
            'relate.utils.force_remove_path',
            side_effect=remove_then_raise)
        add_message_patch = mock.patch(
            'course.versioning.messages.add_message')

        with fetch_patch, remove_patch, add_message_patch as mock_add_message:
            resp = self.post_create_course(course_data, raise_error=False)
            self.assertTrue(resp.status_code, 200)
            self.assertEqual(Course.objects.count(), 0)

            self.assertEqual(mock_add_message.call_count, 2)

            # Flatten every non-mock positional argument of every call into
            # one string so the literals can be searched in a single pass.
            message_parts = [
                str(arg)
                for args, _ in mock_add_message.call_args_list
                for arg in args
                if not isinstance(arg, mock.MagicMock)]
            joined_messages = ". ".join(message_parts)

            self.assertIn(
                "Failed to delete unused repository directory",
                joined_messages)

            self.assertIn(
                "Course creation failed: RuntimeError: my fetch error",
                joined_messages)

    @suppress_stdout_decorator(suppress_stderr=True)
    def test_set_up_new_course_git_source_invalid(self):
        # A fetch that yields no refs (None) must not save a course and must
        # report "No refs found in remote repository".
        form_data = self.get_set_up_new_course_form_data()
        request = self.rf.post(self.get_set_up_new_course_url(), data=form_data)
        request.user = self.instructor

        fetch_patch = mock.patch(
            "dulwich.client.GitClient.fetch", return_value=None)
        messages_patch = mock.patch('course.versioning.messages')
        save_patch = mock.patch("course.models.Course.save")
        render_patch = mock.patch("course.versioning.render")

        with fetch_patch, messages_patch as mock_messages, \
                save_patch as mock_save, render_patch:
            resp = versioning.set_up_new_course(request)
            # NOTE(review): assertTrue(x, 200) only checks truthiness; and
            # since ``render`` is patched, ``resp`` is presumably a mock
            # here, so this line cannot fail either way.
            self.assertTrue(resp.status_code, 200)
            self.assertEqual(mock_save.call_count, 0)

            reported_text = mock_messages.add_message.call_args[0][2]
            self.assertIn("No refs found in remote repository", reported_text)

    @suppress_stdout_decorator(suppress_stderr=True)
    def test_set_up_new_course_subdir(self):
        # With "course_root_path" set, the first positional argument handed
        # to validate_course_content must be a SubdirRepoWrapper.
        form_data = self.get_set_up_new_course_form_data()
        form_data["course_root_path"] = "some_dir"
        request = self.rf.post(self.get_set_up_new_course_url(), data=form_data)
        request.user = self.instructor

        fetch_patch = mock.patch(
            "dulwich.client.GitClient.fetch",
            return_value={b"HEAD": b"some_commit_sha"})
        messages_patch = mock.patch('course.versioning.messages')
        validate_patch = mock.patch(
            "course.validation.validate_course_content", return_value=None)
        course_save_patch = mock.patch("course.models.Course.save")
        participation_save_patch = mock.patch(
            "course.models.Participation.save", return_value=True)
        render_patch = mock.patch("course.versioning.render")

        with fetch_patch, messages_patch, \
                validate_patch as mock_validate, \
                course_save_patch, participation_save_patch, render_patch:
            resp = versioning.set_up_new_course(request)
            from course.content import SubdirRepoWrapper
            validated_repo = mock_validate.call_args[0][0]
            self.assertIsInstance(validated_repo, SubdirRepoWrapper)
            # NOTE(review): assertTrue(x, 200) only checks that status_code
            # is truthy; 200 is taken as the failure message.
            self.assertTrue(resp.status_code, 200)
class ParamikoSSHVendorTest(unittest.TestCase):
    # A simple integration test, making sure ParamikoSSHVendor is used
    # for ssh protocol.

    @classmethod
    def setUpClass(cls):  # noqa
        # Create a course whose git_source uses the git+ssh scheme (see
        # prepare_data) and keep the ssh vendor of the dulwich client that
        # versioning builds for it.
        course = factories.CourseFactory.create(**cls.prepare_data())
        cls.git_client, _ = (
            versioning.get_dulwich_client_and_remote_path_from_course(course))
        cls.ssh_vendor = cls.git_client.ssh_vendor
        assert isinstance(cls.ssh_vendor, ParamikoSSHVendor)

    @classmethod
    def prepare_data(cls):
        """Return course data with a git+ssh source and an ssh private key."""
        data = deepcopy(SINGLE_COURSE_SETUP_LIST[0]["course"])
        data["identifier"] = "my-private-course"
        data["git_source"] = "git+ssh://foo.com:1234/bar/baz"
        data["ssh_private_key"] = TEST_PUBLIC_KEY
        return data

    def test_invalid(self):
        # NOTE(review): nothing is patched in this test, so the run_command
        # calls below presumably open real connections to github.com and
        # need network access -- confirm before running offline.
        from paramiko.ssh_exception import AuthenticationException

        expected_error_msg = "Authentication failed"
        with self.assertRaises(AuthenticationException) as cm:
            # This is also used to ensure paramiko.client.MissingHostKeyPolicy
            # is added to the client
            self.ssh_vendor.run_command(
                host="github.com",
                command="git-upload-pack '/bar/baz'",
                username=None,
                port=None)
        self.assertIn(expected_error_msg, str(cm.exception))

        # Same expectation with an explicit username and port.
        with self.assertRaises(AuthenticationException) as cm:
            self.ssh_vendor.run_command(
                host="github.com",
                command="git-upload-pack '/bar/baz'",
                username="me",
                port=22)
        self.assertIn(expected_error_msg, str(cm.exception))

        expected_error_msg = "Bad authentication type"

        # Supplying a password is expected to fail with a different message.
        with self.assertRaises(AuthenticationException) as cm:
            self.ssh_vendor.run_command(
                host="github.com",
                command="git-upload-pack '/bar/baz'",
                password="mypass")

        self.assertIn(expected_error_msg, str(cm.exception))

        # The exception expected for a missing key file differs between
        # Python 2 and Python 3.
        if six.PY2:
            exception = IOError
        else:
            exception = FileNotFoundError
        with self.assertRaises(exception) as cm:
            self.ssh_vendor.run_command(
                host="github.com",
                command="git-upload-pack '/bar/baz'",
                key_filename="key_file")

        expected_error_msg = "No such file or directory: 'key_file'"
        self.assertIn(expected_error_msg, str(cm.exception))

        # NOTE(review): ``cm`` is bound here but never inspected afterwards.
        with self.assertRaises(AttributeError) as cm:
            self.ssh_vendor.run_command(
                host="github.com",
                command="git-upload-pack '/bar/baz'",
                pkey="invalid_key")

    @suppress_stdout_decorator(suppress_stderr=True)
    def test_set_up_ensure_get_transport_called(self):
        # SSHClient.connect and SSHClient.get_transport are patched, and the
        # channel returned by open_session is a mock.
        with mock.patch(
                "course.versioning.paramiko.SSHClient.connect"
        ) as mock_connect, mock.patch(
            "course.versioning.paramiko.SSHClient.get_transport"
        ) as mock_get_transport:
            mock_channel = mock.MagicMock()
            mock_get_transport.return_value.open_session.return_value = mock_channel
            mock_channel.exec_command = mock.MagicMock()
            # Only one value is queued for recv.
            mock_channel.recv.side_effect = [b"10"]
            # NOTE(review): the queued stderr values mix bytes and str.
            mock_channel.recv_stderr.side_effect = [b"my custom error", "", ""]

            # Presumably a read beyond the queued values exhausts the
            # side_effect list and raises StopIteration; that is tolerated.
            try:
                self.ssh_vendor.run_command(
                    host="github.com",
                    command="git-upload-pack '/bar/baz'")
            except StopIteration:
                pass

            self.assertEqual(mock_connect.call_count, 1)

            from paramiko.rsakey import RSAKey
            used_rsa_key = None

            # make sure rsa_key is used when connect
            # (searches the keyword arguments of the connect call)
            for v in mock_connect.call_args[1].values():
                if isinstance(v, RSAKey):
                    used_rsa_key = v

            self.assertIsNotNone(used_rsa_key)

            # make sure get_transport is called
            self.assertEqual(mock_get_transport.call_count, 1)

            # make sure exec_command is called
            self.assertEqual(mock_channel.exec_command.call_count, 1)
            self.assertIn(
                "git-upload-pack '/bar/baz'",
                mock_channel.exec_command.call_args[0])


class TransferRemoteRefsTest(unittest.TestCase):
    # test versioning.transfer_remote_refs

    # Fixme: need better tests
    def test_remote_ref_none(self):
        # Smoke test only: calls transfer_remote_refs with None as its second
        # argument and makes no assertion, so it passes as long as nothing
        # raises (see the Fixme above).
        repo_dict = {}
        repo_dict[b"refs/remotes/origin/1"] = b"some_bytes"
        repo_dict[b'HEAD'] = b"some_head"

        # Item lookups on the mocked repo are served from repo_dict.
        repo = mock.MagicMock()
        repo.__getitem__.side_effect = repo_dict.__getitem__

        repo.get_refs.return_value = {
            b"refs/remotes/origin/1": "some_text1",
            b"refs/remotes/other/1": "some_text2"}
        versioning.transfer_remote_refs(repo, None)


class FakeCommit(object):
    """Minimal stand-in for a commit object, used by tests in this module.

    It only stores what it is given: ``name``, ``parents`` (a list, empty
    when none is supplied), ``message`` and ``id``.
    """

    def __init__(self, name, parents=None, id=None,
                 message=b"my commit message"):
        self.name = name
        # ``or []`` gives each instance its own list when ``parents`` is
        # None (or empty) instead of sharing a mutable default.
        self.parents = parents or []
        self.message = message
        self.id = id

    def __repr__(self):
        # The format arguments must be a tuple.  The previous form,
        # ``"%s: %s" % cls_name + str(name)``, parsed as
        # ``("%s: %s" % cls_name) + str(name)`` and raised TypeError
        # ("not enough arguments for format string").
        return "%s: %s" % (self.__class__.__name__, str(self.name))


class IsParentCommitTest(unittest.TestCase):
    # test versioning.is_parent_commit against a mocked repo whose item
    # access is backed by a plain dict

    def setUp(self):
        backing_store = {}
        self.repo = mock.MagicMock()
        self.repo.__getitem__.side_effect = backing_store.__getitem__
        self.repo.__setitem__.side_effect = backing_store.__setitem__

    def test_false(self):
        head = FakeCommit(b"head", None)
        self.repo[b"HEAD"] = head
        self.repo[head] = FakeCommit(b"none", None)

        first = FakeCommit(b"first", [head])

        # Once with the default history limit, once with an explicit one.
        for extra_kwargs in ({}, {"max_history_check_size": 2}):
            is_parent = versioning.is_parent_commit(
                self.repo, potential_parent=head, child=first,
                **extra_kwargs)
            self.assertFalse(is_parent)

    def test_true(self):
        head = FakeCommit(b"head", None)
        self.repo[b"HEAD"] = head
        self.repo[head] = head

        first = FakeCommit(b"first", [head])
        self.repo[first] = FakeCommit(b"first_c", [head])

        second = FakeCommit(b"second", [first])
        self.repo[second] = FakeCommit(b"second_c", [first])

        third = FakeCommit(b"third", [second])

        # A history limit of 1 is expected to be too small to find the
        # ancestor ...
        found_with_small_limit = versioning.is_parent_commit(
            self.repo, potential_parent=head, child=third,
            max_history_check_size=1)
        self.assertFalse(found_with_small_limit)

        # ... while a limit of 20 is expected to find it.
        found_with_large_limit = versioning.is_parent_commit(
            self.repo, potential_parent=head, child=third,
            max_history_check_size=20)
        self.assertTrue(found_with_large_limit)


# Text fragments used by the course-update tests below.
# NOTE(review): presumably these mirror user-facing strings emitted by
# course.versioning.run_course_update_command and are passed to
# RunCourseUpdateCommandTest.check_command_message_result as expected /
# not-expected literals -- confirm before changing either side.
FETCHED_LITERAL = "Fetch successful."
VALIDATE_SUCCESS_LITERAL = "Course content validated successfully."
PREVIEW_END_LITERAL = "Preview ended."
UPDATE_APPLIED_LITERAL = "Update applied."
WARNINGS_LITERAL = "Course content validated OK, with warnings:"
VALIDATE_FAILURE_LITERAL = "Course content did not validate successfully:"
NOT_UPDATED_LITERAL = "Update not applied."
FAILURE_MSG = "my validation error."
LOCATION1 = "location1"
LOCATION2 = "location2"
# NOTE(review): "waring" looks like a typo for "warning"; left untouched
# because these are runtime strings.
WARNING1 = "some waring1"
WARNING2 = "some waring2"


class RunCourseUpdateCommandTest(unittest.TestCase):
    # test versioning.run_course_update_command

    default_preview_sha = "preview_sha"
    default_old_sha = "old_sha"
    default_switch_to_sha = "switch_sha"
    default_lastest_sha = "latest_sha"

    def setUp(self):
        super(RunCourseUpdateCommandTest, self).setUp()

        def start_patch(target):
            # Start patching ``target`` and undo it when the test finishes.
            patcher = mock.patch(target)
            started = patcher.start()
            self.addCleanup(patcher.stop)
            return started

        # A course with one participation holding the "instructor" role.
        self.course = factories.CourseFactory(
            active_git_commit_sha=self.default_old_sha)
        user = factories.UserFactory()
        instructor_role = factories.ParticipationRoleFactory(
            course=self.course, identifier="instructor")

        self.participation = factories.ParticipationFactory(
            course=self.course, preview_git_commit_sha=None, user=user)
        self.participation.roles.set([instructor_role])

        # Mocked request and page context wired to the objects above.
        self.request = mock.MagicMock()
        self.request.user = user

        self.pctx = mock.MagicMock()
        self.pctx.course = self.course
        self.pctx.participation = self.participation

        # repo and content_repo are one and the same mock
        self.repo = self.content_repo = mock.MagicMock()

        # The dulwich client is replaced by a mock whose fetch() reports
        # default_switch_to_sha as HEAD.
        self.mock_get_dulwich_client_and_remote_path_from_course = start_patch(
            "course.versioning.get_dulwich_client_and_remote_path_from_course")
        self.mock_client = mock.MagicMock()
        self.mock_get_dulwich_client_and_remote_path_from_course.return_value = (
            self.mock_client, "/remote/path")
        self.mock_client.fetch.return_value = {
            b"HEAD": self.default_switch_to_sha.encode()}

        self.mock_transfer_remote_refs = start_patch(
            "course.versioning.transfer_remote_refs")

        self.mock_is_parent_commit = start_patch(
            "course.versioning.is_parent_commit")
        self.mock_is_parent_commit.return_value = False

        self.mock_add_message = start_patch(
            'course.versioning.messages.add_message')

        # Validation reports no warnings by default.
        self.mock_validate_course_content = start_patch(
            "course.validation.validate_course_content")
        self.mock_validate_course_content.return_value = []

    def tearDown(self):
        # Delete every remaining course through its own delete() method.
        for leftover_course in Course.objects.all():
            leftover_course.delete()

    @unittest.skipIf(six.PY2, "PY2 doesn't support subTest")
    def test_is_parent_commit_checked(self):
        # Checks for which commands is_parent_commit gets consulted, first
        # with prevent_discarding_revisions on, then with it off.
        may_update = True
        prevent_discarding_revisions = True

        # (command, whether is_parent_commit is expected to be called once)
        command_tup = [
            ("fetch", True),
            ("fetch_update", True),
            ("update", False),
            ("fetch_preview", True),
            ("preview", False),
            ("end_preview", False)]

        for command, will_check in command_tup:
            self.mock_is_parent_commit.reset_mock()
            with self.subTest(
                    command=command,
                    prevent_discarding_revisions=prevent_discarding_revisions):
                versioning.run_course_update_command(
                    self.request, self.repo, self.content_repo, self.pctx, command,
                    self.default_switch_to_sha.encode(), may_update,
                    prevent_discarding_revisions)
                if will_check and self.mock_is_parent_commit.call_count != 1:
                    self.fail(
                        "'is_parent_commit' is expected for command '%s' to be "
                        "called while not" % command)
                elif not will_check and self.mock_is_parent_commit.call_count > 0:
                    self.fail(
                        "'is_parent_commit' is not expected for command '%s' to be "
                        "called while called" % command)

        # when not prevent_discarding_revisions, is_parent_commit
        # should not be checked (expensive operation)

        prevent_discarding_revisions = False
        for command, _ in command_tup:
            self.mock_is_parent_commit.reset_mock()
            with self.subTest(
                    command=command,
                    prevent_discarding_revisions=prevent_discarding_revisions):
                versioning.run_course_update_command(
                    self.request, self.repo, self.content_repo, self.pctx, command,
                    self.default_switch_to_sha.encode(),
                    may_update, prevent_discarding_revisions)
                # A second branch repeating this exact condition used to
                # follow; it could never run and has been dropped.
                if self.mock_is_parent_commit.call_count > 0:
                    self.fail(
                        "'is_parent_commit' is not expected for command '%s' to be "
                        "called while called (expensive)" % command)

    @unittest.skipIf(six.PY2, "PY2 doesn't support subTest")
    def test_is_content_validated(self):
        # Checks for which commands validate_course_content is called.
        may_update = True

        # (command, whether validation is expected to run exactly once)
        expectations = (
            ("fetch", False),
            ("fetch_update", True),
            ("update", True),
            ("fetch_preview", True),
            ("preview", True),
            ("end_preview", False),
        )

        for command, will_validate in expectations:
            self.mock_validate_course_content.reset_mock()
            with self.subTest(command=command):
                versioning.run_course_update_command(
                    self.request, self.repo, self.content_repo, self.pctx, command,
                    self.default_switch_to_sha.encode(),
                    may_update, prevent_discarding_revisions=False)

                times_validated = self.mock_validate_course_content.call_count
                if will_validate:
                    if times_validated != 1:
                        self.fail(
                            "'validate_course_content' is expected for "
                            "command '%s' to be called while not" % command)
                elif times_validated > 0:
                    self.fail(
                        "'validate_course_content' is not expected for "
                        "command '%s' to be called while called" % command)

    def test_unknown_command(self):
        # An unrecognised command must raise RuntimeError("invalid command")
        # without validating content or checking commit ancestry.
        unknown_command = "unknown"
        new_sha = self.default_switch_to_sha.encode()

        with self.assertRaises(RuntimeError) as cm:
            versioning.run_course_update_command(
                self.request, self.repo, self.content_repo, self.pctx,
                unknown_command, new_sha,
                True,    # may_update
                False)   # prevent_discarding_revisions

        for untouched_mock in (self.mock_validate_course_content,
                               self.mock_is_parent_commit):
            self.assertEqual(untouched_mock.call_count, 0)

        self.assertIn("invalid command", str(cm.exception))

    def check_command_message_result(
            self, command, add_message_expected_call_count=0,
            expected_add_message_literals=None,
            not_expected_add_message_literals=None,
            is_previewing=False,
            expected_error_type=None,
            expected_error_msg=None,
            **call_kwargs
    ):
        """Run ``versioning.run_course_update_command`` and check its messages.

        :arg command: the command string handed to the function under test.
        :arg add_message_expected_call_count: exact number of calls expected
            on the patched ``messages.add_message``.
        :arg expected_add_message_literals: a list of substrings that must
            occur in the joined message arguments, or ``None`` for none.
        :arg not_expected_add_message_literals: a list of substrings that
            must not occur, or ``None`` for none.
        :arg is_previewing: if true, ``default_preview_sha`` is first saved
            as the participation's ``preview_git_commit_sha``.
        :arg expected_error_type: if given, the call must raise this type and
            *expected_error_msg* (then required) must be part of its text.
        :arg call_kwargs: overrides for the default keyword arguments of the
            call.

        The literals are only looked at when
        *add_message_expected_call_count* is positive.
        """
        run_kwargs = dict(
            request=self.request,
            repo=self.repo,
            content_repo=self.content_repo,
            pctx=self.pctx,
            command=command,
            new_sha=self.default_switch_to_sha.encode(),
            may_update=True,
            prevent_discarding_revisions=True)
        run_kwargs.update(call_kwargs)

        if expected_add_message_literals is not None:
            assert isinstance(expected_add_message_literals, list)
        else:
            expected_add_message_literals = []

        if not_expected_add_message_literals is not None:
            assert isinstance(not_expected_add_message_literals, list)
        else:
            not_expected_add_message_literals = []

        if is_previewing:
            self.participation.preview_git_commit_sha = self.default_preview_sha
            self.participation.save()

        if not expected_error_type:
            versioning.run_course_update_command(**run_kwargs)
        else:
            assert expected_error_msg is not None
            with self.assertRaises(expected_error_type) as cm:
                versioning.run_course_update_command(**run_kwargs)

            self.assertIn(expected_error_msg, str(cm.exception))

        recorded_calls = self.mock_add_message.call_args_list
        self.assertEqual(
            self.mock_add_message.call_count, add_message_expected_call_count,
            "add_message is unexpectedly called with %s"
            % "; ".join(str(call) for call in recorded_calls)
        )

        if add_message_expected_call_count <= 0:
            return

        # Every non-mock positional argument of every add_message call,
        # stringified and joined, forms the text the literals are matched
        # against.
        joined_messages = ". ".join(
            str(arg)
            for args, _ in recorded_calls
            for arg in args
            if not isinstance(arg, mock.MagicMock))

        missing_literals = [
            literal for literal in expected_add_message_literals
            if literal not in joined_messages]
        unwanted_literals = [
            literal for literal in not_expected_add_message_literals
            if literal in joined_messages]

        complaints = []
        if missing_literals:
            complaints.append(
                "'%s' are expected to be added in messages "
                "while not."
                % ", ".join(missing_literals))
        if unwanted_literals:
            complaints.append(
                "'%s' are not expected to be added in messages "
                "while added."
                % ", ".join(unwanted_literals))

        if complaints:
            self.fail("".join(complaints))

    def test_fetch(self):
        # A plain "fetch" adds exactly one message, which contains
        # FETCHED_LITERAL and does not contain PREVIEW_END_LITERAL.
        must_appear = [FETCHED_LITERAL]
        must_not_appear = [PREVIEW_END_LITERAL]

        self.check_command_message_result(
            command="fetch",
            add_message_expected_call_count=1,
            expected_add_message_literals=must_appear,
            not_expected_add_message_literals=must_not_appear)

    def test_end_preview(self):
        # Starting from a previewing participation, "end_preview" adds one
        # message containing PREVIEW_END_LITERAL and clears the preview sha.
        must_appear = [PREVIEW_END_LITERAL]

        self.check_command_message_result(
            command="end_preview",
            is_previewing=True,
            add_message_expected_call_count=1,
            expected_add_message_literals=must_appear)

        preview_sha = self.participation.preview_git_commit_sha
        self.assertIsNone(preview_sha)

    @unittest.skipIf(six.PY2, "PY2 doesn't support subTest")
    def test_fetch_not_prevent_discarding_revisions(self):
        # With prevent_discarding_revisions switched off, each command is run
        # once with is_parent_commit returning False and once with it
        # returning True; the same messages are expected both times.
        latest_sha = self.default_lastest_sha
        self.mock_client.fetch.return_value = {b"HEAD": latest_sha.encode()}

        # (command, add_message call count, expected literals,
        #  not expected literals, expected active sha of the course)
        scenarios = (
            ("fetch", 1,
             [FETCHED_LITERAL],
             [UPDATE_APPLIED_LITERAL],
             self.default_old_sha),
            ("fetch_update", 3,
             [FETCHED_LITERAL, UPDATE_APPLIED_LITERAL,
              VALIDATE_SUCCESS_LITERAL],
             [],
             latest_sha),
        )

        for command, call_count, present, absent, course_sha in scenarios:
            with self.subTest(command=command):
                check_kwargs = dict(
                    command=command,
                    add_message_expected_call_count=call_count,
                    expected_add_message_literals=present,
                    not_expected_add_message_literals=absent,
                    prevent_discarding_revisions=False)

                self.mock_is_parent_commit.return_value = False
                self.check_command_message_result(**check_kwargs)
                self.mock_add_message.reset_mock()

                self.mock_is_parent_commit.return_value = True
                self.check_command_message_result(**check_kwargs)
                self.assertEqual(
                    self.course.active_git_commit_sha, course_sha)
                self.mock_add_message.reset_mock()

    def test_fetch_prevent_discarding_revisions(self):
        # When is_parent_commit returns True and discarding revisions is
        # prevented, "fetch" must raise RuntimeError and add no message.
        refusal_msg = "fetch would discard commits, refusing"
        self.mock_is_parent_commit.return_value = True

        self.check_command_message_result(
            command="fetch",
            add_message_expected_call_count=0,
            prevent_discarding_revisions=True,
            expected_error_type=RuntimeError,
            expected_error_msg=refusal_msg)

        self.assertEqual(self.mock_add_message.call_count, 0)

    def test_fetch_update_success_with_warnings(self):
        # Validation returns two warnings: "fetch_update" adds three messages
        # that mention both warnings, no preview sha is set, and the course's
        # active sha becomes the fetched one.
        latest_sha = self.default_lastest_sha
        self.mock_client.fetch.return_value = {b"HEAD": latest_sha.encode()}

        validation_warnings = (
            ValidationWarning(LOCATION1, WARNING1),
            ValidationWarning(LOCATION2, WARNING2),
        )
        self.mock_validate_course_content.return_value = validation_warnings

        must_appear = [
            FETCHED_LITERAL, WARNINGS_LITERAL,
            LOCATION1, WARNING1,
            LOCATION2, WARNING2,
        ]
        self.check_command_message_result(
            command="fetch_update",
            add_message_expected_call_count=3,
            expected_add_message_literals=must_appear,
            not_expected_add_message_literals=[PREVIEW_END_LITERAL])

        self.assertIsNone(self.participation.preview_git_commit_sha)
        self.assertEqual(self.course.active_git_commit_sha, latest_sha)

    def test_fetch_update_success_with_warnings_previewing(self):
        # Same as the non-previewing case, but the participation starts out
        # previewing: a fourth message containing PREVIEW_END_LITERAL is
        # expected and the preview sha is cleared afterwards.
        latest_sha = self.default_lastest_sha
        self.mock_client.fetch.return_value = {b"HEAD": latest_sha.encode()}

        validation_warnings = (
            ValidationWarning(LOCATION1, WARNING1),
            ValidationWarning(LOCATION2, WARNING2),
        )
        self.mock_validate_course_content.return_value = validation_warnings

        must_appear = [
            FETCHED_LITERAL, WARNINGS_LITERAL,
            LOCATION1, WARNING1,
            LOCATION2, WARNING2,
            PREVIEW_END_LITERAL,
        ]
        self.check_command_message_result(
            command="fetch_update",
            is_previewing=True,
            add_message_expected_call_count=4,
            expected_add_message_literals=must_appear)

        self.assertIsNone(self.participation.preview_git_commit_sha)
        self.assertEqual(self.course.active_git_commit_sha, latest_sha)

    def test_fetch_update_with_validation_error(self):
        # Validation raises: "fetch_update" adds the fetched message and a
        # failure message carrying the error text, and the course keeps its
        # old active sha.
        from course.validation import ValidationError

        error_text = "my validation error."
        self.mock_validate_course_content.side_effect = (
            ValidationError(error_text))

        must_appear = [FETCHED_LITERAL, VALIDATE_FAILURE_LITERAL, error_text]
        must_not_appear = [PREVIEW_END_LITERAL]
        self.check_command_message_result(
            command="fetch_update",
            add_message_expected_call_count=2,
            expected_add_message_literals=must_appear,
            not_expected_add_message_literals=must_not_appear)

        self.assertIsNone(self.participation.preview_git_commit_sha)
        active_sha = self.course.active_git_commit_sha
        self.assertEqual(active_sha, self.default_old_sha)

    def test_update_with_validation_error(self):
        # Validation raises: "update" adds a single failure message carrying
        # the error text, and the course keeps its old active sha.
        from course.validation import ValidationError

        error_text = "my validation error."
        self.mock_validate_course_content.side_effect = (
            ValidationError(error_text))

        must_appear = [VALIDATE_FAILURE_LITERAL, error_text]
        must_not_appear = [PREVIEW_END_LITERAL]
        self.check_command_message_result(
            command="update",
            add_message_expected_call_count=1,
            expected_add_message_literals=must_appear,
            not_expected_add_message_literals=must_not_appear)

        self.assertIsNone(self.participation.preview_git_commit_sha)
        active_sha = self.course.active_git_commit_sha
        self.assertEqual(active_sha, self.default_old_sha)

    def test_update_with_validation_error_previewing(self):
        # Validation raises while previewing: "update" adds a single failure
        # message, the preview sha stays at the default preview sha, and the
        # course keeps its old active sha.
        from course.validation import ValidationError

        error_text = "my validation error."
        self.mock_validate_course_content.side_effect = (
            ValidationError(error_text))

        must_appear = [VALIDATE_FAILURE_LITERAL, error_text]
        must_not_appear = [PREVIEW_END_LITERAL]
        self.check_command_message_result(
            command="update",
            is_previewing=True,
            add_message_expected_call_count=1,
            expected_add_message_literals=must_appear,
            not_expected_add_message_literals=must_not_appear)

        self.assertIsNotNone(self.participation.preview_git_commit_sha)
        self.assertEqual(
            self.participation.preview_git_commit_sha, self.default_preview_sha)
        active_sha = self.course.active_git_commit_sha
        self.assertEqual(active_sha, self.default_old_sha)

    @unittest.skipIf(six.PY2, "PY2 doesn't support subTest")
    def test_fetch_not_may_update(self):
        # With may_update=False neither command may report an applied update,
        # and the course's active sha must stay at the old sha. Each command
        # is run twice with identical arguments.
        latest_sha = self.default_lastest_sha
        self.mock_client.fetch.return_value = {b"HEAD": latest_sha.encode()}

        # (command, add_message call count, expected literals,
        #  not expected literals, expected active sha of the course)
        scenarios = (
            ("fetch", 1,
             [FETCHED_LITERAL],
             [UPDATE_APPLIED_LITERAL],
             self.default_old_sha),
            ("fetch_update", 2,
             [FETCHED_LITERAL, VALIDATE_SUCCESS_LITERAL],
             [UPDATE_APPLIED_LITERAL],
             self.default_old_sha),
        )

        for command, call_count, present, absent, course_sha in scenarios:
            with self.subTest(command=command):
                check_kwargs = dict(
                    command=command,
                    add_message_expected_call_count=call_count,
                    expected_add_message_literals=present,
                    not_expected_add_message_literals=absent,
                    may_update=False)

                self.check_command_message_result(**check_kwargs)
                self.mock_add_message.reset_mock()

                self.check_command_message_result(**check_kwargs)
                self.assertEqual(
                    self.course.active_git_commit_sha, course_sha)
                self.mock_add_message.reset_mock()


class VersioningRepoMixin(object):
    """Test mixin that provides ``cls.rf`` and ``cls.repo``.

    ``cls.rf`` is a ``RequestFactory``; ``cls.repo`` is the ``repo`` attribute
    of a ``CoursePageContext`` built for a GET request to the update-course
    URL, issued as the user of ``cls.instructor_participation``.
    """

    @classmethod
    def setUpTestData(cls):  # noqa
        super(VersioningRepoMixin, cls).setUpTestData()

        from course.utils import CoursePageContext

        cls.rf = RequestFactory()
        update_request = cls.rf.get(cls.get_update_course_url())
        update_request.user = cls.instructor_participation.user

        page_context = CoursePageContext(update_request, cls.course.identifier)
        cls.repo = page_context.repo


class GitUpdateFormTest(VersioningRepoMixin, SingleCourseTestMixin, TestCase):

    # TODO: test the inner format_commit
    def test_may_not_update(self):
        # Without may_update, the form must offer neither a "fetch_update"
        # nor an "update" submit input.
        from crispy_forms.layout import Submit

        form = versioning.GitUpdateForm(
            may_update=False, previewing=True, repo=self.repo)
        submit_names = [
            form_input.name for form_input in form.helper.inputs
            if isinstance(form_input, Submit)]

        for forbidden_name in ("fetch_update", "update"):
            self.assertNotIn(forbidden_name, submit_names)

    def test_may_update(self):
        # With may_update, the form must offer both a "fetch_update" and an
        # "update" submit input.
        from crispy_forms.layout import Submit

        form = versioning.GitUpdateForm(
            may_update=True, previewing=True, repo=self.repo)
        submit_names = [
            form_input.name for form_input in form.helper.inputs
            if isinstance(form_input, Submit)]

        for required_name in ("fetch_update", "update"):
            self.assertIn(required_name, submit_names)

    def test_not_previewing(self):
        # When not previewing, the form must not offer an "end_preview"
        # submit input.
        form = versioning.GitUpdateForm(
            may_update=True, previewing=False, repo=self.repo)
        from crispy_forms.layout import Submit
        # Collect the names of all Submit inputs registered on the form helper.
        # NOTE(review): the loop variable `input` shadows the builtin here.
        submit_input_names = [
            input.name for input in form.helper.inputs
            if isinstance(input, Submit)
        ]

        self.assertNotIn("end_preview", submit_input_names)