# validation.py
    if permission not in dict(FLOW_PERMISSION_CHOICES):
                string_concat("%(location)s: ",
                    _("invalid flow permission '%(permission)s'"))
                % {"location": location, "permission": permission})
def validate_flow_desc(vctx, location, flow_desc):
    """Validate the top-level description of a flow.

    Checks the attribute schema through ``validate_struct``, validates the
    optional ``rules``, requires exactly one of ``groups``/``pages``,
    validates every group, and rejects empty groups, duplicate group ids and
    non-string ``notify_on_submit`` entries. The deprecated point-related
    attributes only produce warnings.

    :arg vctx: the validation context; handed to the helper validators and
        used to record deprecation warnings via ``add_warning``.
    :arg location: string naming what is being validated; used as the prefix
        of error and warning messages.
    :arg flow_desc: the flow description, accessed by attribute.
    :raises ValidationError: if any of the checks fails.
    """
    validate_struct(
        vctx,
        location,
        flow_desc,
        required_attrs=[
            ("title", str),
            ("description", "markup"),
        ],
        allowed_attrs=[
            ("completion_text", "markup"),
            ("rules", Struct),
            ("groups", list),
            ("pages", list),
            ("notify_on_submit", list),
            ("external_resources", list),
            # deprecated (moved to grading rule)
            ("max_points", (int, float)),
            ("max_points_enforced_cap", (int, float)),
            ("bonus_points", (int, float)),
        ],
    )

    if hasattr(flow_desc, "rules"):
        validate_flow_rules(vctx, location, flow_desc.rules)

    # {{{ check for presence of 'groups' or 'pages'

    # Exactly one of the two attributes must be present.
    if hasattr(flow_desc, "groups") == hasattr(flow_desc, "pages"):
        raise ValidationError(
                string_concat("%(location)s: ",
                    _("must have either 'groups' or 'pages'"))
                % {"location": location})

    # }}}

    if hasattr(flow_desc, "pages"):
        # Convert the 'pages' form into the 'groups' form so that the
        # remaining checks only deal with one layout.
        from course.content import normalize_flow_desc
        flow_desc = normalize_flow_desc(flow_desc)

        assert not hasattr(flow_desc, "pages")
        assert hasattr(flow_desc, "groups")

    for i, grp in enumerate(flow_desc.groups):
        validate_flow_group(vctx, "%s, group %d ('%s')"
                % (location, i+1, getattr(grp, "id", "<unknown id>")),
                grp)

    # {{{ check for non-emptiness

    flow_has_page = False
    for i, grp in enumerate(flow_desc.groups):
        # Only asks whether iteration yields at least one page.
        if not any(True for _page in grp.pages):
            # NOTE(review): the tail of this message ("no pages found") was
            # missing and is restored to match the flow-level message
            # below -- confirm against the translation catalog.
            raise ValidationError(
                    string_concat(
                        "%(location)s, ",
                        _("group %(group_index)d ('%(group_id)s'): "
                            "no pages found"))
                    % {
                        "location": location,
                        "group_index": i+1,
                        "group_id": grp.id})

        flow_has_page = True

    if not flow_has_page:
        raise ValidationError(_("%s: no pages found")
                % location)

    # }}}

    # {{{ check group id uniqueness

    group_ids = set()

    for grp in flow_desc.groups:
        if grp.id in group_ids:
            raise ValidationError(
                    string_concat("%(location)s: ",
                        _("group id '%(group_id)s' not unique"))
                    % {"location": location, "group_id": grp.id})

        group_ids.add(grp.id)

    # }}}

    validate_markup(vctx, location, flow_desc.description)
    if hasattr(flow_desc, "completion_text"):
        validate_markup(vctx, location, flow_desc.completion_text)

    if hasattr(flow_desc, "notify_on_submit"):
        for i, item in enumerate(flow_desc.notify_on_submit):
            if not isinstance(item, str):
                raise ValidationError(
                        string_concat(
                            "%s, ",
                            _("notify_on_submit: item %d is not a string"))
                        % (location, i+1))

    for attr in ["max_points", "max_points_enforced_cap", "bonus_points"]:
        if hasattr(flow_desc, attr):
            vctx.add_warning(location,
                    _("Attribute '%s' is deprecated as part of a flow. "
                    "Specify it as part of a grading rule instead.")
                    % attr)

