Skip to content
content.py 60.2 KiB
Newer Older
def get_raw_yaml_from_repo(
        repo: Repo_ish, full_name: str, commit_sha: bytes) -> Any:
    """Return decoded YAML data structure from
    the given file in *repo* at *commit_sha*.

    :arg commit_sha: A byte string containing the commit hash
    from urllib.parse import quote_plus
        quote_plus(repo.controldir()), quote_plus(full_name), commit_sha.decode(),
        ))
    import django.core.cache as cache
Andreas Klöckner's avatar
Andreas Klöckner committed
    def_cache = cache.caches["default"]
    # Memcache is apparently limited to 250 characters.
    if len(cache_key) < 240:
        result = def_cache.get(cache_key)
Andreas Klöckner's avatar
Andreas Klöckner committed
    if result is not None:
        return result

    yaml_str = expand_yaml_macros(
                repo, commit_sha,
Andreas Klöckner's avatar
Andreas Klöckner committed
                get_repo_blob(repo, full_name, commit_sha,
                    allow_tree=False).data)

    result = load_yaml(yaml_str)  # type: ignore
Andreas Klöckner's avatar
Andreas Klöckner committed

    def_cache.add(cache_key, result, None)

    return result


Andreas Klöckner's avatar
Andreas Klöckner committed
LINE_HAS_INDENTING_TABS_RE = re.compile(r"^\s*\t\s*", re.MULTILINE)
def get_yaml_from_repo(
        repo: Repo_ish, full_name: str, commit_sha: bytes, cached: bool = True,
        tolerate_tabs: bool = False) -> Any:
    """Return decoded, struct-ified YAML data structure from
    the given file in *repo* at *commit_sha*.

Andreas Klöckner's avatar
Andreas Klöckner committed
    See :class:`relate.utils.Struct` for more on
    struct-ification.

    :arg tolerate_tabs: At one point, Relate accepted tabs
        in indentation, but it no longer does. In places where legacy compatibility
        matters, you may set *tolerate_tabs* to *True*.
        try:
            import django.core.cache as cache
        except ImproperlyConfigured:
            cached = False
        else:
            from urllib.parse import quote_plus
            cache_key = "%%%2".join(
                    (CACHE_KEY_ROOT,
                        quote_plus(repo.controldir()), quote_plus(full_name),
                        commit_sha.decode()))
            def_cache = cache.caches["default"]
            result = None
            # Memcache is apparently limited to 250 characters.
            if len(cache_key) < 240:
                result = def_cache.get(cache_key)
            if result is not None:
                return result
    yaml_bytestream = get_repo_blob(
            repo, full_name, commit_sha, allow_tree=False).data
    yaml_text = yaml_bytestream.decode("utf-8")

    if not tolerate_tabs and LINE_HAS_INDENTING_TABS_RE.search(yaml_text):
        raise ValueError("File uses tabs in indentation. "
                "This is not allowed.")

    expanded = expand_yaml_macros(
            repo, commit_sha, yaml_bytestream)
    yaml_data = load_yaml(expanded)  # type:ignore
    result = dict_to_struct(yaml_data)
    if cached:
        def_cache.add(cache_key, result, None)
Andreas Klöckner's avatar
Andreas Klöckner committed

    return result

def _attr_to_string(key, val):
    if val is None:
        return key
