from __future__ import annotations __copyright__ = "Copyright (C) 2014 Andreas Kloeckner" __license__ = """ Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ import datetime from typing import ( TYPE_CHECKING, Any, cast, ) import django.forms as forms import django.views.decorators.http as http_dec from crispy_forms.layout import Div, Layout, Submit from django import http from django.contrib import messages from django.contrib.auth.decorators import login_required from django.core.exceptions import ( ObjectDoesNotExist, PermissionDenied, SuspiciousOperation, ) from django.db import transaction from django.shortcuts import get_object_or_404, redirect, render from django.utils.safestring import mark_safe from django.utils.translation import ( gettext as _, pgettext, pgettext_lazy, ) from django.views.decorators.cache import cache_control from django_select2.forms import Select2Widget from course.auth import get_pre_impersonation_user from course.constants import ( FLOW_PERMISSION_CHOICES, FLOW_RULE_KIND_CHOICES, flow_rule_kind, participation_permission as pperm, participation_status, ) from course.content import get_course_repo from course.enrollment import ( get_participation_for_request, get_participation_permissions, ) from course.models import ( Course, FlowRuleException, FlowSession, InstantFlowRequest, Participation, ) from course.utils import ( CoursePageContext, course_view, get_course_specific_language_choices, render_course_page, ) from relate.utils import ( HTML5DateInput, HTML5DateTimeInput, RelateHttpRequest, StyledForm, StyledModelForm, string_concat, ) # {{{ for mypy if TYPE_CHECKING: from accounts.models import User from course.content import FlowDesc # }}} NONE_SESSION_TAG = string_concat("<<<", _("NONE"), ">>>") # {{{ home def home(request: http.HttpRequest) -> http.HttpResponse: now_datetime = get_now_or_fake_time(request) current_courses = [] past_courses = [] for course in Course.objects.filter(listed=True): participation = get_participation_for_request(request, course) show = True if course.hidden: perms = get_participation_permissions(course, participation) if (pperm.view_hidden_course_page, None) not in perms: show = False if show: if (course.end_date is None or now_datetime.date() <= course.end_date): current_courses.append(course) else: past_courses.append(course) def course_sort_key_minor(course): return course.number if course.number is not None else "" def course_sort_key_major(course): return (course.start_date if course.start_date is not None else now_datetime.date()) current_courses.sort(key=course_sort_key_minor) past_courses.sort(key=course_sort_key_minor) current_courses.sort(key=course_sort_key_major, reverse=True) past_courses.sort(key=course_sort_key_major, reverse=True) return render(request, "course/home.html", { "current_courses": current_courses, "past_courses": past_courses, }) # }}} # {{{ pages def check_course_state( course: Course, participation: Participation | None) -> None: """ Check to see if the course is hidden. If hidden, only allow access to 'ta' and 'instructor' roles """ if course.hidden: if participation is None: raise PermissionDenied(_("course page is currently hidden")) if not participation.has_permission(pperm.view_hidden_course_page): raise PermissionDenied(_("course page is currently hidden")) @course_view def course_page(pctx: CoursePageContext) -> http.HttpResponse: from course.content import get_course_desc, get_processed_page_chunks page_desc = get_course_desc(pctx.repo, pctx.course, pctx.course_commit_sha) chunks = get_processed_page_chunks( pctx.course, pctx.repo, pctx.course_commit_sha, page_desc, pctx.role_identifiers(), get_now_or_fake_time(pctx.request), facilities=pctx.request.relate_facilities) show_enroll_button = ( pctx.course.accepts_enrollment and pctx.participation is None) if pctx.request.user.is_authenticated and Participation.objects.filter( user=pctx.request.user, course=pctx.course, status=participation_status.requested).count(): show_enroll_button = False messages.add_message(pctx.request, messages.INFO, _("Your enrollment request is pending. You will be " "notified once it has been acted upon.")) from course.models import ParticipationPreapproval if ParticipationPreapproval.objects.filter( course=pctx.course).exclude(institutional_id=None).count(): if not pctx.request.user.institutional_id: from django.urls import reverse messages.add_message(pctx.request, messages.WARNING, _("This course uses institutional ID for " "enrollment preapproval, please " "fill in your institutional ID »" " in your profile.") % ( reverse("relate-user_profile") + "?referer=" + pctx.request.path + "&set_inst_id=1" ) ) else: if pctx.course.preapproval_require_verified_inst_id: messages.add_message(pctx.request, messages.WARNING, _("Your institutional ID is not verified or " "preapproved. Please contact your course " "staff.") ) return render_course_page(pctx, "course/course-page.html", { "chunks": chunks, "show_enroll_button": show_enroll_button, }) @course_view def static_page(pctx: CoursePageContext, page_path: str) -> http.HttpResponse: from course.content import get_processed_page_chunks, get_staticpage_desc try: page_desc = get_staticpage_desc(pctx.repo, pctx.course, pctx.course_commit_sha, "staticpages/"+page_path+".yml") except ObjectDoesNotExist: raise http.Http404() chunks = get_processed_page_chunks( pctx.course, pctx.repo, pctx.course_commit_sha, page_desc, pctx.role_identifiers(), get_now_or_fake_time(pctx.request), facilities=pctx.request.relate_facilities) return render_course_page(pctx, "course/static-page.html", { "chunks": chunks, "show_enroll_button": False, }) # }}} # {{{ media def media_etag_func(request, course_identifier, commit_sha, media_path): return ":".join([course_identifier, commit_sha, media_path]) @cache_control(max_age=3600*24*31) # cache for a month @http_dec.condition(etag_func=media_etag_func) def get_media(request, course_identifier, commit_sha, media_path): course = get_object_or_404(Course, identifier=course_identifier) with get_course_repo(course) as repo: return get_repo_file_response( repo, "media/" + media_path, commit_sha.encode()) def repo_file_etag_func(request, course_identifier, commit_sha, path): return ":".join([course_identifier, commit_sha, path]) @cache_control(max_age=3600*24*31) # cache for a month @http_dec.condition(etag_func=repo_file_etag_func) def get_repo_file(request, course_identifier, commit_sha, path): commit_sha = commit_sha.encode() course = get_object_or_404(Course, identifier=course_identifier) participation = get_participation_for_request(request, course) return get_repo_file_backend( request, course, participation, commit_sha, path) def current_repo_file_etag_func( request: http.HttpRequest, course_identifier: str, path: str) -> str: course = get_object_or_404(Course, identifier=course_identifier) participation = get_participation_for_request(request, course) check_course_state(course, participation) from course.content import get_course_commit_sha commit_sha = get_course_commit_sha(course, participation) return ":".join([course_identifier, commit_sha.decode(), path]) @http_dec.condition(etag_func=current_repo_file_etag_func) def get_current_repo_file( request: http.HttpRequest, course_identifier: str, path: str ) -> http.HttpResponse: course = get_object_or_404(Course, identifier=course_identifier) participation = get_participation_for_request(request, course) from course.content import get_course_commit_sha commit_sha = get_course_commit_sha(course, participation) return get_repo_file_backend( request, course, participation, commit_sha, path) def get_repo_file_backend( request: http.HttpRequest, course: Course, participation: Participation | None, commit_sha: bytes, path: str, ) -> http.HttpResponse: """ Check if a file should be accessible. Then call for it if the permission is not denied. Order is important here. An in-exam request takes precedence. Note: an access_role of "public" is equal to "unenrolled" """ request = cast(RelateHttpRequest, request) # check to see if the course is hidden check_course_state(course, participation) # set access to public (or unenrolled), student, etc if request.relate_exam_lockdown: access_kinds = ["in_exam"] else: from course.enrollment import get_participation_permissions access_kinds = [ arg for perm, arg in get_participation_permissions(course, participation) if perm == pperm.access_files_for and arg is not None] from course.content import is_repo_file_accessible_as # retrieve local path for the repo for the course with get_course_repo(course) as repo: if not is_repo_file_accessible_as(access_kinds, repo, commit_sha, path): raise PermissionDenied() return get_repo_file_response(repo, path, commit_sha) def get_repo_file_response( repo: Any, path: str, commit_sha: bytes ) -> http.HttpResponse: from course.content import get_repo_blob_data_cached try: data = get_repo_blob_data_cached(repo, path, commit_sha) except ObjectDoesNotExist: raise http.Http404() from mimetypes import guess_type content_type, __ = guess_type(path) if content_type is None: content_type = "application/octet-stream" return http.HttpResponse(data, content_type=content_type) # }}} # {{{ time travel class FakeTimeForm(StyledForm): time = forms.DateTimeField( widget=HTML5DateTimeInput(), label=_("Time")) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.helper.add_input( # Translators: "set" fake time. Submit("set", _("Set"))) self.helper.add_input( # Translators: "unset" fake time. Submit("unset", _("Unset"))) def get_fake_time(request: http.HttpRequest | None) -> datetime.datetime | None: if request is not None and "relate_fake_time" in request.session: from zoneinfo import ZoneInfo from django.conf import settings tz = ZoneInfo(settings.TIME_ZONE) return datetime.datetime.fromtimestamp( request.session["relate_fake_time"], tz=tz) else: return None def get_now_or_fake_time(request: http.HttpRequest | None) -> datetime.datetime: fake_time = get_fake_time(request) if fake_time is None: from django.utils.timezone import now return now() else: return fake_time def may_set_fake_time(user: User | None) -> bool: if user is None: return False return Participation.objects.filter( user=user, roles__permissions__permission=pperm.set_fake_time ).count() > 0 @login_required def set_fake_time(request): # allow staff to set fake time when impersonating pre_imp_user = get_pre_impersonation_user(request) if not ( may_set_fake_time(request.user) or ( pre_imp_user is not None and may_set_fake_time(pre_imp_user))): raise PermissionDenied(_("may not set fake time")) if request.method == "POST": form = FakeTimeForm(request.POST, request.FILES) do_set = "set" in form.data if form.is_valid(): fake_time = form.cleaned_data["time"] if do_set: import time request.session["relate_fake_time"] = \ time.mktime(fake_time.timetuple()) else: request.session.pop("relate_fake_time", None) else: if "relate_fake_time" in request.session: form = FakeTimeForm({ "time": get_fake_time(request) }) else: form = FakeTimeForm() return render(request, "generic-form.html", { "form": form, "form_description": _("Set fake time"), }) def fake_time_context_processor(request): return { "fake_time": get_fake_time(request), } # }}} # {{{ space travel (i.e. pretend to be in facility) class FakeFacilityForm(StyledForm): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) from course.utils import get_facilities_config self.fields["facilities"] = forms.MultipleChoiceField( choices=( (name, name) for name in get_facilities_config()), widget=forms.CheckboxSelectMultiple, required=False, label=_("Facilities"), help_text=_("Facilities you wish to pretend to be in")) self.fields["custom_facilities"] = forms.CharField( label=_("Custom facilities"), required=False, help_text=_("More (non-predefined) facility names, separated " "by commas, which would like to pretend to be in")) self.fields["add_pretend_facilities_header"] = forms.BooleanField( required=False, initial=True, label=_("Add fake facililities header"), help_text=_("Add a page header to every page rendered " "while pretending to be in a facility, as a reminder " "that this pretending is in progress.")) self.helper.add_input( # Translators: "set" fake facility. Submit("set", _("Set"))) self.helper.add_input( # Translators: "unset" fake facility. Submit("unset", _("Unset"))) def may_set_pretend_facility(user: User | None) -> bool: if user is None: return False return Participation.objects.filter( user=user, roles__permissions__permission=pperm.set_pretend_facility ).count() > 0 @login_required def set_pretend_facilities(request): # allow staff to set fake time when impersonating pre_imp_user = get_pre_impersonation_user(request) if not ( may_set_pretend_facility(request.user) or ( pre_imp_user is not None and may_set_pretend_facility(pre_imp_user))): raise PermissionDenied(_("may not pretend facilities")) if request.method == "POST": form = FakeFacilityForm(request.POST) do_set = "set" in form.data if form.is_valid(): if do_set: pretend_facilities = ( form.cleaned_data["facilities"] + [s.strip() for s in ( form.cleaned_data["custom_facilities"].split(",")) if s.strip()]) request.session["relate_pretend_facilities"] = pretend_facilities request.session["relate_pretend_facilities_header"] = \ form.cleaned_data["add_pretend_facilities_header"] else: request.session.pop("relate_pretend_facilities", None) else: if "relate_pretend_facilities" in request.session: form = FakeFacilityForm({ "facilities": [], "custom_facilities": ",".join( request.session["relate_pretend_facilities"]), "add_pretend_facilities_header": request.session["relate_pretend_facilities_header"], }) else: form = FakeFacilityForm() return render(request, "generic-form.html", { "form": form, "form_description": _("Pretend to be in Facilities"), }) def pretend_facilities_context_processor(request): return { "pretend_facilities": request.session.get( "relate_pretend_facilities", []), "add_pretend_facilities_header": request.session.get("relate_pretend_facilities_header", True), } # }}} # {{{ instant flow requests class InstantFlowRequestForm(StyledForm): def __init__(self, flow_ids, *args, **kwargs): super().__init__(*args, **kwargs) self.fields["flow_id"] = forms.ChoiceField( choices=[(fid, fid) for fid in flow_ids], required=True, label=_("Flow ID"), widget=Select2Widget()) self.fields["duration_in_minutes"] = forms.IntegerField( required=True, initial=20, label=pgettext_lazy("Duration for instant flow", "Duration in minutes")) self.helper.add_input( Submit( "add", pgettext("Add an instant flow", "Add"))) self.helper.add_input( Submit( "cancel", pgettext("Cancel all instant flow(s)", "Cancel all"))) @course_view def manage_instant_flow_requests(pctx): if not pctx.has_permission(pperm.manage_instant_flow_requests): raise PermissionDenied() from course.content import list_flow_ids flow_ids = list_flow_ids(pctx.repo, pctx.course_commit_sha) request = pctx.request if request.method == "POST": form = InstantFlowRequestForm(flow_ids, request.POST, request.FILES) if "add" in request.POST: op = "add" elif "cancel" in request.POST: op = "cancel" else: raise SuspiciousOperation(_("invalid operation")) now_datetime = get_now_or_fake_time(pctx.request) if form.is_valid(): if op == "add": from datetime import timedelta ifr = InstantFlowRequest() ifr.course = pctx.course ifr.flow_id = form.cleaned_data["flow_id"] ifr.start_time = now_datetime ifr.end_time = ( now_datetime + timedelta( minutes=form.cleaned_data["duration_in_minutes"])) ifr.save() else: assert op == "cancel" (InstantFlowRequest.objects .filter( course=pctx.course, start_time__lte=now_datetime, end_time__gte=now_datetime, cancelled=False) .order_by("start_time") .update(cancelled=True)) else: form = InstantFlowRequestForm(flow_ids) return render_course_page(pctx, "course/generic-course-form.html", { "form": form, "form_description": _("Manage Instant Flow Requests"), }) # }}} # {{{ test flow class FlowTestForm(StyledForm): def __init__(self, flow_ids, *args, **kwargs): super().__init__(*args, **kwargs) self.fields["flow_id"] = forms.ChoiceField( choices=[(fid, fid) for fid in flow_ids], required=True, label=_("Flow ID"), widget=Select2Widget()) self.helper.add_input( Submit( "test", mark_safe( string_concat( pgettext("Start an activity", "Go"), " »")), )) @course_view def test_flow(pctx): if not pctx.has_permission(pperm.test_flow): raise PermissionDenied() from course.content import list_flow_ids flow_ids = list_flow_ids(pctx.repo, pctx.course_commit_sha) request = pctx.request if request.method == "POST": form = FlowTestForm(flow_ids, request.POST, request.FILES) if "test" not in request.POST: raise SuspiciousOperation(_("invalid operation")) if form.is_valid(): return redirect("relate-view_start_flow", pctx.course.identifier, form.cleaned_data["flow_id"]) else: form = FlowTestForm(flow_ids) return render_course_page(pctx, "course/generic-course-form.html", { "form": form, "form_description": _("Test Flow"), }) # }}} # {{{ flow access exceptions class ParticipationChoiceField(forms.ModelChoiceField): def label_from_instance(self, obj): user = obj.user return ( f"{user.email} - {user.get_full_name()}") class ExceptionStage1Form(StyledForm): def __init__(self, course, flow_ids, *args, **kwargs): super().__init__(*args, **kwargs) self.fields["participation"] = ParticipationChoiceField( queryset=(Participation.objects .filter( course=course, status=participation_status.active, ) .order_by("user__last_name")), required=True, help_text=_("Select participant for whom exception is to " "be granted."), label=_("Participant"), widget=Select2Widget()) self.fields["flow_id"] = forms.ChoiceField( choices=[(fid, fid) for fid in flow_ids], required=True, label=_("Flow ID")) self.helper.add_input( Submit( "next", mark_safe( string_concat( pgettext("Next step", "Next"), " »")))) @course_view def grant_exception(pctx): if not pctx.has_permission(pperm.grant_exception): raise PermissionDenied(_("may not grant exceptions")) from course.content import list_flow_ids flow_ids = list_flow_ids(pctx.repo, pctx.course_commit_sha) request = pctx.request if request.method == "POST": form = ExceptionStage1Form(pctx.course, flow_ids, request.POST) if form.is_valid(): return redirect("relate-grant_exception_stage_2", pctx.course.identifier, form.cleaned_data["participation"].id, form.cleaned_data["flow_id"]) else: form = ExceptionStage1Form(pctx.course, flow_ids) return render_course_page(pctx, "course/generic-course-form.html", { "form": form, "form_description": _("Grant Exception"), }) def strify_session_for_exception(session: FlowSession) -> str: from relate.utils import as_local_time, format_datetime_local # Translators: %s is the string of the start time of a session. result = (_("started at %s") % format_datetime_local( as_local_time(session.start_time))) if session.access_rules_tag: result += _(" tagged '%s'") % session.access_rules_tag return result class CreateSessionForm(StyledForm): def __init__( self, session_tag_choices: list[tuple[str, str]], default_tag: str | None, create_session_is_override: bool, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) self.fields["access_rules_tag_for_new_session"] = forms.ChoiceField( choices=session_tag_choices, initial=default_tag, help_text=_("If you click 'Create session', this tag will be " "applied to the new session."), label=_("Access rules tag for new session")) if create_session_is_override: self.helper.add_input( Submit( "create_session", _("Create session (override rules)"))) else: self.helper.add_input( Submit( "create_session", _("Create session"))) class ExceptionStage2Form(StyledForm): def __init__( self, sessions: list[FlowSession], *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) self.fields["session"] = forms.ChoiceField( choices=( (session.id, strify_session_for_exception(session)) for session in sessions), help_text=_("The rules that currently apply to selected " "session will provide the default values for the rules " "on the next page."), label=_("Session")) self.helper.add_input( Submit( "next", mark_safe( string_concat( pgettext("Next step", "Next"), " »")))) @course_view def grant_exception_stage_2( pctx: CoursePageContext, participation_id: str, flow_id: str ) -> http.HttpResponse: if not pctx.has_permission(pperm.grant_exception): raise PermissionDenied(_("may not grant exceptions")) # {{{ get flow data participation = get_object_or_404(Participation, id=participation_id) form_text = ( string_concat( "