# {{{ calendar validation

def validate_calendar_desc_struct(vctx, location, events_desc):
    """Validate the structure of a calendar/events description.

    Checks the top-level attributes, then every entry of ``event_kinds`` and
    ``events`` (each optional), and reports the ``show_description_*``
    datespecs of each event to the validation context.

    :arg vctx: the validation context; passed to ``validate_struct`` and
        informed of datespecs via ``encounter_datespec``.
    :arg location: string naming the events file, used in messages.
    :arg events_desc: the events description, accessed by attribute.
    """
    validate_struct(
            vctx,
            location,
            events_desc,
            required_attrs=[
                ],
            allowed_attrs=[
                ("event_kinds", Struct),
                ("events", Struct),
                ]
            )

    if hasattr(events_desc, "event_kinds"):
        for event_kind_name in events_desc.event_kinds._field_names:
            event_kind = getattr(events_desc.event_kinds, event_kind_name)
            validate_struct(
                    vctx,
                    f"{location}, event kind '{event_kind_name}'",
                    event_kind,
                    required_attrs=[
                        ],
                    allowed_attrs=[
                        ("color", str),
                        ("title", str),
                        ]
                    )

    if hasattr(events_desc, "events"):
        for event_name in events_desc.events._field_names:
            event_desc = getattr(events_desc.events, event_name)

            validate_struct(
                    vctx,
                    f"{location}, event '{event_name}'",
                    event_desc,
                    required_attrs=[
                        ],
                    allowed_attrs=[
                        ("color", str),
                        ("title", str),
                        ("description", "markup"),
                        ("show_description_from", datespec_types),
                        ("show_description_until", datespec_types),
                        ]
                    )

            # The datespecs are reported under the file-level location, as
            # in the original code.
            for datespec_attr in (
                    "show_description_from", "show_description_until"):
                if hasattr(event_desc, datespec_attr):
                    vctx.encounter_datespec(
                            location, getattr(event_desc, datespec_attr))


def get_yaml_from_repo_safely(repo, full_name, commit_sha):
    """Load a YAML file from the repository, bypassing the cache.

    Any exception raised while loading is printed (with traceback) and
    re-raised as a :class:`ValidationError` whose message names the file,
    the exception type and the exception text.

    :arg repo: the repository to read from.
    :arg full_name: name of the file inside the repository.
    :arg commit_sha: the commit to read the file at.
    :returns: whatever ``course.content.get_yaml_from_repo`` returns.
    :raises ValidationError: if loading the file fails for any reason.
    """
    from course.content import get_yaml_from_repo
    try:
        return get_yaml_from_repo(
                repo=repo, full_name=full_name, commit_sha=commit_sha,
                cached=False)
    except Exception as exc:
        from traceback import print_exc
        print_exc()

        # Binding the exception directly avoids unpacking sys.exc_info()
        # into a local named '_', which would shadow the gettext alias.
        raise ValidationError(
                f"{full_name}: {type(exc).__name__}: {exc!s}") from exc