class TagProcessingHTMLParser(html_parser.HTMLParser):
    def __init__(self, out_file, process_tag_func):
        html_parser.HTMLParser.__init__(self)

        self.out_file = out_file
        self.process_tag_func = process_tag_func

    def handle_starttag(self, tag, attrs):
        attrs = dict(attrs)
        attrs.update(self.process_tag_func(tag, attrs))

        self.out_file.write("<{} {}>".format(tag, " ".join(
            _attr_to_string(k, v) for k, v in attrs.items())))

    def handle_endtag(self, tag):
        self.out_file.write("</%s>" % tag)

    def handle_startendtag(self, tag, attrs):
        attrs = dict(attrs)
        attrs.update(self.process_tag_func(tag, attrs))

        self.out_file.write("<{} {}/>".format(tag, " ".join(
            _attr_to_string(k, v) for k, v in attrs.items())))

    def handle_data(self, data):
        self.out_file.write(data)

    def handle_entityref(self, name):
        self.out_file.write("&%s;" % name)

    def handle_charref(self, name):
        self.out_file.write("&#%s;" % name)

    def handle_comment(self, data):
        self.out_file.write("<!--%s-->" % data)

    def handle_decl(self, decl):
        self.out_file.write("<!%s>" % decl)

    def handle_pi(self, data):
        raise NotImplementedError(
                _("I have no idea what a processing instruction is."))

    def unknown_decl(self, data):
        self.out_file.write("<![%s]>" % data)


    def __init__(self, s):
        self.s = s


Andreas Klöckner's avatar
Andreas Klöckner committed
class LinkFixerTreeprocessor(Treeprocessor):
    def __init__(self, md, course, commit_sha, reverse_func):
Andreas Klöckner's avatar
Andreas Klöckner committed
        Treeprocessor.__init__(self)
        self.md = md
Andreas Klöckner's avatar
Andreas Klöckner committed
        self.course = course
        self.commit_sha = commit_sha
        self.reverse_func = reverse_func
    def reverse(self, viewname, args):
        frag = None

        new_args = []
        for arg in args:
            if isinstance(arg, PreserveFragment):
                s = arg.s
