Newer
Older
self.fields["create_access_exception"] = forms.BooleanField(
required=False, help_text=_("If set, an exception for the "
"access rules will be created."), initial=True,
label=_("Create access rule exception"))
self.fields["access_expires"] = forms.DateTimeField(
widget=HTML5DateTimeInput(),
required=False,
label=pgettext_lazy("Time when access expires", "Access expires"),
help_text=_("At the specified time, the special access granted below "
"will expire "
"and revert to being the same as for the rest of the class. "
"This field may "
"be empty, in which case this access does not expire. Note also that "
"the grading-related entries (such as 'due date' and 'credit percent') "
"do not expire and remain valid indefinitely, unless overridden by "
"another exception."))
for key, name in FLOW_PERMISSION_CHOICES:
self.fields[key] = forms.BooleanField(label=name, required=False,
initial=default_data.get(key) or False)
access_fields.append(key)
layout.append(Div(*access_fields, css_class="relate-well"))
self.fields["create_grading_exception"] = forms.BooleanField(
required=False, help_text=_("If set, an exception for the "
"grading rules will be created."), initial=True,
label=_("Create grading rule exception"))
self.fields["due_same_as_access_expiration"] = forms.BooleanField(
required=False, help_text=_("If set, the 'Due time' field will be "
"disregarded."),
initial=default_data.get("due_same_as_access_expiration") or False,
label=_("Due same as access expiration"))
self.fields["due"] = forms.DateTimeField(
widget=HTML5DateTimeInput(),
help_text=_("The due time shown to the student. Also, the "
"any session under these rules is subject to expiration."),
initial=default_data.get("due"),
label=_("Due time"))
self.fields["generates_grade"] = forms.BooleanField(required=False,
initial=default_data.get("generates_grade", True),
label=_("Generates grade"))
self.fields["credit_percent"] = forms.FloatField(required=False,
initial=default_data.get("credit_percent"),
label=_("Credit percent"))
self.fields["bonus_points"] = forms.FloatField(required=False,
initial=default_data.get("bonus_points"),
label=_("Bonus points"))
self.fields["max_points"] = forms.FloatField(required=False,
initial=default_data.get("max_points"),
label=_("Maximum number of points (for percentage)"))
self.fields["max_points_enforced_cap"] = forms.FloatField(required=False,
initial=default_data.get("max_points_enforced_cap"),
label=_("Maximum number of points (enforced cap)"))
layout.append(Div("create_grading_exception",
"due_same_as_access_expiration", "due",
"generates_grade",
"credit_percent", "bonus_points", "max_points",
"max_points_enforced_cap",
self.fields["comment"] = forms.CharField(
widget=forms.Textarea, required=True,
initial=default_data.get("comment"),
label=_("Comment"))
layout.append("comment")
self.helper.add_input(
Submit(
self.helper.layout = Layout(*layout)
access_expires = self.cleaned_data.get("access_expires")
due_same_as_access_expiration = self.cleaned_data.get(
"due_same_as_access_expiration")
if (not access_expires and due_same_as_access_expiration):
self.add_error(
"access_expires",
_("Must specify access expiration if 'due same "
"as access expiration' is set."))
@course_view
@transaction.atomic
def grant_exception_stage_3(
pctx: CoursePageContext,
participation_id: int,
flow_id: str,
session_id: int) -> http.HttpResponse:
Andreas Klöckner
committed
if not pctx.has_permission(pperm.grant_exception):
raise PermissionDenied(_("may not grant exceptions"))
assert pctx.request.user.is_authenticated
participation = get_object_or_404(Participation, id=participation_id)
from course.content import get_flow_desc
try:
flow_desc = get_flow_desc(pctx.repo, pctx.course, flow_id,
pctx.course_commit_sha)
except ObjectDoesNotExist:
raise http.Http404()
session = FlowSession.objects.get(id=int(session_id))
now_datetime = get_now_or_fake_time(pctx.request)
from course.utils import get_session_access_rule, get_session_grading_rule
access_rule = get_session_access_rule(session, flow_desc, now_datetime)
grading_rule = get_session_grading_rule(session, flow_desc, now_datetime)
request = pctx.request
if request.method == "POST":
form = ExceptionStage3Form(
{}, flow_desc, session.access_rules_tag, request.POST)
permissions = [
key
for key, __ in FLOW_PERMISSION_CHOICES
if form.cleaned_data[key]]
ValidationContext,
validate_session_access_rule,
from relate.utils import dict_to_struct
vctx = ValidationContext(
repo=pctx.repo,
commit_sha=pctx.course_commit_sha)
flow_desc = get_flow_desc(pctx.repo,
pctx.course,
flow_id, pctx.course_commit_sha)
tags: list[str] = []
tags = cast(list[str], getattr(flow_desc.rules, "tags", []))
exceptions_created = []
restricted_to_same_tag = bool(
form.cleaned_data.get("restrict_to_same_tag")
and session.access_rules_tag is not None)
if form.cleaned_data["create_access_exception"]:
new_access_rule: dict[str, Any] = {"permissions": permissions}
if restricted_to_same_tag:
new_access_rule["if_has_tag"] = session.access_rules_tag
validate_session_access_rule(
vctx, _("newly created exception"),
dict_to_struct(new_access_rule), tags)
fre_access = FlowRuleException(
flow_id=flow_id,
participation=participation,
expiration=form.cleaned_data["access_expires"],
creator=pctx.request.user,
comment=form.cleaned_data["comment"],
kind=flow_rule_kind.access,
rule=new_access_rule)
fre_access.save()
exceptions_created.append(
dict(FLOW_RULE_KIND_CHOICES)[fre_access.kind])
session_access_rules_tag_changed = False
if not restricted_to_same_tag:
new_access_rules_tag = form.cleaned_data.get("set_access_rules_tag")
if new_access_rules_tag == NONE_SESSION_TAG:
new_access_rules_tag = None
if session.access_rules_tag != new_access_rules_tag:
session.access_rules_tag = new_access_rules_tag
session.save()
session_access_rules_tag_changed = True
if new_access_rules_tag is not None:
msg = _("Access rules tag of the selected session "
"updated to '%s'.") % new_access_rules_tag
else:
msg = _(
"Removed access rules tag of the selected session.")
messages.add_message(pctx.request, messages.SUCCESS, msg)
if form.cleaned_data["create_grading_exception"]:
due = form.cleaned_data["due"]
if form.cleaned_data["due_same_as_access_expiration"]:
due = form.cleaned_data["access_expires"]
if form.cleaned_data["credit_percent"] is not None:
descr += string_concat(" (%.1f%% ", _("credit"), ")") \
% form.cleaned_data["credit_percent"]
due_local_naive = due
if due_local_naive is not None:
from relate.utils import as_local_time
due_local_naive = (
as_local_time(due_local_naive)
.replace(tzinfo=None))
new_grading_rule: dict[str, Any] = {"description": descr}
if due_local_naive is not None:
new_grading_rule["due"] = due_local_naive
for attr_name in ["credit_percent", "bonus_points",
"max_points", "max_points_enforced_cap", "generates_grade"]:
if form.cleaned_data[attr_name] is not None:
new_grading_rule[attr_name] = form.cleaned_data[attr_name]
if restricted_to_same_tag:
new_grading_rule["if_has_tag"] = session.access_rules_tag
vctx, _("newly created exception"),
dict_to_struct(new_grading_rule), tags,
grading_rule.grade_identifier)
fre_grading = FlowRuleException(
flow_id=flow_id,
participation=participation,
creator=pctx.request.user,
comment=form.cleaned_data["comment"],
kind=flow_rule_kind.grading,
rule=new_grading_rule)
fre_grading.save()
exceptions_created.append(
dict(FLOW_RULE_KIND_CHOICES)[fre_grading.kind])
if exceptions_created:
for exc in exceptions_created:
messages.add_message(pctx.request, messages.SUCCESS,
_(
"'%(exception_type)s' exception granted to "
"'%(participation)s' for '%(flow_id)s'.")
% {
"exception_type": exc,
"participation": participation,
"flow_id": flow_id})
else:
if session_access_rules_tag_changed:
messages.add_message(
pctx.request, messages.WARNING,
_(
"No other exception granted to the given flow "
"session of '%(participation)s' "
"for '%(flow_id)s'.")
% {
"participation": participation,
"flow_id": flow_id})
else:
messages.add_message(pctx.request, messages.WARNING,
_(
"No exception granted to the given flow "
"session of '%(participation)s' "
"for '%(flow_id)s'.")
% {
"participation": participation,
"flow_id": flow_id})
"relate-grant_exception",
pctx.course.identifier)
else:
data = {
"restrict_to_same_tag": session.access_rules_tag is not None,
# "due_same_as_access_expiration": True,
"generates_grade": grading_rule.generates_grade,
"credit_percent": grading_rule.credit_percent,
"bonus_points": grading_rule.bonus_points,
"max_points": grading_rule.max_points,
"max_points_enforced_cap": grading_rule.max_points_enforced_cap,
for perm in access_rule.permissions:
form = ExceptionStage3Form(data, flow_desc, session.access_rules_tag)
return render_course_page(pctx, "course/generic-course-form.html", {
"form": form,
"form_description": _("Grant Exception"),
"form_text": string_concat(
_("Granting exception to '%(participation)s' "
"for '%(flow_id)s' (session %(session)s)."),
"</div>")
% {
"participation": participation,
"flow_id": flow_id,
"session": strify_session_for_exception(session)},
# {{{ ssh keypair
@login_required
def generate_ssh_keypair(request):
if not request.user.is_staff:
raise PermissionDenied(_("only staff may use this tool"))
from paramiko import RSAKey
key_class = RSAKey
prv = key_class.generate(bits=2048)
import io
prv_bio = io.StringIO()
prv_bio_read = io.StringIO(prv_bio.getvalue())
pub = key_class.from_private_key(prv_bio_read)
pub_bio.write(f"{pub.get_name()} {pub.get_base64()} relate-course-key")
return render(request, "course/keypair.html", {
"public_key": prv_bio.getvalue(),
"private_key": pub_bio.getvalue(),
# {{{ celery task monitoring
def monitor_task(request, task_id):
from celery import states
async_res = AsyncResult(task_id)
progress_percent = None
progress_statement = None
if async_res.state == "PROGRESS":
meta = async_res.info
current = meta["current"]
total = meta["total"]
if total > 0:
progress_percent = 100 * (current / total)
progress_statement = (
_("%(current)d out of %(total)d items processed.")
% {"current": current, "total": total})
if async_res.state == states.SUCCESS:
if (isinstance(async_res.result, dict)
and "message" in async_res.result):
progress_statement = async_res.result["message"]
traceback = None
if request.user.is_staff and async_res.state == states.FAILURE:
traceback = async_res.traceback
return render(request, "course/task-monitor.html", {
"state": async_res.state,
"progress_percent": progress_percent,
"progress_statement": progress_statement,
"traceback": traceback,
})
# }}}
# {{{ edit course
class EditCourseForm(StyledModelForm):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.fields["identifier"].disabled = True
self.fields["active_git_commit_sha"].disabled = True
self.helper.add_input(
Submit("submit", _("Update")))
class Meta:
model = Course
exclude = (
"participants",
"trusted_for_markup",
"start_date": HTML5DateInput(),
"end_date": HTML5DateInput(),
"force_lang": forms.Select(
choices=get_course_specific_language_choices()),
}
@course_view
def edit_course(pctx):
if not pctx.has_permission(pperm.edit_course):
raise PermissionDenied()
request = pctx.request
if request.method == "POST":
form = EditCourseForm(request.POST, instance=pctx.course)
if form.is_valid():
messages.add_message(
request, messages.SUCCESS,
_("Successfully updated course settings."))
else:
messages.add_message(
request, messages.INFO,
_("No change was made on the settings."))
else:
messages.add_message(request, messages.ERROR,
_("Failed to update course settings."))
form = EditCourseForm(instance=instance)
# Render the page with course.force_lang, in case force_lang was updated
from course.utils import LanguageOverride
with LanguageOverride(instance):
return render_course_page(pctx, "course/generic-course-form.html", {
"form_description": _("Edit Course"),
"form": form
})