def check_attributes_yml(
        vctx: ValidationContext,
        repo: Repo_ish,
        path: str, tree: Any,
        access_kinds: list[str]) -> None:
    """
    This function reads the .attributes.yml file and checks
    that each item for each header is a string

    Example::

        # this validates
        unenrolled:
            - test1.pdf
        student:
            - test2.pdf

        # this does not validate
        unenrolled:
            - test1.pdf
        student:
            - test2.pdf
            - 42

    The check recurses into every subdirectory of *tree* whose name does not
    match a pattern from the ``.gitignore`` found in the same directory.

    :arg vctx: the validation context, used for warnings and struct checks.
    :arg repo: the repository containing *tree*.
    :arg path: path of *tree* relative to the repository root; empty for the
        root itself.
    :arg tree: the tree object to examine.
    :arg access_kinds: the access kinds allowed as headers in the file.
    :raises ValidationError: if an attributes file is malformed.
    """
    from course.content import get_true_repo_and_path
    true_repo, path = get_true_repo_and_path(repo, path)

    # {{{ analyze attributes file

    try:
        _dummy, attr_blob_sha = tree[ATTRIBUTES_FILENAME.encode()]
    except KeyError:
        # no .attributes.yml here
        pass
    except ValueError:
        # the path root only contains a directory
        pass
    else:
        from yaml import safe_load as load_yaml

        from relate.utils import dict_to_struct

        yaml_data = load_yaml(true_repo[attr_blob_sha].data)  # type: ignore
        att_yml = dict_to_struct(yaml_data)

        if path:
            loc = path + "/" + ATTRIBUTES_FILENAME
        else:
            loc = ATTRIBUTES_FILENAME

        # NOTE(review): the head of this call was missing; only the
        # 'allowed_attrs' line survived. Restored to follow the other
        # validate_struct calls in this file -- confirm.
        validate_struct(vctx, loc, att_yml,
                        required_attrs=[],
                        allowed_attrs=[(role, list) for role in access_kinds])

        if hasattr(att_yml, "public"):
            vctx.add_warning(loc,
                    _("Access class 'public' is deprecated. Use 'unenrolled' "
                        "instead."))

        if hasattr(att_yml, "public") and hasattr(att_yml, "unenrolled"):
            raise ValidationError(
                _("%s: access classes 'public' and 'unenrolled' may not "
                    "exist simultaneously.")
                % (loc))

        for access_kind in access_kinds:
            if hasattr(att_yml, access_kind):
                for i, ln in enumerate(getattr(att_yml, access_kind)):
                    if not isinstance(ln, str):
                        raise ValidationError(
                            "%s: entry %d in '%s' is not a string"
                            % (loc, i+1, access_kind))

    # }}}

    # {{{ analyze gitignore

    # Must exist even without a .gitignore: the loop below reads it.
    gitignore_lines: list[str] = []

    try:
        _dummy, gitignore_sha = tree[b".gitignore"]
    except KeyError:
        # no .gitignore here
        pass
    except ValueError:
        # the path root only contains a directory
        pass
    else:
        gitignore_lines = true_repo[gitignore_sha].data.decode("utf-8").split("\n")

    # }}}

    from fnmatch import fnmatchcase

    for entry in tree.items():
        entry_name = entry.path.decode("utf-8")
        if any(fnmatchcase(entry_name, line) for line in gitignore_lines):
            continue

        if path:
            subpath = path+"/"+entry_name
        else:
            subpath = entry_name

        if stat.S_ISDIR(entry.mode):
            _dummy, blob_sha = tree[entry.path]
            subtree = true_repo[blob_sha]
            check_attributes_yml(vctx, true_repo, subpath, subtree, access_kinds)

# {{{ check whether flow grade identifiers were changed in sketchy ways

def check_grade_identifier_link(
        vctx, location, course, flow_id, flow_grade_identifier):
    """Reject a grade identifier that is already tied to a different flow.

    Looks for grading opportunities of *course* that carry
    *flow_grade_identifier* but belong to a flow other than *flow_id*.

    :arg vctx: the validation context (not used here).
    :arg location: string naming what is being validated, used in the
        error message.
    :raises ValidationError: if such a grading opportunity exists.
    """
    from course.models import GradingOpportunity

    conflicting_gopps = (
            GradingOpportunity.objects
            .filter(
                course=course,
                identifier=flow_grade_identifier)
            .exclude(flow_id=flow_id))

    # 0 or 1 trips through this loop because of uniqueness
    for other_gopp in conflicting_gopps:
        message_fields = {
            "location": location,
            "grade_identifier": flow_grade_identifier,
            "other_flow_id": other_gopp.flow_id,
            "new_flow_id": flow_id,
            "new_grade_identifier": flow_grade_identifier,
        }

        raise ValidationError(
                _(
                    "{location}: existing grading opportunity with identifier "
                    "'{grade_identifier}' refers to flow '{other_flow_id}', however "
                    "flow code in this flow ('{new_flow_id}') specifies the same "
                    "grade identifier. "
                    "(Have you renamed the flow? If so, edit the grading "
                    "opportunity to match.)")
                .format(**message_fields))

# }}}


# {{{ check whether page types were changed