Andreas Klöckner's avatar
Andreas Klöckner committed
                frag_index = s.find("#")
                if frag_index != -1:
                    frag = s[frag_index:]
                    s = s[:frag_index]

                new_args.append(s)
            else:
                new_args.append(arg)

        result = self.reverse_func(viewname, args=new_args)

        if frag is not None:
            result += frag

        return result

    def get_course_identifier(self):
        if self.course is None:
            return "bogus-course-identifier"
        else:
            return self.course.identifier
    def process_url(self, url):
        try:
            if url.startswith("course:"):
                course_id = url[7:]
                if course_id:
                    return self.reverse("relate-course_page",
                    return self.reverse("relate-course_page",
                                args=(self.get_course_identifier(),))

            elif url.startswith("flow:"):
                flow_id = url[5:]
                return self.reverse("relate-view_start_flow",
                            args=(self.get_course_identifier(), flow_id))

            elif url.startswith("staticpage:"):
                page_path = url[11:]
                return self.reverse("relate-content_page",
                            args=(
                                self.get_course_identifier(),
                                PreserveFragment(page_path)))
            elif url.startswith("media:"):
                media_path = url[6:]
                return self.reverse("relate-get_media",
                            args=(
                                self.get_course_identifier(),
                                self.commit_sha.decode(),
                                PreserveFragment(media_path)))

            elif url.startswith("repo:"):
                path = url[5:]
                return self.reverse("relate-get_repo_file",
                            args=(
                                self.get_course_identifier(),
                                self.commit_sha.decode(),
                                PreserveFragment(path)))

            elif url.startswith("repocur:"):
                path = url[8:]
                return self.reverse("relate-get_current_repo_file",
                            args=(
                                self.get_course_identifier(),
                                PreserveFragment(path)))
                return self.reverse("relate-view_calendar",
Andreas Klöckner's avatar
Andreas Klöckner committed
                            args=(self.get_course_identifier(),))

            else:
                return None

        except NoReverseMatch:
            from base64 import b64encode
            message = ("Invalid character in RELATE URL: " + url).encode("utf-8")
            return "data:text/plain;base64,"+b64encode(message).decode()
    def process_tag(self, tag_name, attrs):
        changed_attrs = {}

        if tag_name == "table" and attrs.get("bootstrap") != "no":
            changed_attrs["class"] = "table table-condensed"

        if tag_name in ["a", "link"] and "href" in attrs:
            new_href = self.process_url(attrs["href"])

            if new_href is not None:
                changed_attrs["href"] = new_href

        elif tag_name == "img" and "src" in attrs:
            new_src = self.process_url(attrs["src"])

            if new_src is not None:
                changed_attrs["src"] = new_src

        elif tag_name == "object" and "data" in attrs:
            new_data = self.process_url(attrs["data"])

            if new_data is not None:
                changed_attrs["data"] = new_data

        return changed_attrs

    def process_etree_element(self, element):
        changed_attrs = self.process_tag(element.tag, element.attrib)

        for key, val in changed_attrs.items():
            element.set(key, val)

    def walk_and_process_tree(self, root):
        self.process_etree_element(root)
Andreas Klöckner's avatar
Andreas Klöckner committed

        for child in root:
            self.walk_and_process_tree(child)

    def run(self, root):
        self.walk_and_process_tree(root)

        # root through and process Markdown's HTML stash (gross!)
        from io import StringIO

        for i, (html, safe) in enumerate(self.md.htmlStash.rawHtmlBlocks):
            outf = StringIO()
            parser = TagProcessingHTMLParser(outf, self.process_tag)
            parser.feed(html)

            self.md.htmlStash.rawHtmlBlocks[i] = (outf.getvalue(), safe)
Andreas Klöckner's avatar
Andreas Klöckner committed


class LinkFixerExtension(Extension):
    def __init__(
            self, course: Course | None,
            commit_sha: bytes, reverse_func: Callable | None) -> None:
Andreas Klöckner's avatar
Andreas Klöckner committed
        Extension.__init__(self)
        self.course = course
        self.commit_sha = commit_sha
        self.reverse_func = reverse_func
Andreas Klöckner's avatar
Andreas Klöckner committed
    def extendMarkdown(self, md, md_globals):  # noqa
Andreas Klöckner's avatar
Andreas Klöckner committed
        md.treeprocessors["relate_link_fixer"] = \
                LinkFixerTreeprocessor(md, self.course, self.commit_sha,
                        reverse_func=self.reverse_func)
def remove_prefix(prefix: str, s: str) -> str:
    if s.startswith(prefix):
        return s[len(prefix):]
    else:
        return s
JINJA_PREFIX = "[JINJA]"
def expand_markup(
        course: Course | None,
        repo: Repo_ish,
        commit_sha: bytes,
        text: str,
        use_jinja: bool = True,
        jinja_env: Optional[dict] = None,
    if jinja_env is None:
        jinja_env = {}

    if not isinstance(text, str):
        text = str(text)

    # {{{ process through Jinja

    if use_jinja:
        from jinja2 import Environment, StrictUndefined
        env = Environment(
                loader=GitTemplateLoader(repo, commit_sha),
                undefined=StrictUndefined)
        template = env.from_string(text)
        kwargs = {}
        if jinja_env:
            kwargs.update(jinja_env)

        from course.utils import IpynbJinjaMacro
        kwargs[IpynbJinjaMacro.name] = IpynbJinjaMacro(course, repo, commit_sha)
def filter_html_attributes(tag, name, value):
    from bleach.sanitizer import ALLOWED_ATTRIBUTES

    allowed_attrs = ALLOWED_ATTRIBUTES.get(tag, [])
    result = name in allowed_attrs

    if tag == "a":
        result = (result
                or (name == "role" and value == "button")
                or (name == "class" and value.startswith("btn btn-")))
    elif tag == "img":
        result = result or name == "src"
    elif tag == "div":
        result = result or (name == "class" and value == "well")
    elif tag == "i":
        result = result or (name == "class" and value.startswith("bi bi-"))
    elif tag == "table":
        result = (result or (name == "class") or (name == "bootstrap"))
        course: Course | None,
        repo: Repo_ish,
        commit_sha: bytes,
        text: str,
        reverse_func: Optional[Callable] = None,
        validate_only: bool = False,
        use_jinja: bool = True,
        jinja_env: Optional[dict] = None,
    if jinja_env is None:
        jinja_env = {}

    disable_codehilite = bool(
        getattr(settings,
                "RELATE_DISABLE_CODEHILITE_MARKDOWN_EXTENSION", True))

    if course is not None and not jinja_env:
        try:
            import django.core.cache as cache
        except ImproperlyConfigured:
            cache_key = None
        else:
            import hashlib
            cache_key = ("markup:v8:%s:%d:%s:%s:%s%s"
                       course.id, course.trusted_for_markup, str(commit_sha),
                       hashlib.md5(text.encode("utf-8")).hexdigest(),
                       ":NOCODEHILITE" if disable_codehilite else ""
                       ))
            def_cache = cache.caches["default"]
            result = def_cache.get(cache_key)
            if result is not None:
                assert isinstance(result, str)
        if text.lstrip().startswith(JINJA_PREFIX):
            text = remove_prefix(JINJA_PREFIX, text.lstrip())
    else:
        cache_key = None
    text = expand_markup(
            course, repo, commit_sha, text, use_jinja=use_jinja, jinja_env=jinja_env)
    if reverse_func is None:
        from django.urls import reverse
        reverse_func = reverse
    from course.mdx_mathjax import MathJaxExtension
    from course.utils import NBConvertExtension
    extensions: list[markdown.Extension | str] = [
        LinkFixerExtension(course, commit_sha, reverse_func=reverse_func),
        MathJaxExtension(),
        NBConvertExtension(),
        "markdown.extensions.extra",
    ]

    if not disable_codehilite:
        # Note: no matter whether disable_codehilite, the code in
        # the rendered ipython notebook will be highlighted.
        # "css_class=highlight" is to ensure that, when codehilite extension
        # is enabled, code out side of notebook uses the same html class
        # attribute as the default highlight class (i.e., `highlight`)
        # used by rendered ipynb notebook cells, Thus we don't need to
        # make 2 copies of css for the highlight.
        extensions += ["markdown.extensions.codehilite(css_class=highlight)"]
Andreas Klöckner's avatar
Andreas Klöckner committed
    result = markdown.markdown(text,
        output_format="html")
    if course is None or not course.trusted_for_markup:
        import bleach
        result = bleach.clean(result,
                tags=bleach.ALLOWED_TAGS + [
                    "div", "span", "p", "img",
                    "h1", "h2", "h3", "h4", "h5", "h6",
                    "table", "td", "tr", "th", "pre",
                    ],
                attributes=filter_html_attributes)

    assert isinstance(result, str)
Andreas Klöckner's avatar
Andreas Klöckner committed
    if cache_key is not None:
        def_cache.add(cache_key, result, None)

    return result

TITLE_RE = re.compile(r"^\#+\s*(.+)", re.UNICODE)
def extract_title_from_markup(markup_text: str) -> str | None:
    lines = markup_text.split("\n")

Andreas Klöckner's avatar
Andreas Klöckner committed
    for ln in lines[:10]:
        match = TITLE_RE.match(ln)
        if match is not None:
            return match.group(1)

    return None

# {{{ datespec processing

DATE_RE = re.compile(r"^([0-9]+)\-([01][0-9])\-([0-3][0-9])$")
TRAILING_NUMERAL_RE = re.compile(r"^(.*)\s+([0-9]+)$")
class InvalidDatespec(ValueError):
    def __init__(self, datespec):
        ValueError.__init__(self, str(datespec))
        self.datespec = datespec
    def parse(cls, s: str) -> tuple[str, DatespecPostprocessor | None]:
    def apply(self, dtm: datetime.datetime) -> datetime.datetime:
AT_TIME_RE = re.compile(r"^(.*)\s*@\s*([0-2]?[0-9])\:([0-9][0-9])\s*$")


class AtTimePostprocessor(DatespecPostprocessor):
    def __init__(self, hour: int, minute: int, second: int = 0) -> None:
        self.hour = hour
        self.minute = minute
        self.second = second

    @classmethod
    def parse(cls, s):
        match = AT_TIME_RE.match(s)
        if match is not None:
            hour = int(match.group(2))
            minute = int(match.group(3))

            if not (0 <= hour < 24):
                raise InvalidDatespec(s)

            if not (0 <= minute < 60):
                raise InvalidDatespec(s)

            return match.group(1), AtTimePostprocessor(hour, minute)
        else:
            return s, None

    def apply(self, dtm):
        from pytz import timezone
        server_tz = timezone(settings.TIME_ZONE)

        return dtm.astimezone(server_tz).replace(
                    hour=self.hour,
                    minute=self.minute,
                    second=self.second)


PLUS_DELTA_RE = re.compile(r"^(.*)\s*([+-])\s*([0-9]+)\s+"
    "(weeks?|days?|hours?|minutes?)$")


class PlusDeltaPostprocessor(DatespecPostprocessor):
    def __init__(self, count: int, period: str) -> None:
        self.count = count
        self.period = period

    @classmethod
    def parse(cls, s):
        match = PLUS_DELTA_RE.match(s)
        if match is not None:
            count = int(match.group(3))
            if match.group(2) == "-":
                count = -count
            period = match.group(4)

            return match.group(1), PlusDeltaPostprocessor(count, period)
        else:
            return s, None

    def apply(self, dtm):
        if self.period.startswith("week"):
            d = datetime.timedelta(weeks=self.count)
        elif self.period.startswith("day"):
            d = datetime.timedelta(days=self.count)
        elif self.period.startswith("hour"):
            d = datetime.timedelta(hours=self.count)
        else:
            assert self.period.startswith("minute")
            d = datetime.timedelta(minutes=self.count)
DATESPEC_POSTPROCESSORS: list[Any] = [
        AtTimePostprocessor,
        PlusDeltaPostprocessor,
        course: Course | None,
        datespec: str | datetime.date | datetime.datetime,
        vctx: ValidationContext | None = None,
        location: str | None = None,
        ) -> datetime.datetime:
Andreas Klöckner's avatar
Andreas Klöckner committed
    if datespec is None:
        return None

    def localize_if_needed(d: datetime.datetime) -> datetime.datetime:
            from relate.utils import localize_datetime
            return localize_datetime(d)
            return d

    if isinstance(datespec, datetime.datetime):
        return localize_if_needed(datespec)
    if isinstance(datespec, datetime.date):
        return localize_if_needed(
                datetime.datetime.combine(datespec, datetime.time.min))
    datespec_str = cast(str, datespec).strip()
    postprocs: list[DatespecPostprocessor] = []
    while True:
        parsed_one = False
        for pp_class in DATESPEC_POSTPROCESSORS:
            datespec_str, postproc = pp_class.parse(datespec_str)
            if postproc is not None:
                parsed_one = True
                postprocs.insert(0, cast(DatespecPostprocessor, postproc))
    def apply_postprocs(dtime: datetime.datetime) -> datetime.datetime:
        for postproc in postprocs:
            dtime = postproc.apply(dtime)

        return dtime

Andreas Klöckner's avatar
Andreas Klöckner committed
    if match:
                int(match.group(1)),
                int(match.group(2)),
                int(match.group(3)))
        result = localize_if_needed(
                datetime.datetime.combine(res_date, datetime.time.min))
        return apply_postprocs(result)
    is_end = datespec_str.startswith(END_PREFIX)
        datespec_str = datespec_str[len(END_PREFIX):]
    match = TRAILING_NUMERAL_RE.match(datespec_str)
