Skip to content
utils.py 44.1 KiB
Newer Older
from __future__ import annotations
Andreas Klöckner's avatar
Andreas Klöckner committed

__copyright__ = "Copyright (C) 2014 Andreas Kloeckner"

__license__ = """
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""

Andreas Klöckner's avatar
Andreas Klöckner committed
import datetime
from collections.abc import Collection, Iterable
Andreas Klöckner's avatar
Andreas Klöckner committed
from contextlib import ContextDecorator
from dataclasses import dataclass
from ipaddress import IPv4Address, IPv6Address
from typing import (
    TYPE_CHECKING,
    Any,
    cast,
Andreas Klöckner's avatar
Andreas Klöckner committed
)
from django import forms, http
from django.core.exceptions import ObjectDoesNotExist
from django.shortcuts import get_object_or_404, render
from django.utils import translation
from django.utils.safestring import SafeString, mark_safe
Andreas Klöckner's avatar
Andreas Klöckner committed
from django.utils.translation import gettext as _, pgettext_lazy
Andreas Klöckner's avatar
Andreas Klöckner committed
from course.constants import flow_permission, flow_rule_kind
from course.content import (
    CourseCommitSHADoesNotExist,
    FlowDesc,
    FlowPageDesc,
    FlowSessionAccessRuleDesc,
    FlowSessionGradingRuleDesc,
    FlowSessionStartRuleDesc,
    get_course_commit_sha,
    get_course_repo,
    get_flow_desc,
    parse_date_spec,
Andreas Klöckner's avatar
Andreas Klöckner committed
)
from course.page.base import PageBase, PageContext
from relate.utils import (
    RelateHttpRequest,
    not_none,
    remote_address_from_request,
    string_concat,
)
Andreas Klöckner's avatar
Andreas Klöckner committed


    from course.content import Repo_ish
    from course.models import (
        Course,
        ExamTicket,
        FlowPageData,
        FlowSession,
        Participation,
Andreas Klöckner's avatar
Andreas Klöckner committed
    )
    from relate.utils import Repo_ish  # noqa
Andreas Klöckner's avatar
Andreas Klöckner committed


# Matches a `<div class="...code_cell...">` opening tag. Group 1 captures the
# tag up to and including the closing quote of the class value; group 2
# captures the final ">".
# NOTE(review): presumably split this way so text can be inserted between the
# two groups -- confirm at the use site.
CODE_CELL_DIV_ATTRS_RE = re.compile(r'(<div class="[^>]*code_cell[^>"]*")(>)')
def getattr_with_fallback(
        aggregates: Iterable[Any], attr_name: str, default: Any = None) -> Any:
    """Return the first non-*None* value of attribute *attr_name* found on the
    objects in *aggregates*, searched in iteration order.

    A missing attribute and an attribute whose value is *None* are treated
    alike: the search moves on to the next object. If no object supplies a
    value, *default* is returned.
    """
    candidates = (getattr(holder, attr_name, None) for holder in aggregates)
    return next((value for value in candidates if value is not None), default)


class FlowSessionStartRule(FlowSessionRuleBase):
            tag_session: str | None = None,
            may_start_new_session: bool | None = None,
            may_list_existing_sessions: bool | None = None,
            default_expiration_mode: str | None = None,
            ) -> None:
        self.tag_session = tag_session
        self.may_start_new_session = may_start_new_session
        self.may_list_existing_sessions = may_list_existing_sessions
        self.default_expiration_mode = default_expiration_mode


class FlowSessionAccessRule(FlowSessionRuleBase):
            permissions: frozenset[str],
            message: str | None = None,
            ) -> None:
        self.permissions = permissions
        self.message = message
    def human_readable_permissions(self):
        """Return the display label for each permission in ``self.permissions``,
        looked up in ``FLOW_PERMISSION_CHOICES``.
        """
        from course.models import FLOW_PERMISSION_CHOICES

        label_for = dict(FLOW_PERMISSION_CHOICES)
        labels = []
        for perm in self.permissions:
            labels.append(label_for[perm])
        return labels
class FlowSessionGradingRule(FlowSessionRuleBase):
            grade_aggregation_strategy: str,
            generates_grade: bool,
            description: str | None = None,
            credit_percent: float | None = None,
            use_last_activity_as_completion_time: bool | None = None,
            max_points: float | None = None,
            max_points_enforced_cap: float | None = None,
            bonus_points: float = 0,

        self.grade_identifier = grade_identifier
        self.grade_aggregation_strategy = grade_aggregation_strategy
        self.due = due
        self.generates_grade = generates_grade
        self.description = description
        self.credit_percent = credit_percent
        self.use_last_activity_as_completion_time = \
                use_last_activity_as_completion_time
        self.max_points = max_points
        self.max_points_enforced_cap = max_points_enforced_cap
        self.bonus_points = bonus_points


def _eval_generic_conditions(
        rule: Any,
        course: Course,
        participation: Participation | None,
        now_datetime: datetime.datetime,
        flow_id: str,
        login_exam_ticket: ExamTicket | None,
        *,
        remote_ip_address: IPv4Address | IPv6Address | None = None,
        ) -> bool:
    """Evaluate the conditions of *rule* that do not depend on a particular
    flow session.

    Each condition is only checked if the corresponding attribute is present
    on *rule*: ``if_before``, ``if_after``, ``if_has_role``,
    ``if_signed_in_with_matching_exam_ticket`` and
    ``if_has_prairietest_exam_access``.

    :arg now_datetime: the time against which date conditions are compared.
    :arg login_exam_ticket: the exam ticket used to sign in, if any.
    :arg remote_ip_address: the client address; required (along with a
        non-*None* *participation*) for the PrairieTest access check to pass.
    :returns: *False* as soon as one of the checks fails.
    """
    if hasattr(rule, "if_before"):
        ds = parse_date_spec(course, rule.if_before)
        # The boundary itself still counts as "before".
        if not (now_datetime <= ds):
            return False

    if hasattr(rule, "if_after"):
        ds = parse_date_spec(course, rule.if_after)
        # The boundary itself already counts as "after".
        if not (now_datetime >= ds):
            return False

    if hasattr(rule, "if_has_role"):
        from course.enrollment import get_participation_role_identifiers
        roles = get_participation_role_identifiers(course, participation)
        # True when none of the participation's roles is listed in the rule.
        if all(role not in rule.if_has_role for role in roles):
    if (hasattr(rule, "if_signed_in_with_matching_exam_ticket")
            and rule.if_signed_in_with_matching_exam_ticket):
        if login_exam_ticket is None:
            return False
        # The ticket must have been issued for this very flow.
        if login_exam_ticket.exam.flow_id != flow_id:
            return False
        if login_exam_ticket.participation != participation:
    if hasattr(rule, "if_has_prairietest_exam_access"):
        # Without a client address or a participation, access cannot be
        # established.
        if remote_ip_address is None:
            return False
        if participation is None:
            return False

        from prairietest.utils import has_access_to_exam
        if not has_access_to_exam(
                    course,
                    participation.user.email,
                    rule.if_has_prairietest_exam_access,
                    now_datetime,
                    remote_ip_address,
                ):
            return False

        rule: Any,
        session: FlowSession,
        now_datetime: datetime.datetime,
        ) -> bool:
    if hasattr(rule, "if_has_tag"):
        if session.access_rules_tag != rule.if_has_tag:
            return False

    if hasattr(rule, "if_started_before"):
        ds = parse_date_spec(session.course, rule.if_started_before)
        if not session.start_time < ds:
            return False

    return True


def _eval_participation_tags_conditions(
        rule: Any,
        participation: Participation | None,
        ) -> bool:
    """Evaluate the ``if_has_participation_tags_any`` and
    ``if_has_participation_tags_all`` conditions of *rule* against the tags
    of *participation*.

    The tag checks are only performed if at least one of the two attributes
    is present on *rule* and non-empty.
    """

    participation_tags_any_set = (
        set(getattr(rule, "if_has_participation_tags_any", [])))
    participation_tags_all_set = (
        set(getattr(rule, "if_has_participation_tags_all", [])))

    if participation_tags_any_set or participation_tags_all_set:
        if not participation:
            # The rule requires participation tags, and an anonymous user
            # (no participation) has none, so the rule cannot apply.
            return False
        ptag_set = set(participation.tags.all().values_list("name", flat=True))
        # A participation without any tags cannot satisfy either condition.
        if not ptag_set:
            return False
Andreas Klöckner's avatar
Andreas Klöckner committed
        # "any": at least one of the required tags must be present.
        if (
                participation_tags_any_set
                and not participation_tags_any_set & ptag_set):
Andreas Klöckner's avatar
Andreas Klöckner committed
        # "all": the required tags must be a subset of the participation's tags.
        if (
                participation_tags_all_set
                and not participation_tags_all_set <= ptag_set):
        flow_desc: FlowDesc,
        kind: str,
        participation: Participation | None,
        flow_id: str,
        now_datetime: datetime.datetime,
        consider_exceptions: bool = True,
        default_rules_desc: list[Any] | None = None
    if default_rules_desc is None:
        default_rules_desc = []
    if (not hasattr(flow_desc, "rules")
            or not hasattr(flow_desc.rules, kind)):
    else:
        rules = getattr(flow_desc.rules, kind)[:]

    from course.models import FlowRuleException
    if consider_exceptions:
                FlowRuleException.objects
                .filter(
                    participation=participation,
                    flow_id=flow_id)
                # rules created first will get inserted first, and show up last
                .order_by("creation_time")):
            if exc.expiration is not None and now_datetime > exc.expiration:
                continue
Andreas Klöckner's avatar
Andreas Klöckner committed
            from relate.utils import dict_to_struct
            rules.insert(0, dict_to_struct(exc.rule))
        course: Course,
        participation: Participation | None,
        flow_id: str,
        flow_desc: FlowDesc,
        now_datetime: datetime.datetime,
        facilities: Collection[str] | None = None,
        for_rollover: bool = False,
        login_exam_ticket: ExamTicket | None = None,
        *,
        remote_ip_address: IPv4Address | IPv6Address | None = None,
        ) -> FlowSessionStartRule:
    """Return a :class:`FlowSessionStartRule` if a new session is
    permitted or *None* if no new session is allowed.
    """
    if facilities is None:
        facilities = frozenset()

    from relate.utils import dict_to_struct
    rules: list[FlowSessionStartRuleDesc] = get_flow_rules(
            flow_desc, flow_rule_kind.start,
            participation, flow_id, now_datetime,
            default_rules_desc=[
                dict_to_struct({
                    "may_start_new_session": True,
                    "may_list_existing_sessions": False})])
    from course.models import FlowSession
        if not _eval_generic_conditions(rule, course, participation,
                now_datetime, flow_id=flow_id,
                login_exam_ticket=login_exam_ticket,
                remote_ip_address=remote_ip_address):
        if not _eval_participation_tags_conditions(rule, participation):
            continue

        if not for_rollover and hasattr(rule, "if_in_facility"):
            if rule.if_in_facility not in facilities:
        if not for_rollover and hasattr(rule, "if_has_in_progress_session"):
            session_count = FlowSession.objects.filter(
                    participation=participation,
                    course=course,
                    flow_id=flow_id,
                    in_progress=True).count()

            if bool(session_count) != rule.if_has_in_progress_session:
                continue

        if not for_rollover and hasattr(rule, "if_has_session_tagged"):
            tagged_session_count = FlowSession.objects.filter(
                    participation=participation,
                    course=course,
                    access_rules_tag=rule.if_has_session_tagged,
                    flow_id=flow_id).count()

            if not tagged_session_count:
                continue

        if not for_rollover and hasattr(rule, "if_has_fewer_sessions_than"):
            session_count = FlowSession.objects.filter(
                    participation=participation,
                    course=course,
                    flow_id=flow_id).count()
            if session_count >= rule.if_has_fewer_sessions_than:
                continue
        if not for_rollover and hasattr(rule, "if_has_fewer_tagged_sessions_than"):
            tagged_session_count = FlowSession.objects.filter(
                    participation=participation,
                    course=course,
                    access_rules_tag__isnull=False,
                    flow_id=flow_id).count()

            if tagged_session_count >= rule.if_has_fewer_tagged_sessions_than:
                continue

                tag_session=getattr(rule, "tag_session", None),
                may_start_new_session=getattr(
                    rule, "may_start_new_session", True),
                may_list_existing_sessions=getattr(
                    rule, "may_list_existing_sessions", True),
                default_expiration_mode=getattr(
                    rule, "default_expiration_mode", None),
    return FlowSessionStartRule(
            may_list_existing_sessions=False,
            may_start_new_session=False)
        session: FlowSession,
        flow_desc: FlowDesc,
        now_datetime: datetime.datetime,
        facilities: Collection[str] | None = None,
        login_exam_ticket: ExamTicket | None = None,
        *,
        remote_ip_address: IPv4Address | IPv6Address | None = None,
        ) -> FlowSessionAccessRule:
    if facilities is None:
        facilities = frozenset()

    from relate.utils import dict_to_struct
    rules: list[FlowSessionAccessRuleDesc] = get_flow_rules(
            flow_desc, flow_rule_kind.access,
            session.participation, session.flow_id, now_datetime,
            default_rules_desc=[
                dict_to_struct({
                    "permissions": [flow_permission.view],
                    })])
    for rule in rules:
        if not _eval_generic_conditions(
                    rule, session.course, session.participation,
                    now_datetime, flow_id=session.flow_id,
                    login_exam_ticket=login_exam_ticket,
                    remote_ip_address=remote_ip_address,
                ):
        if not _eval_participation_tags_conditions(rule, session.participation):
            continue

        if not _eval_generic_session_conditions(rule, session, now_datetime):
        if hasattr(rule, "if_in_facility"):
            if rule.if_in_facility not in facilities:
        if hasattr(rule, "if_in_progress"):
            if session.in_progress != rule.if_in_progress:
                continue
        if hasattr(rule, "if_expiration_mode"):
            if session.expiration_mode != rule.if_expiration_mode:
        if hasattr(rule, "if_session_duration_shorter_than_minutes"):
            duration_min = (now_datetime - session.start_time).total_seconds() / 60

            if session.participation is not None:
                duration_min /= float(session.participation.time_factor)

            if duration_min > rule.if_session_duration_shorter_than_minutes:
                continue

        permissions = set(rule.permissions)
        # {{{ deal with deprecated permissions

        if "modify" in permissions:
            permissions.remove("modify")
            permissions.update([
                flow_permission.submit_answer,
                flow_permission.end_session,
        if "see_answer" in permissions:
            permissions.remove("see_answer")
            permissions.add(flow_permission.see_answer_after_submission)

        # }}}

        # Remove 'modify' permission from not-in-progress sessions
        if not session.in_progress:
            for perm in [
                    flow_permission.submit_answer,
                    flow_permission.end_session,
                    ]:
                if perm in permissions:
                    permissions.remove(perm)
        return FlowSessionAccessRule(
                permissions=frozenset(permissions),
                message=getattr(rule, "message", None)
                )
    return FlowSessionAccessRule(permissions=frozenset())
        session: FlowSession,
        flow_desc: FlowDesc,
        now_datetime: datetime.datetime
        ) -> FlowSessionGradingRule:
    flow_desc_rules = getattr(flow_desc, "rules", None)

    from relate.utils import dict_to_struct
    rules: list[FlowSessionGradingRuleDesc] = get_flow_rules(
            flow_desc, flow_rule_kind.grading,
            session.participation, session.flow_id, now_datetime,
            default_rules_desc=[
                dict_to_struct({
                    "generates_grade": False,
                    })])
    from course.enrollment import get_participation_role_identifiers
    roles = get_participation_role_identifiers(session.course, session.participation)

    for rule in rules:
        if hasattr(rule, "if_has_role"):
            if all(role not in rule.if_has_role for role in roles):
        if not _eval_generic_session_conditions(rule, session, now_datetime):
        if not _eval_participation_tags_conditions(rule, session.participation):
            continue

        if hasattr(rule, "if_completed_before"):
            ds = parse_date_spec(session.course, rule.if_completed_before)

            use_last_activity_as_completion_time = False
            if hasattr(rule, "use_last_activity_as_completion_time"):
                use_last_activity_as_completion_time = \
                        rule.use_last_activity_as_completion_time

            if use_last_activity_as_completion_time:
                last_activity = session.last_activity()
                if last_activity is not None:
                    completion_time = last_activity
                else:
                    completion_time = now_datetime
            else:
                if session.in_progress:
                    completion_time = now_datetime
                else:
                    completion_time = not_none(session.completion_time)
        due_str = getattr(rule, "due", None)
        if due_str is not None:
            due = parse_date_spec(session.course, due_str)
            assert due.tzinfo is not None
        else:
            due = None
        generates_grade = getattr(rule, "generates_grade", True)

        grade_identifier = None
        grade_aggregation_strategy = None
        if flow_desc_rules is not None:
            grade_identifier = flow_desc_rules.grade_identifier
            grade_aggregation_strategy = getattr(
                    flow_desc_rules, "grade_aggregation_strategy", None)

        bonus_points = getattr_with_fallback((rule, flow_desc), "bonus_points", 0)
        max_points = getattr_with_fallback((rule, flow_desc), "max_points", None)
        max_points_enforced_cap = getattr_with_fallback(
                (rule, flow_desc), "max_points_enforced_cap", None)

        grade_aggregation_strategy = cast(str, grade_aggregation_strategy)
        return FlowSessionGradingRule(
                grade_aggregation_strategy=grade_aggregation_strategy,
                generates_grade=generates_grade,
                description=getattr(rule, "description", None),
                credit_percent=getattr(rule, "credit_percent", 100),
                use_last_activity_as_completion_time=getattr(
                    rule, "use_last_activity_as_completion_time", False),

                bonus_points=bonus_points,
                max_points=max_points,
                max_points_enforced_cap=max_points_enforced_cap,
ifaint's avatar
ifaint committed
    raise RuntimeError(_("grading rule determination was unable to find "
            "a grading rule"))
class AnyArgumentType:
ANY_ARGUMENT = AnyArgumentType()


class CoursePageContext:
    def __init__(self, request: http.HttpRequest, course_identifier: str) -> None:
        """Look up the course named *course_identifier*, the participation of
        the requesting user in it, the course repository and the commit SHA
        to be used for this request.

        Responds with a 404 (via :func:`get_object_or_404`) if no course has
        the given identifier.
        """
        # account for monkeypatching
        self.request = cast(RelateHttpRequest, request)

        self.course_identifier = course_identifier
        # Both caches are filled lazily, by permissions() and
        # role_identifiers() respectively.
        self._permissions_cache: frozenset[tuple[str, str | None]] | None = None
        self._role_identifiers_cache: list[str] | None = None
        # Language that was active before the course's forced language was
        # activated; set by _set_course_lang().
        self.old_language: str | None = None

        # using this to prevent nested using as context manager
        self._is_in_context_manager = False
        from course.models import Course
        self.course = get_object_or_404(Course, identifier=course_identifier)
        from course.enrollment import get_participation_for_request
        self.participation = get_participation_for_request(
                request, self.course)

        # NOTE(review): presumably raises if the course may not be accessed
        # in its current state -- confirm in course.views.
        from course.views import check_course_state
        check_course_state(self.course, self.participation)

        self.repo = get_course_repo(self.course)

        try:
            sha = get_course_commit_sha(
                self.course, self.participation,
                repo=self.repo,
                raise_on_nonexistent_preview_commit=True)
        except CourseCommitSHADoesNotExist as e:
            # Report the problem to the user and fall back to the course's
            # active commit.
            from django.contrib import messages
            messages.add_message(request, messages.ERROR, str(e))

            sha = self.course.active_git_commit_sha.encode()

        self.course_commit_sha = sha

    def role_identifiers(self) -> list[str]:
        """Return the role identifiers of the current participation in the
        course, computing them on first use and caching them afterwards.
        """
        if self._role_identifiers_cache is None:
            from course.enrollment import get_participation_role_identifiers
            self._role_identifiers_cache = get_participation_role_identifiers(
                    self.course, self.participation)

        return self._role_identifiers_cache

    def permissions(self) -> frozenset[tuple[str, str | None]]:
Andreas Klöckner's avatar
Andreas Klöckner committed
        if self.participation is None:
            if self._permissions_cache is not None:
                return self._permissions_cache

            from course.enrollment import get_participation_permissions
            perm = get_participation_permissions(self.course, self.participation)
Andreas Klöckner's avatar
Andreas Klöckner committed

Andreas Klöckner's avatar
Andreas Klöckner committed
        else:
            return self.participation.permissions()
    def has_permission(
            self, perm: str, argument: str | AnyArgumentType | None = None
            ) -> bool:
        """Return whether the permission *perm* is held.

        If *argument* is :data:`ANY_ARGUMENT`, holding *perm* with any
        argument is sufficient. Otherwise the exact pair
        ``(perm, argument)`` must be among :meth:`permissions`.
        """
        if argument is not ANY_ARGUMENT:
            return (perm, argument) in self.permissions()

        for held_perm, _held_arg in self.permissions():
            if perm == held_perm:
                return True
        return False
    def _set_course_lang(self, action: str) -> None:
        if self.course.force_lang and self.course.force_lang.strip():
            if action == "activate":
                self.old_language = translation.get_language()
                translation.activate(self.course.force_lang)
            else:
                if self.old_language is None:
                    # This should be a rare case, but get_language() can be None.
                    # See django.utils.translation.override.__exit__()
                    translation.deactivate_all()
                else:
                    translation.activate(self.old_language)
Dong Zhuang's avatar
Dong Zhuang committed
    def __enter__(self):
        if self._is_in_context_manager:
            raise RuntimeError(
                "Nested use of 'course_view' as context manager "
                "is not allowed.")
        self._is_in_context_manager = True
        self._set_course_lang(action="activate")
Dong Zhuang's avatar
Dong Zhuang committed
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._is_in_context_manager = False
        self._set_course_lang(action="deactivate")
Dong Zhuang's avatar
Dong Zhuang committed
        self.repo.close()

class FlowContext:
    def __init__(
            self,
            repo: Repo_ish,
            course: Course,
            flow_id: str,
            participation: Participation | None = None) -> None:
        """*participation* and *flow_session* are not stored and only used
        to figure out versioning of the flow content.
        """
        self.repo = repo
        self.course = course
        self.flow_id = flow_id
        from django.core.exceptions import ObjectDoesNotExist
        self.course_commit_sha = get_course_commit_sha(
                self.course, participation)

            self.flow_desc = get_flow_desc(self.repo, self.course,
                    flow_id, self.course_commit_sha)
        except ObjectDoesNotExist:
            raise http.Http404()
class PageOrdinalOutOfRange(http.Http404):
    """Raised when a requested page ordinal is not smaller than the page count
    of the flow session.
    """


class FlowPageContext(FlowContext):
    """This object acts as a container for all the information that a flow page
    may need to render itself or respond to a POST.

    Note that this is different from :class:`course.page.PageContext`,
    which is used for in the page API.
            repo: Repo_ish,
            course: Course,
            flow_id: str,
            page_ordinal: int,
            participation: Participation | None,
            flow_session: FlowSession,
            request: http.HttpRequest | None = None,
            ) -> None:
        super().__init__(repo, course, flow_id, participation)
        if page_ordinal >= not_none(flow_session.page_count):
            raise PageOrdinalOutOfRange()

        from course.models import FlowPageData
        page_data = self.page_data = get_object_or_404(
                FlowPageData, flow_session=flow_session, page_ordinal=page_ordinal)

        from course.content import get_flow_page_desc
            self.page_desc: FlowPageDesc | None = get_flow_page_desc(
Dong Zhuang's avatar
Dong Zhuang committed
                    flow_session.flow_id, self.flow_desc, page_data.group_id,
        except ObjectDoesNotExist:
            self.page_desc = None
            self.page: PageBase | None = None
            self.page_context: PageContext | None = None
        else:
            self.page = instantiate_flow_page_with_ctx(self, page_data)
            page_uri = None
            if request is not None:
                from django.urls import reverse
                page_uri = request.build_absolute_uri(
                    reverse(
                        "relate-view_flow_page",
                        args=(course.identifier, flow_session.id, page_ordinal)))
            self.page_context = PageContext(
                    course=self.course, repo=self.repo,
                    commit_sha=self.course_commit_sha,
                    flow_session=flow_session,
                    page_uri=page_uri,
                    request=request)
        self._prev_answer_visit = False
    @property
    def prev_answer_visit(self):
        """The result of :func:`course.flow.get_prev_answer_visit` for this
        page's data, computed on first access and cached afterwards.

        ``False`` in ``self._prev_answer_visit`` marks "not computed yet".
        """
        if self._prev_answer_visit is not False:
            return self._prev_answer_visit

        from course.flow import get_prev_answer_visit
        self._prev_answer_visit = get_prev_answer_visit(self.page_data)
        return self._prev_answer_visit
    def page_ordinal(self):
        """Return the ordinal of this page, as recorded in ``self.page_data``."""
        return self.page_data.page_ordinal
def instantiate_flow_page_with_ctx(
        fctx: FlowContext, page_data: FlowPageData) -> PageBase:
    """Instantiate the page that *page_data* refers to, using the flow
    description, repository and commit SHA held by *fctx*.
    """
    from course.content import get_flow_page_desc, instantiate_flow_page

    page_desc = get_flow_page_desc(
            fctx.flow_id, fctx.flow_desc,
            page_data.group_id, page_data.page_id)

    # Human-readable description of where the page comes from.
    location = (
            f"course '{fctx.course.identifier}', "
            f"flow '{fctx.flow_id}', page '{page_data.group_id}/{page_data.page_id}'")

    return instantiate_flow_page(
            location, fctx.repo, page_desc, fctx.course_commit_sha)

# {{{ utilities for course-based views
def course_view(f):
    """Decorator for course-based views.

    The decorated view is called as ``view(request, course_identifier, ...)``.
    A :class:`CoursePageContext` is built from the first two arguments and
    passed to *f* in their place, followed by the remaining arguments. *f*
    runs inside the context, so the course's forced language (if any) is
    active while it runs.
    """
    from functools import wraps

    @wraps(f)
    def wrapper(request, course_identifier, *args, **kwargs):
        with CoursePageContext(request, course_identifier) as pctx:
            # No explicit pctx.repo.close() is needed here:
            # CoursePageContext.__exit__ closes the repository, also when
            # *f* raises.
            return f(pctx, *args, **kwargs)

    return wrapper


class ParticipationPermissionWrapper:
    def __init__(self, pctx: CoursePageContext) -> None:
    def __getitem__(self, perm: str) -> bool:
        """Return whether the wrapped page context holds permission *perm*
        with any argument.

        :raises ValueError: if *perm* is not an attribute of
            :class:`course.constants.participation_permission`.
        """
        from course.constants import participation_permission

        # Reject unknown names rather than silently reporting "not permitted".
        try:
            getattr(participation_permission, perm)
        except AttributeError:
            raise ValueError(f"permission name '{perm}' not valid")
        else:
            return self.pctx.has_permission(perm, ANY_ARGUMENT)

    def __iter__(self):
        """Always raise :exc:`TypeError`: the wrapper supports lookup by
        permission name only.
        """
        # NOTE(review): presumably defined so that Python does not fall back
        # to iterating via __getitem__ with integer indices -- confirm.
        raise TypeError("ParticipationPermissionWrapper is not iterable.")


def render_course_page(
        pctx: CoursePageContext, template_name: str, args: dict[str, Any],
        allow_instant_flow_requests: bool = True) -> http.HttpResponse:
    """Render *template_name* with *args* plus course-level template context.

    *args* is updated in place with the course, a permission wrapper, the
    participation and the instant flow requests.

    :arg allow_instant_flow_requests: if *False*, no instant flow requests
        are looked up and the template receives an empty list.
    """
    from course.views import get_now_or_fake_time
    now_datetime = get_now_or_fake_time(pctx.request)

    if allow_instant_flow_requests:
        # Requests for this course whose time window contains *now_datetime*
        # and which have not been cancelled.
        from course.models import InstantFlowRequest
        instant_flow_requests = list(InstantFlowRequest.objects
                .filter(
                    course=pctx.course,
                    start_time__lte=now_datetime,
                    end_time__gte=now_datetime,
                    cancelled=False)
    else:
        instant_flow_requests = []

    args.update({
        "course": pctx.course,
        "pperm": ParticipationPermissionWrapper(pctx),
        "participation": pctx.participation,
        "num_instant_flow_requests": len(instant_flow_requests),
        # pairs of (1-based position, request)
        "instant_flow_requests":
        [(i+1, r) for i, r in enumerate(instant_flow_requests)],
        })

    return render(pctx.request, template_name, args)
Andreas Klöckner's avatar
Andreas Klöckner committed

    """Caches instances of :class:`course.page.Page`."""

    def __init__(self, repo, course, flow_id):
        """Create an empty cache for pages of flow *flow_id* in *course*,
        read from *repo*.
        """
        self.repo, self.course, self.flow_id = repo, course, flow_id

        # commit_sha -> flow description
        self.flow_desc_cache = {}
        # (group_id, page_id, commit_sha) -> page instance
        self.page_cache = {}

    def get_flow_desc_from_cache(self, commit_sha):
        """Return the flow description at *commit_sha*, loading it via
        :func:`get_flow_desc` and caching it on first request.
        """
        cache = self.flow_desc_cache
        if commit_sha not in cache:
            cache[commit_sha] = get_flow_desc(
                    self.repo, self.course, self.flow_id, commit_sha)

        return cache[commit_sha]

    def get_page(self, group_id, page_id, commit_sha):
        """Return the page *group_id*/*page_id* of the flow at *commit_sha*,
        instantiating and caching it on first request.
        """
        cache_key = (group_id, page_id, commit_sha)
        if cache_key in self.page_cache:
            return self.page_cache[cache_key]

        from course.content import get_flow_page_desc, instantiate_flow_page

        page_desc = get_flow_page_desc(
                self.flow_id,
                self.get_flow_desc_from_cache(commit_sha),
                group_id, page_id)
        location = (
                f"flow '{self.flow_id}', "
                f"group '{group_id}', page '{page_id}'")

        new_page = instantiate_flow_page(
                location=location,
                repo=self.repo, page_desc=page_desc,
                commit_sha=commit_sha)
        self.page_cache[cache_key] = new_page
        return new_page

# }}}

# {{{ codemirror config

@dataclass(frozen=True)
class JsLiteral:
    """A piece of JavaScript source that :func:`repr_js` emits verbatim."""

    # the JavaScript source text
    js: str


def repr_js(obj: Any) -> str:
    if isinstance(obj, list):
        return "[{}]".format(", ".join(repr_js(ch) for ch in obj))
    elif isinstance(obj, dict):
        return "{{{}}}".format(", ".join(f"{k}: {repr_js(v)}" for k, v in obj.items()))
    elif isinstance(obj, bool):
        return repr(obj).lower()
    elif isinstance(obj, int | float):
        return repr(obj)
    elif isinstance(obj, str):
        return repr(obj)
    elif isinstance(obj, JsLiteral):
        return obj.js
    else:
        raise ValueError(f"unsupported object type: {type(obj)}")


class CodeMirrorTextarea(forms.Textarea):
    @property
    def media(self):
        """The form media required by this widget: the CodeMirror JS bundle."""
        scripts = ["bundle-codemirror.js"]
        return forms.Media(js=scripts)

    def __init__(self, attrs=None,
                 *,
                 language_mode=None, interaction_mode,
                 indent_unit: int,
                 autofocus: bool,
                 additional_keys: dict[str, JsLiteral],
                 **kwargs):
        """Store the editor configuration on the widget.

        :arg attrs: passed on to the superclass, as are *kwargs*.
        :arg language_mode: name of the ``rlCodemirror`` language extension
            to enable, or *None* for none.
        :arg interaction_mode: ``"vim"`` or ``"emacs"`` to enable the
            respective key bindings.
        :arg indent_unit: number of spaces making up one indentation level.
        :arg autofocus: stored on the instance.
        :arg additional_keys: mapping from key name to the JS code to run.
        """
        super().__init__(attrs, **kwargs)

        # editor behavior
        self.interaction_mode = interaction_mode
        self.additional_keys = additional_keys
        self.autofocus = autofocus

        # language and formatting
        self.language_mode = language_mode
        self.indent_unit = indent_unit

    # TODO: Maybe add VSCode keymap?
    # https://github.com/replit/codemirror-vscode-keymap
    def render(self, name, value, attrs=None, renderer=None) -> SafeString:
        # based on
        # https://github.com/codemirror/basic-setup/blob/b3be7cd30496ee578005bd11b1fa6a8b21fcbece/src/codemirror.ts
        extensions = [
                JsLiteral(f"rlCodemirror.indentUnit.of({' ' * self.indent_unit !r})"),
                ]

        if self.interaction_mode == "vim":
            extensions.insert(0, JsLiteral("rlCodemirror.vim()"))
        elif self.interaction_mode == "emacs":
            extensions.insert(0, JsLiteral("rlCodemirror.emacs()"))
        else:
            pass

        if self.language_mode is not None:
            extensions.append(JsLiteral(f"rlCodemirror.{self.language_mode}()"))

        additional_keys = [
            {
                "key": key,
                "run": func,
            }
            for key, func in self.additional_keys.items()
        ]
        output = [super().render(
                        name, value, attrs, renderer),