def check_for_page_type_changes(vctx, location, course, flow_id, flow_desc):
    """Reject a flow in which a page's type differs from stored page data.

    For every page of the normalized flow description, looks for stored
    ``FlowPageData`` of the same course, flow, group and page id whose
    non-null ``page_type`` differs from the page's ``type``.

    :arg vctx: the validation context (not used here).
    :arg location: string naming what is being validated, used in the
        error message.
    :raises ValidationError: on the first mismatch found.
    """
    from course.content import normalize_flow_desc
    n_flow_desc = normalize_flow_desc(flow_desc)

    from course.models import FlowPageData
    for grp in n_flow_desc.groups:  # pragma: no branch
        for page_desc in grp.pages:  # pragma: no branch
            # The slice limits the query to a single row; one mismatch is
            # enough to fail.
            fpd_with_mismatched_page_types = list(
                    FlowPageData.objects
                    .filter(
                        flow_session__course=course,
                        flow_session__flow_id=flow_id,
                        group_id=grp.id,
                        page_id=page_desc.id)
                    .exclude(page_type=None)
                    .exclude(page_type=page_desc.type)
                    [0:1])

            if fpd_with_mismatched_page_types:
                mismatched_fpd, = fpd_with_mismatched_page_types
                raise ValidationError(
                        _("%(loc)s, group '%(group)s', page '%(page)s': "
                            "page type ('%(type_new)s') differs from "
                            "type used in database ('%(type_old)s'). "
                            "You must change the question ID if you change the "
                            "question type.")
                        % {"loc": location, "group": grp.id,
                            "page": page_desc.id,
                            "type_new": page_desc.type,
                            "type_old": mismatched_fpd.page_type})

# }}}


def validate_flow_id(vctx: ValidationContext, location: str, flow_id: str) -> None:
    """Check that *flow_id* matches ``FLOW_ID_REGEX`` from start to end.

    :arg vctx: the validation context (not used here).
    :arg location: string naming what is being validated, used as the
        prefix of the error message.
    :arg flow_id: the flow identifier to check.
    :raises ValidationError: if *flow_id* does not match.
    """
    from course.constants import FLOW_ID_REGEX
    match = re.match("^" + FLOW_ID_REGEX + "$", flow_id)
    if match is None:
        raise ValidationError(
            string_concat("%s: ",
                          _("invalid flow name. "
                            "Flow names may only contain (roman) "
                            "letters, numbers, "
                            "dashes and underscores."))
            % location)


def validate_static_page_name(
        vctx: ValidationContext, location: str, page_name: str) -> None:
    """Check that *page_name* matches ``STATICPAGE_PATH_REGEX`` from start
    to end.

    :arg vctx: the validation context (not used here).
    :arg location: string naming what is being validated, used as the
        prefix of the error message.
    :arg page_name: the static page name to check.
    :raises ValidationError: if *page_name* does not match.
    """
    from course.constants import STATICPAGE_PATH_REGEX
    match = re.match("^" + STATICPAGE_PATH_REGEX + "$", page_name)
    if match is None:
        raise ValidationError(
            string_concat("%s: ",
                          _(
                              "invalid page name. "
                              "Page names may only contain "
                              "alphanumeric characters (any language) "
                              "and hyphens."
                          ))
            % location)