Andreas Klöckner's avatar
Andreas Klöckner committed
    if match:
        event_kind = match.group(1)
        ordinal: int | None = int(match.group(2))
    else:
        # event without numeral
Andreas Klöckner's avatar
Andreas Klöckner committed
        ordinal = None

    if vctx is not None:
        from course.validation import validate_identifier
        validate_identifier(vctx, "%s: event kind" % location, event_kind)

    if course is None:
        return now()

    from course.models import Event
        event_obj = Event.objects.get(
            course=course,
            kind=event_kind,
            ordinal=ordinal)
    except ObjectDoesNotExist:
                    _("Unrecognized date/time specification: '%s' "
                    "(interpreted as 'now'). "
                    "You should add an event with this name.")
    if is_end:
        if event_obj.end_time is not None:
            result = event_obj.end_time
        else:
            result = event_obj.time
            if vctx is not None:
                vctx.add_warning(
                        location,
                        _("event '%s' has no end time, using start time instead")
                        % orig_datespec)

    else:
        result = event_obj.time

    return apply_postprocs(result)


        course: Course,
        chunk: ChunkDesc,
        roles: list[str],
        now_datetime: datetime.datetime,
        facilities: frozenset[str],
        ) -> tuple[float, bool]:
    if not hasattr(chunk, "rules"):
        return 0, True

    for rule in chunk.rules:
        if hasattr(rule, "if_has_role"):
            if all(role not in rule.if_has_role for role in roles):
                continue
        if hasattr(rule, "if_after"):
            start_date = parse_date_spec(course, rule.if_after)
            if now_datetime < start_date:
                continue

        if hasattr(rule, "if_before"):
            end_date = parse_date_spec(course, rule.if_before)
            if end_date < now_datetime:
                continue

        if hasattr(rule, "if_in_facility"):
            if rule.if_in_facility not in facilities:
        if hasattr(rule, "roles"):  # pragma: no cover  # deprecated
            if all(role not in rule.roles for role in roles):
        if hasattr(rule, "start"):  # pragma: no cover  # deprecated
            start_date = parse_date_spec(course, rule.start)
            if now_datetime < start_date:
Andreas Klöckner's avatar
Andreas Klöckner committed
                continue
        if hasattr(rule, "end"):  # pragma: no cover  # deprecated
            end_date = parse_date_spec(course, rule.end)
            if end_date < now_datetime:
Andreas Klöckner's avatar
Andreas Klöckner committed
                continue
        shown = True
        if hasattr(rule, "shown"):
            shown = rule.shown

        return rule.weight, shown
        course: Course,
        repo: Repo_ish,
        commit_sha: bytes,
        page_desc: StaticPageDesc,
        roles: list[str],
        now_datetime: datetime.datetime,
        facilities: frozenset[str],
        ) -> list[ChunkDesc]:
    for chunk in page_desc.chunks:
        chunk.weight, chunk.shown = \
                compute_chunk_weight_and_shown(
        chunk.html_content = markup_to_html(course, repo, commit_sha, chunk.content)
        if not hasattr(chunk, "title"):
            chunk.title = extract_title_from_markup(chunk.content)
    page_desc.chunks.sort(key=lambda chunk: chunk.weight, reverse=True)
    return [chunk for chunk in page_desc.chunks
            if chunk.shown]
def normalize_page_desc(page_desc: StaticPageDesc) -> StaticPageDesc:
    if hasattr(page_desc, "content"):
        content = page_desc.content
        from relate.utils import struct_to_dict, Struct
        d = struct_to_dict(page_desc)
        del d["content"]
        d["chunks"] = [Struct({"id": "main", "content": content})]
def get_staticpage_desc(
        repo: Repo_ish, course: Course, commit_sha: bytes, filename: str
        ) -> StaticPageDesc:
    page_desc = get_yaml_from_repo(repo, filename, commit_sha)
    page_desc = normalize_page_desc(page_desc)
    return page_desc


def get_course_desc(repo: Repo_ish, course: Course, commit_sha: bytes) -> CourseDesc:

    return cast(
            CourseDesc,
            get_staticpage_desc(repo, course, commit_sha, course.course_file))
def normalize_flow_desc(flow_desc: FlowDesc) -> FlowDesc:
    if hasattr(flow_desc, "pages"):
        pages = flow_desc.pages
        from relate.utils import struct_to_dict, Struct
        d = struct_to_dict(flow_desc)
        del d["pages"]
        d["groups"] = [Struct({"id": "main", "pages": pages})]
    if hasattr(flow_desc, "rules"):
        rules = flow_desc.rules
        if not hasattr(rules, "grade_identifier"):  # pragma: no cover  # deprecated
            # Legacy content with grade_identifier in grading rule,
            # move first found grade_identifier up to rules.

            rules.grade_identifier = None
            rules.grade_aggregation_strategy = None

                if grule.grade_identifier is not None:  # type: ignore
                    rules.grade_identifier = grule.grade_identifier  # type: ignore
                    rules.grade_aggregation_strategy = (  # type: ignore
                            grule.grade_aggregation_strategy)  # type: ignore
def get_flow_desc(
        repo: Repo_ish, course: Course, flow_id: str,
        commit_sha: bytes, tolerate_tabs: bool = False) -> FlowDesc:
    """
    :arg tolerate_tabs: At one point, Relate accepted tabs
        in indentation, but it no longer does. In places where legacy
        compatibility matters, you may set *tolerate_tabs* to *True*.
    """
Dong Zhuang's avatar
Dong Zhuang committed
    # FIXME: extension should be case-insensitive
    flow_desc = get_yaml_from_repo(repo, "flows/%s.yml" % flow_id, commit_sha,
            tolerate_tabs=tolerate_tabs)

    flow_desc = normalize_flow_desc(flow_desc)
    return flow_desc
def get_flow_page_desc(flow_id: str, flow_desc: FlowDesc,
        group_id: str, page_id: str) -> FlowPageDesc:
    for grp in flow_desc.groups:
        if grp.id == group_id:
            for page in grp.pages:
                if page.id == page_id:
                    return page

    raise ObjectDoesNotExist(
            _("page '%(group_id)s/%(page_id)s' in flow '%(flow_id)s'") % {
                "group_id": group_id,
                "page_id": page_id,
                "flow_id": flow_id
class ClassNotFoundError(RuntimeError):
def import_class(name: str) -> type:
    components = name.split(".")
    if len(components) < 2:
        # need at least one module plus class name
        raise ClassNotFoundError(name)
    from importlib import import_module
    mod_components = len(components) - 1
    while mod_components:
        module_name = ".".join(components[:mod_components])
            mod = import_module(module_name)
        except ImportError:
            mod_components -= 1
            continue

        sym = mod
        for cls_comp in components[mod_components:]:
            try:
                sym = getattr(sym, cls_comp)
            except AttributeError:
                raise ClassNotFoundError(name)

        if isinstance(sym, type):
            return sym
        else:
            raise ClassNotFoundError(f"'{name}' does not name a type")
    raise ClassNotFoundError(name)
def get_flow_page_class(repo: Repo_ish, typename: str, commit_sha: bytes) -> type:
    # look among default page types
    import course.page
    try:
        return getattr(course.page, typename)
    except AttributeError:
        pass
    # try a global dotted-name import
    try:
        return import_class(typename)
    except ClassNotFoundError:
        pass
    raise ClassNotFoundError(typename)
def instantiate_flow_page(
        location: str, repo: Repo_ish, page_desc: FlowPageDesc, commit_sha: bytes
        ) -> PageBase:
    class_ = get_flow_page_class(repo, page_desc.type, commit_sha)
    return class_(None, location, page_desc)
class CourseCommitSHADoesNotExist(Exception):
def get_course_commit_sha(
        course: Course,
        participation: Participation | None,
        repo: Repo_ish | None = None,
        raise_on_nonexistent_preview_commit: bool | None = False
        ) -> bytes:
    sha = course.active_git_commit_sha

    def is_commit_sha_valid(repo: Repo_ish, commit_sha: str) -> bool:
        if isinstance(repo, SubdirRepoWrapper):
            repo = repo.repo
        try:
            repo[commit_sha.encode()]
        except KeyError:
            if raise_on_nonexistent_preview_commit:
                raise CourseCommitSHADoesNotExist(
                    _("Preview revision '%s' does not exist--"
                      "showing active course content instead."
            return False

        return True