Skip to content
validation.py 54.3 KiB
Newer Older
                "replace by 'see_answer_after_submission'"))
    if permission not in dict(FLOW_PERMISSION_CHOICES):
                string_concat("%(location)s: ",
                    _("invalid flow permission '%(permission)s'"))
                % {'location': location, 'permission': permission})
def validate_flow_desc(vctx, location, flow_desc):
    validate_struct(
            location,
            flow_desc,
            required_attrs=[
                ("title", str),
                ("description", "markup"),
                ],
            allowed_attrs=[
                ("completion_text", "markup"),
                ("groups", list),
                ("pages", list),
Andreas Klöckner's avatar
Andreas Klöckner committed
                ("notify_on_submit", list),

                # deprecated (moved to grading rule)
                ("max_points", (int, float)),
                ("max_points_enforced_cap", (int, float)),
                ("bonus_points", (int, float)),
    if hasattr(flow_desc, "rules"):
        validate_flow_rules(vctx, location, flow_desc.rules)
    # {{{ check for presence of 'groups' or 'pages'

    if (
            (not hasattr(flow_desc, "groups") and not hasattr(flow_desc, "pages"))
            or
            (hasattr(flow_desc, "groups") and hasattr(flow_desc, "pages"))):
        raise ValidationError(
                string_concat("%(location)s: ",
                    _("must have either 'groups' or 'pages'"))
                % {'location': location})

    # }}}

    if hasattr(flow_desc, "pages"):
        from course.content import normalize_flow_desc
        flow_desc = normalize_flow_desc(flow_desc)

        assert not hasattr(flow_desc, "pages")
        assert hasattr(flow_desc, "groups")

    # {{{ check for non-emptiness

    flow_has_page = False
    for i, grp in enumerate(flow_desc.groups):
        group_has_page = False

        if not isinstance(grp.pages, list):
            raise ValidationError(
                    string_concat(
                        "%(location)s, ",
                        _("group %(group_index)d ('%(group_id)s'): "
                            "'pages' is not a list"))
                    % {
                        'location': location,
                        'group_index': i+1,
                        'group_id': grp.id})

        for page in grp.pages:
            group_has_page = flow_has_page = True
            break

        if not group_has_page:
                    string_concat(
                        "%(location)s, ",
                        _("group %(group_index)d ('%(group_id)s'): "
                    % {
                        'location': location,
                        'group_index': i+1,
                        'group_id': grp.id})

    if not flow_has_page:
        raise ValidationError(_("%s: no pages found")
                % location)

    # }}}

    # {{{ check group id uniqueness

    group_ids = set()

    for grp in flow_desc.groups:
        if grp.id in group_ids:
                    string_concat("%(location)s: ",
                        _("group id '%(group_id)s' not unique"))
                    % {'location': location, 'group_id': grp.id})

        group_ids.add(grp.id)

    # }}}

    for i, grp in enumerate(flow_desc.groups):
        validate_flow_group(vctx, "%s, group %d ('%s')"
                % (location, i+1, grp.id),
                grp)

    validate_markup(vctx, location, flow_desc.description)
    if hasattr(flow_desc, "completion_text"):
        validate_markup(vctx, location, flow_desc.completion_text)
Andreas Klöckner's avatar
Andreas Klöckner committed
    if hasattr(flow_desc, "notify_on_submit"):
        for i, item in enumerate(flow_desc.notify_on_submit):
            if not isinstance(item, six.string_types):
                raise ValidationError(
                        string_concat(
                            "%s, ",
                            _("notify_on_submit: item %d is not a string"))
Andreas Klöckner's avatar
Andreas Klöckner committed
                        % (location, i+1))

    for attr in ["max_points", "max_points_enforced_cap", "bonus_points"]:
        if hasattr(flow_desc, attr):
            vctx.add_warning(location,
                    _("Attribute '%s' is deprecated as part of a flow. "
                    "Specify it as part of a grading rule instead.")
                    % attr)

Andreas Klöckner's avatar
Andreas Klöckner committed
# {{{ calendar validation

def validate_calendar_desc_struct(vctx, location, events_desc):
    """Validate the structure of a calendar (events) description.

    Checks the top-level attributes of *events_desc*, then every entry of
    ``event_kinds`` and ``events`` (when present). Each
    ``show_description_from``/``show_description_until`` value found on an
    event is reported to *vctx* via ``encounter_datespec``.

    :arg vctx: the validation context. It is handed to every
        ``validate_struct`` call and receives the encountered datespecs.
    :arg location: string naming the object being validated; also used as
        the prefix of the locations reported for nested entries.
    :arg events_desc: the events description to check.
    """
    validate_struct(
            vctx,
            location,
            events_desc,
            required_attrs=[
                ],
            allowed_attrs=[
                ("event_kinds", Struct),
                ("events", Struct),
                ]
            )

    if hasattr(events_desc, "event_kinds"):
        for event_kind_name in events_desc.event_kinds._field_names:
            event_kind = getattr(events_desc.event_kinds, event_kind_name)

            validate_struct(
                    vctx,
                    "%s, event kind '%s'" % (location, event_kind_name),
                    event_kind,
                    required_attrs=[
                        ],
                    allowed_attrs=[
                        ("color", str),
                        ("title", str),
                        ]
                    )

    if hasattr(events_desc, "events"):
        for event_name in events_desc.events._field_names:
            event_desc = getattr(events_desc.events, event_name)

            validate_struct(
                    vctx,
                    "%s, event '%s'" % (location, event_name),
                    event_desc,
                    required_attrs=[
                        ],
                    allowed_attrs=[
                        ("color", str),
                        ("title", str),
                        ("description", "markup"),
                        ("show_description_from", datespec_types),
                        ("show_description_until", datespec_types),
                        ]
                    )

            # Datespecs are recorded against the location of the whole
            # events description, not against the individual event.
            if hasattr(event_desc, "show_description_from"):
                vctx.encounter_datespec(location, event_desc.show_description_from)

            if hasattr(event_desc, "show_description_until"):
                vctx.encounter_datespec(location, event_desc.show_description_until)

def get_yaml_from_repo_safely(repo, full_name, commit_sha):
    """Load a YAML file from *repo*, converting any failure into a
    :class:`ValidationError`.

    The file is always read with ``cached=False``. If loading raises, the
    traceback is printed and a :class:`ValidationError` is raised whose
    message names the file, the exception type and the exception text.

    :arg repo: the repository to read from.
    :arg full_name: name of the YAML file within the repository.
    :arg commit_sha: the commit from which the file is read.
    :returns: the result of ``course.content.get_yaml_from_repo``.
    :raises ValidationError: if loading the file raises any
        :class:`Exception`.
    """
    from course.content import get_yaml_from_repo

    try:
        return get_yaml_from_repo(
                repo=repo, full_name=full_name, commit_sha=commit_sha,
                cached=False)
    except Exception as e:
        # The ValidationError below only carries a one-line summary, so
        # print the full traceback for whoever is debugging the failure.
        from traceback import print_exc
        print_exc()

        raise ValidationError(
                "%(fullname)s: %(err_type)s: %(err_str)s" % {
                    "fullname": full_name,
                    "err_type": type(e).__name__,
                    "err_str": six.text_type(e)})
Andreas Klöckner's avatar
Andreas Klöckner committed
def check_attributes_yml(vctx, repo, path, tree, access_kinds):
    # type: (ValidationContext, Repo_ish, Text, Any, List[Text]) -> None
    """
    This function reads the .attributes.yml file and checks
    that each item for each header is a string

    Example::

        # this validates
        unenrolled:
            - test1.pdf
        student:
            - test2.pdf

        # this does not validate
        unenrolled:
            - test1.pdf
        student:
            - test2.pdf
            - 42

    After checking the attributes file of *tree* (if there is one), the
    function recurses into every subdirectory of *tree* whose name does not
    match a line of the tree's ``.gitignore``.

    :arg vctx: validation context; receives the deprecation warning for
        the 'public' access class.
    :arg repo: the repository; resolved through
        ``course.content.get_true_repo_and_path`` together with *path*.
    :arg path: path of *tree* within the repository; an empty string means
        that no prefix is prepended to reported locations.
    :arg tree: the tree object whose entries are examined.
    :arg access_kinds: names of the access classes permitted as top-level
        keys of the attributes file.
    :raises ValidationError: if 'public' and 'unenrolled' are both present,
        or if an entry of an access class is not a string.
    """
    from course.content import get_true_repo_and_path
    true_repo, path = get_true_repo_and_path(repo, path)

    # {{{ analyze attributes file

    try:
        dummy, attr_blob_sha = tree[ATTRIBUTES_FILENAME.encode()]
    except KeyError:
        # no .attributes.yml here
        pass
    else:
        from relate.utils import dict_to_struct
        from yaml import safe_load as load_yaml

        yaml_data = load_yaml(true_repo[attr_blob_sha].data)  # type: ignore
        att_yml = dict_to_struct(yaml_data)

        if path:
            loc = path + "/" + ATTRIBUTES_FILENAME
        else:
            loc = ATTRIBUTES_FILENAME

        # NOTE(review): only the trailing 'allowed_attrs=' argument of this
        # call survived in the copy under review; the head of the call is
        # restored to match the other validate_struct() call sites --
        # confirm against the upstream file.
        validate_struct(vctx, loc, att_yml,
                        required_attrs=[],
                        allowed_attrs=[(role, list) for role in access_kinds])

        if hasattr(att_yml, "public"):
            vctx.add_warning(loc,
                    _("Access class 'public' is deprecated. Use 'unenrolled' "
                        "instead."))

        if hasattr(att_yml, "public") and hasattr(att_yml, "unenrolled"):
            raise ValidationError(
                _("%s: access classes 'public' and 'unenrolled' may not "
                    "exist simultaneously.")
                % (loc))

        for access_kind in access_kinds:
            if hasattr(att_yml, access_kind):
                for i, item in enumerate(getattr(att_yml, access_kind)):
                    if not isinstance(item, six.string_types):
                        raise ValidationError(
                            "%s: entry %d in '%s' is not a string"
                            % (loc, i+1, access_kind))

    # }}}

    # {{{ analyze gitignore

    gitignore_lines = []  # type: List[Text]

    try:
        dummy, gitignore_sha = tree[b".gitignore"]
    except KeyError:
        # no .gitignore here
        pass
    else:
        gitignore_lines = true_repo[gitignore_sha].data.decode("utf-8").split("\n")

    # }}}

    from fnmatch import fnmatchcase

    for entry in tree.items():
        entry_name = entry.path.decode("utf-8")

        # Each .gitignore line is used as a shell-style pattern on the bare
        # entry name; matching entries are neither checked nor descended into.
        if any(fnmatchcase(entry_name, line) for line in gitignore_lines):
            continue

        if path:
            subpath = path+"/"+entry_name
        else:
            subpath = entry_name

        if stat.S_ISDIR(entry.mode):
            dummy, blob_sha = tree[entry.path]
            subtree = true_repo[blob_sha]
            check_attributes_yml(vctx, true_repo, subpath, subtree, access_kinds)
# {{{ check whether flow grade identifiers were changed in sketchy ways

def check_grade_identifier_link(
        vctx, location, course, flow_id, flow_grade_identifier):
    """Reject a grade identifier that is already tied to a different flow.

    Looks for a grading opportunity in *course* whose identifier equals
    *flow_grade_identifier* but whose ``flow_id`` differs from *flow_id*.

    :arg vctx: validation context (not used by this check).
    :arg location: string naming the flow being validated; used as the
        prefix of the error message.
    :arg course: the course whose grading opportunities are searched.
    :arg flow_id: identifier of the flow being validated.
    :arg flow_grade_identifier: the grade identifier that flow specifies.
    :raises ValidationError: if such a grading opportunity exists.
    """

    from course.models import GradingOpportunity

    conflicting_gopps = (
            GradingOpportunity.objects
            .filter(course=course, identifier=flow_grade_identifier)
            .exclude(flow_id=flow_id))

    # 0 or 1 trips through this loop because of uniqueness
    for other_gopp in conflicting_gopps:
        message_template = _(
                "{location}: existing grading opportunity with identifier "
                "'{grade_identifier}' refers to flow '{other_flow_id}', however "
                "flow code in this flow ('{new_flow_id}') specifies the same "
                "grade identifier. "
                "(Have you renamed the flow? If so, edit the grading "
                "opportunity to match.)")

        raise ValidationError(
                message_template.format(
                    location=location,
                    grade_identifier=flow_grade_identifier,
                    other_flow_id=other_gopp.flow_id,
                    new_flow_id=flow_id,
                    new_grade_identifier=flow_grade_identifier))

# }}}


# {{{ check whether page types were changed

def check_for_page_type_changes(vctx, location, course, flow_id, flow_desc):
    """Reject a flow whose pages changed type relative to stored page data.

    For every page of the normalized *flow_desc*, looks for stored
    ``FlowPageData`` of the same course, flow, group and page id whose
    ``page_type`` is set and differs from the type in the description.

    :arg vctx: validation context (not used by this check).
    :arg location: string naming the flow; used as the prefix of the error
        message.
    :arg course: the course whose flow sessions are searched.
    :arg flow_id: identifier of the flow being validated.
    :arg flow_desc: the flow description; normalized before use.
    :raises ValidationError: on the first page with a mismatched type.
    """
    from course.content import normalize_flow_desc
    normalized_desc = normalize_flow_desc(flow_desc)

    from course.models import FlowPageData
    for group in normalized_desc.groups:
        for page_desc in group.pages:
            mismatch_query = (
                    FlowPageData.objects
                    .filter(
                        flow_session__course=course,
                        flow_session__flow_id=flow_id,
                        group_id=group.id,
                        page_id=page_desc.id)
                    .exclude(page_type=None)
                    .exclude(page_type=page_desc.type))

            # One offending row is enough to report the problem.
            offenders = list(mismatch_query[0:1])
            if not offenders:
                continue

            stored_fpd = offenders[0]
            raise ValidationError(
                    _("%(loc)s, group '%(group)s', page '%(page)s': "
                        "page type ('%(type_new)s') differs from "
                        "type used in database ('%(type_old)s')")
                    % {"loc": location, "group": group.id,
                        "page": page_desc.id,
                        "type_new": page_desc.type,
                        "type_old": stored_fpd.page_type})

# }}}


Andreas Klöckner's avatar
Andreas Klöckner committed
def validate_course_content(repo, course_file, events_file,
    vctx = ValidationContext(
            repo=repo,
            commit_sha=validate_sha,
    course_desc = get_yaml_from_repo_safely(repo, course_file,
            commit_sha=validate_sha)

    validate_staticpage_desc(vctx, course_file, course_desc)
Andreas Klöckner's avatar
Andreas Klöckner committed
    try:
        from course.content import get_yaml_from_repo
        events_desc = get_yaml_from_repo(repo, events_file,
                commit_sha=validate_sha, cached=False)
Andreas Klöckner's avatar
Andreas Klöckner committed
    except ObjectDoesNotExist:
                    _("Your course repository does not have an events "
                        "file named '%s'.")
                    % events_file)
        else:
            # That's OK--no calendar info.
            pass
Andreas Klöckner's avatar
Andreas Klöckner committed
    else:
        validate_calendar_desc_struct(vctx, events_file, events_desc)

Andreas Klöckner's avatar
Andreas Klöckner committed
    if vctx.course is not None:
        from course.models import (
                ParticipationPermission,
                ParticipationRolePermission)
        access_kinds = frozenset(
                ParticipationPermission.objects
                .filter(
                    participation__course=vctx.course,
                    permission=pperm.access_files_for,
                    )
                .values_list("argument", flat=True)) | frozenset(
                        ParticipationRolePermission.objects
                        .filter(
                            role__course=vctx.course,
Andreas Klöckner's avatar
Andreas Klöckner committed
                            permission=pperm.access_files_for,
                            )
                        .values_list("argument", flat=True))

        access_kinds = frozenset(k for k in access_kinds if k is not None)

Andreas Klöckner's avatar
Andreas Klöckner committed
    else:
        access_kinds = ["public", "in_exam", "student", "ta",
                     "unenrolled", "instructor"]

    check_attributes_yml(
Andreas Klöckner's avatar
Andreas Klöckner committed
            vctx, repo, "",
            get_repo_blob(repo, "", validate_sha),
            access_kinds)
    try:
        flows_tree = get_repo_blob(repo, "media", validate_sha)
    except ObjectDoesNotExist:
        # That's great--no media directory.
        pass
    else:
        vctx.add_warning(
                'media/', _(
                    "Your course repository has a 'media/' directory. "
                    "Linking to media files using 'media:' is discouraged. "
                    "Use the 'repo:' and 'repocur:' linkng schemes instead."))

    try:
        flows_tree = get_repo_blob(repo, "flows", validate_sha)
    except ObjectDoesNotExist:
        # That's OK--no flows yet.
        pass
    else:
        used_grade_identifiers = set()

        for entry in flows_tree.items():
            entry_path = entry.path.decode("utf-8")
            if not entry_path.endswith(".yml"):
            from course.constants import FLOW_ID_REGEX
            flow_id = entry_path[:-4]
            match = re.match("^"+FLOW_ID_REGEX+"$", flow_id)
            if match is None:
                raise ValidationError(
                        string_concat("%s: ",
                            _("invalid flow name. "
                                "Flow names may only contain (roman) "
                                "letters, numbers, "
                                "dashes and underscores."))
                        % entry_path)
            location = "flows/%s" % entry_path
            flow_desc = get_yaml_from_repo_safely(repo, location,
                    commit_sha=validate_sha)
            validate_flow_desc(vctx, location, flow_desc)
            # {{{ check grade_identifier

            if hasattr(flow_desc, "rules"):
                flow_grade_identifier = getattr(
                        flow_desc.rules, "grade_identifier", None)

            if (
                    flow_grade_identifier is not None
                    and
                    set([flow_grade_identifier]) & used_grade_identifiers):
                raise ValidationError(
                                      _("flow uses the same grade_identifier "
                                        "as another flow"))
                        % location)

            used_grade_identifiers.add(flow_grade_identifier)

            if (course is not None
                    and flow_grade_identifier is not None):
                check_grade_identifier_link(
                        vctx, location, course, flow_id, flow_grade_identifier)

            if course is not None:
                check_for_page_type_changes(
                        vctx, location, course, flow_id, flow_desc)

    # }}}

    # {{{ static pages

    try:
        pages_tree = get_repo_blob(repo, "staticpages", validate_sha)
    except ObjectDoesNotExist:
        # That's OK--no flows yet.
        pass
    else:
        for entry in pages_tree.items():
            entry_path = entry.path.decode("utf-8")
            if not entry_path.endswith(".yml"):
                continue

            from course.constants import STATICPAGE_PATH_REGEX
            page_name = entry_path[:-4]
            match = re.match("^"+STATICPAGE_PATH_REGEX+"$", page_name)
            if match is None:
                raise ValidationError(
                        string_concat("%s: ",
                            _(
                                "invalid page name. "
                                "Page names may only contain "
                                "alphanumeric characters (any language) "
                                "and hyphens."
                                ))
                        % entry_path)

            location = "staticpages/%s" % entry_path
            page_desc = get_yaml_from_repo_safely(repo, location,
                    commit_sha=validate_sha)
            validate_staticpage_desc(vctx, location, page_desc)
    return vctx.warnings

# {{{ validation script support

class FileSystemFakeRepo(object):
    """Repository stand-in backed by a directory on the file system.

    *root* is the directory path as a byte string.
    """

    def __init__(self, root):
        self.root = root
        assert isinstance(self.root, six.binary_type)

    def controldir(self):
        """Return the root directory given at construction."""
        return self.root

    def __getitem__(self, sha):
        """Return *sha* itself: the object passed as a key is the result."""
        return sha

    def __str__(self):
        return "<FAKEREPO:%s>" % (self.root,)

    def decode(self):
        """Return this object unchanged."""
        return self

    @property
    def tree(self):
        """A :class:`FileSystemFakeRepoTree` for the root directory."""
        root_tree = FileSystemFakeRepoTree(self.root)
        return root_tree


class FileSystemFakeRepoTreeEntry(object):
    """A single entry of a fake repository tree: its path and its mode."""

    def __init__(self, path, mode):
        self.path, self.mode = path, mode


class FileSystemFakeRepoTree(object):
    """Tree stand-in that exposes a file system directory.

    *root* is the directory path as a byte string.
    """

    def __init__(self, root):
        self.root = root
        assert isinstance(self.root, six.binary_type)

    def __getitem__(self, name):
        """Look up *name* inside this directory.

        :returns: a tuple ``(None, obj)``, where the first item takes the
            place of a mode and *obj* is a :class:`FileSystemFakeRepoTree`
            for a directory or a :class:`FileSystemFakeRepoFile` otherwise.
        :raises KeyError: if *name* is empty or does not exist.
        """
        if not name:
            raise KeyError("<empty filename>")

        from os.path import join, isdir, exists
        full_path = join(self.root, name)

        if not exists(full_path):
            raise KeyError(full_path)

        # returns mode, "sha"
        if isdir(full_path):
            return None, FileSystemFakeRepoTree(full_path)

        return None, FileSystemFakeRepoFile(full_path)

    def items(self):
        """Return a list with one :class:`FileSystemFakeRepoTreeEntry` per
        directory entry, carrying the entry name and its ``st_mode``.
        """
        import os
        return [
                FileSystemFakeRepoTreeEntry(
                    path=entry_name,
                    mode=os.stat(os.path.join(self.root, entry_name)).st_mode)
                for entry_name in os.listdir(self.root)]


class FileSystemFakeRepoFile(object):
    """Blob stand-in that reads its content from a file on disk."""

    def __init__(self, name):
        self.name = name

    @property
    def data(self):
        """The file's entire content as bytes, read anew on every access."""
        with open(self.name, "rb") as blob_file:
            content = blob_file.read()
        return content


def validate_course_on_filesystem(
        root, course_file, events_file):
    fake_repo = FileSystemFakeRepo(root.encode("utf-8"))
    warnings = validate_course_content(
            course_file, events_file,
        for w in warnings:
            print("***", w.location, w.text)

# vim: foldmethod=marker