def validate_course_content(repo, course_file, events_file,
        validate_sha, course=None):
    """Validate the content of a course repository at one commit.

    Validates, in order: the course file, the events file (if present), the
    ``.attributes.yml`` files throughout the tree, the presence of a
    ``media`` directory (warning only), every ``*.yml`` file in ``flows``
    and every ``*.yml`` file in ``staticpages``.

    :arg repo: the repository to read from.
    :arg course_file: name of the course description file.
    :arg events_file: name of the events (calendar) file.
    :arg validate_sha: the commit to validate.
    :arg course: the course being validated, or *None*; when given, flows
        are additionally checked against data stored for that course.
    :returns: ``vctx.warnings``, the warnings gathered during validation.
    :raises ValidationError: if any check fails.
    """
    # NOTE(review): the tail of the signature and the 'course=' argument
    # below were missing; both are restored from how 'validate_sha',
    # 'course' and 'vctx.course' are used further down -- confirm.
    vctx = ValidationContext(
            repo=repo,
            commit_sha=validate_sha,
            course=course)

    course_desc = get_yaml_from_repo_safely(repo, course_file,
            commit_sha=validate_sha)

    validate_staticpage_desc(vctx, course_file, course_desc)

    # {{{ events file

    try:
        from course.content import get_yaml_from_repo
        events_desc = get_yaml_from_repo(repo, events_file,
                commit_sha=validate_sha, cached=False)
    except ObjectDoesNotExist:
        # NOTE(review): the condition and the warning's location argument
        # were missing; only the message and the 'else' branch survived.
        # Both are restored from recollection of the upstream project --
        # confirm.
        if events_file != "events.yml":
            vctx.add_warning(
                    _("Events file"),
                    _("Your course repository does not have an events "
                        "file named '%s'.")
                    % events_file)
        else:
            # That's OK--no calendar info.
            pass
    else:
        validate_calendar_desc_struct(vctx, events_file, events_desc)

    # }}}

    # {{{ attribute files

    if vctx.course is not None:
        from course.models import (
            ParticipationPermission,
            ParticipationRolePermission,
        )

        participation_kinds = (
                ParticipationPermission.objects
                .filter(
                    participation__course=vctx.course,
                    permission=pperm.access_files_for,
                    )
                .values_list("argument", flat=True))
        role_kinds = (
                ParticipationRolePermission.objects
                .filter(
                    role__course=vctx.course,
                    permission=pperm.access_files_for,
                    )
                .values_list("argument", flat=True))

        access_kinds = frozenset(
                kind
                for kind in (*participation_kinds, *role_kinds)
                if kind is not None)
    else:
        access_kinds = DEFAULT_ACCESS_KINDS

    from course.content import get_repo_tree

    check_attributes_yml(
            vctx, repo, "",
            get_repo_tree(repo, "", validate_sha),
            access_kinds)

    # }}}

    # {{{ media directory

    try:
        get_repo_tree(repo, "media", validate_sha)
    except ObjectDoesNotExist:
        # That's great--no media directory.
        pass
    else:
        # NOTE(review): the warning's location argument was missing and is
        # restored as "media/" -- confirm.
        vctx.add_warning(
                "media/", _(
                    "Your course repository has a 'media/' directory. "
                    "Linking to media files using 'media:' is discouraged. "
                    "Use the 'repo:' and 'repocur:' linkng schemes instead."))

    # }}}

    # {{{ flows

    try:
        flows_tree = get_repo_tree(repo, "flows", validate_sha)
    except ObjectDoesNotExist:
        # That's OK--no flows yet.
        pass
    else:
        used_grade_identifiers = set()

        for entry in flows_tree.items():
            entry_path = entry.path.decode("utf-8")
            if not entry_path.endswith(".yml"):
                continue

            flow_id = entry_path[:-4]
            location = entry_path
            validate_flow_id(vctx, location, flow_id)

            location = f"flows/{entry_path}"
            flow_desc = get_yaml_from_repo_safely(repo, location,
                    commit_sha=validate_sha)

            validate_flow_desc(vctx, location, flow_desc)

            # {{{ check grade_identifier

            # Must be bound even when the flow has no rules.
            flow_grade_identifier = None
            if hasattr(flow_desc, "rules"):
                flow_grade_identifier = getattr(
                        flow_desc.rules, "grade_identifier", None)

            if flow_grade_identifier is not None:
                if flow_grade_identifier in used_grade_identifiers:
                    raise ValidationError(
                            string_concat("%s: ",
                                          _("flow uses the same grade_identifier "
                                            "as another flow"))
                            % location)

                used_grade_identifiers.add(flow_grade_identifier)

                if course is not None:
                    check_grade_identifier_link(
                            vctx, location, course, flow_id,
                            flow_grade_identifier)

            # }}}

            if course is not None:
                check_for_page_type_changes(
                        vctx, location, course, flow_id, flow_desc)

    # }}}

    from course.content import get_repo_blob

    # {{{ static pages

    try:
        # NOTE(review): this looks up a directory through get_repo_blob
        # while the other directories above go through get_repo_tree.
        # Verify that get_repo_blob accepts a tree here.
        pages_tree = get_repo_blob(repo, "staticpages", validate_sha)
    except ObjectDoesNotExist:
        # That's OK--no static pages yet.
        pass
    else:
        for entry in pages_tree.items():
            entry_path = entry.path.decode("utf-8")
            if not entry_path.endswith(".yml"):
                continue

            page_name = entry_path[:-4]
            location = entry_path
            validate_static_page_name(vctx, location, page_name)

            location = f"staticpages/{entry_path}"
            page_desc = get_yaml_from_repo_safely(repo, location,
                    commit_sha=validate_sha)

            validate_staticpage_desc(vctx, location, page_desc)

    # }}}

    return vctx.warnings


# {{{ validation script support

class FileSystemFakeRepo:  # pragma: no cover
    """Stand-in for a repository object, backed by a directory on disk.

    ``root`` is the directory path as ``bytes``. Looking up any "sha"
    returns that same object unchanged, and :meth:`decode` returns the
    instance itself.
    """

    def __init__(self, root):
        self.root = root
        assert isinstance(self.root, bytes)

    def __str__(self):
        return "<FAKEREPO:{}>".format(self.root)

    def __getitem__(self, sha):
        # Objects are handed around directly, so the key is the value.
        return sha

    def decode(self):
        # NOTE(review): presumably lets this object be passed where an
        # encoded commit sha is expected -- confirm against callers.
        return self

    def controldir(self):
        return self.root

    @property
    def tree(self):
        """The tree for the root directory."""
        return FileSystemFakeRepoTree(self.root)




class FileSystemFakeRepoTreeEntry:  # pragma: no cover
    """One entry of a :class:`FileSystemFakeRepoTree`: a name and its mode."""

    def __init__(self, path: bytes, mode: int) -> None:
        # path: the entry's name; mode: its file mode bits
        self.path, self.mode = path, mode
class FileSystemFakeRepoTree:  # pragma: no cover
    """Stand-in for a tree object, backed by a directory on disk.

    ``root`` is the directory path as ``bytes``.
    """

    def __init__(self, root):
        self.root = root
        assert isinstance(self.root, bytes)

    def __getitem__(self, name):
        """Return ``(mode, object)`` for the entry *name* in this directory.

        The object is a :class:`FileSystemFakeRepoTree` for a directory and
        a :class:`FileSystemFakeRepoFile` for anything else.

        :raises KeyError: if *name* is empty or no such entry exists.
        """
        if not name:
            raise KeyError("<empty filename>")

        import os
        from stat import S_ISDIR

        full_name = os.path.join(self.root, name)

        if not os.path.exists(full_name):
            raise KeyError(full_name)

        mode = os.stat(full_name).st_mode

        # returns mode, "sha"
        if S_ISDIR(mode):
            return mode, FileSystemFakeRepoTree(full_name)
        # Without this separate branch, files would fall through and
        # yield None.
        return mode, FileSystemFakeRepoFile(full_name)

    def items(self) -> list[FileSystemFakeRepoTreeEntry]:
        """Return one entry (name and mode) per name in the directory."""
        return [
                FileSystemFakeRepoTreeEntry(
                    path=n,
                    mode=os.stat(os.path.join(self.root, n)).st_mode)
                for n in os.listdir(self.root)]


class FileSystemFakeRepoFile:  # pragma: no cover
    """Stand-in for a blob object, backed by a file on disk."""

    def __init__(self, name):
        # name: path of the backing file
        self.name = name

    @property
    def data(self):
        """The file's full content as bytes, read anew on every access."""
        with open(self.name, "rb") as stream:
            contents = stream.read()
        return contents


# Type aliases: either the dulwich object or the filesystem-backed stand-in
# class defined above.
Blob_ish = dulwich.objects.Blob | FileSystemFakeRepoFile
Tree_ish = dulwich.objects.Tree | FileSystemFakeRepoTree
def validate_course_on_filesystem(
        root, course_file, events_file):  # pragma: no cover
    """Validate course content stored in a plain directory and print any
    warnings.

    :arg root: path of the course directory, as a string.
    :arg course_file: name of the course description file.
    :arg events_file: name of the events (calendar) file.
    """
    fake_repo = FileSystemFakeRepo(root.encode("utf-8"))

    # NOTE(review): the repo argument, the keyword arguments and the
    # 'if warnings:' header were missing and are restored from recollection
    # of the upstream project. The fake repo doubles as the "sha" because
    # FileSystemFakeRepo.__getitem__ returns its key and decode() returns
    # the repo itself -- confirm.
    warnings = validate_course_content(
            fake_repo,
            course_file, events_file,
            validate_sha=fake_repo, course=None)

    if warnings:
        print(_("WARNINGS: "))
        for w in warnings:
            print("***", w.location, w.text)

# vim: foldmethod=marker