Newer
Older
"Grading opportunity"))
self.fields["attempt_id"] = forms.CharField(
initial="main",
required=True,
label=_("Attempt ID"))
self.fields["file"] = forms.FileField(
label=_("File"))
self.fields["format"] = forms.ChoiceField(
choices=(
self.fields["attr_type"] = forms.ChoiceField(
choices=(
("email_or_id", _("Email or NetID")),
("inst_id", _("Institutional ID")),
),
label=_("User attribute"))
self.fields["attr_column"] = forms.IntegerField(
# Translators: the following strings are for the format
# informatioin for a CSV file to be imported.
help_text=_("1-based column index for the user attribute "
"selected above to locate student record"),
label=_("User attribute column"))
self.fields["points_column"] = forms.IntegerField(
help_text=_("1-based column index for the (numerical) grade"),
self.fields["feedback_column"] = forms.IntegerField(
help_text=_("1-based column index for further (textual) feedback"),
min_value=1, required=False,
label=_("Feedback column"))
self.fields["max_points"] = forms.DecimalField(
initial=100,
# Translators: "Max point" refers to full credit in points.
label=_("Max points"))
self.helper.add_input(Submit("preview", _("Preview")))
self.helper.add_input(Submit("import", _("Import")))
def clean(self):
data = super(ImportGradesForm, self).clean()
if file_contents:
data["attr_column"],
data["points_column"],
data["feedback_column"]
]
header_count = 1 if has_header else 0
from course.utils import csv_data_importable
importable, err_msg = csv_data_importable(
file_contents,
column_idx_list,
header_count)
if not importable:
self.add_error('file', err_msg)
class ParticipantNotFound(ValueError):
pass
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
def find_participant_from_inst_id(course, inst_id_str):
inst_id_str = inst_id_str.strip()
matches = (Participation.objects
.filter(
course=course,
status=participation_status.active,
user__institutional_id__exact=inst_id_str)
.select_related("user"))
if not matches:
raise ParticipantNotFound(
# Translators: use institutional_id_string to find user
# (participant).
_("no participant found with institutional ID "
"'%(inst_id_string)s'") % {
"inst_id_string": inst_id_str})
if len(matches) > 1:
raise ParticipantNotFound(
_("more than one participant found with institutional ID "
"'%(inst_id_string)s'") % {
"inst_id_string": inst_id_str})
return matches[0]
def find_participant_from_id(course, id_str):
matches = (Participation.objects
.filter(
course=course,
.select_related("user"))
surviving_matches = []
for match in matches:
surviving_matches.append(match)
continue
at_index = email.index("@")
assert at_index > 0
uid = email[:at_index]
if uid == id_str:
surviving_matches.append(match)
continue
if not surviving_matches:
raise ParticipantNotFound(
# Translators: use id_string to find user (participant).
_("no participant found for '%(id_string)s'") % {
"id_string": id_str})
raise ParticipantNotFound(
_("more than one participant found for '%(id_string)s'") % {
"id_string": id_str})
def fix_decimal(s):
if "," in s and "." not in s:
comma_count = len([c for c in s if c == ","])
if comma_count == 1:
return s.replace(",", ".")
else:
return s
else:
return s
def csv_to_grade_changes(
log_lines,
course, grading_opportunity, attempt_id, file_contents,
attr_type, attr_column, points_column, feedback_column,
max_points, creator, grade_time, has_header):
spamreader = csv.reader(file_contents)
for row in spamreader:
if has_header:
has_header = False
continue
gchange = GradeChange()
gchange.opportunity = grading_opportunity
if attr_type == "email_or_id":
gchange.participation = find_participant_from_id(
course, row[attr_column-1])
elif attr_type == "inst_id":
gchange.participation = find_participant_from_inst_id(
course, row[attr_column-1])
else:
raise ParticipantNotFound(
_("Unknown user attribute '%(attr_type)s'") % {
"attr_type": attr_type})
except ParticipantNotFound as e:
log_lines.append(e)
gchange.state = grade_state_change_types.graded
gchange.attempt_id = attempt_id
points_str = row[points_column-1].strip()
# Moodle's "NULL" grades look like this.
if points_str in ["-", ""]:
gchange.points = None
else:
gchange.points = float(fix_decimal(points_str))
gchange.max_points = max_points
if feedback_column is not None:
gchange.comment = row[feedback_column-1]
gchange.creator = creator
gchange.grade_time = grade_time
last_grades = (GradeChange.objects
.filter(
opportunity=grading_opportunity,
participation=gchange.participation,
attempt_id=gchange.attempt_id)
.order_by("-grade_time")[:1])
if last_grades.count():
last_grade, = last_grades
if last_grade.state == grade_state_change_types.graded:
updated = []
if last_grade.points != gchange.points:
if last_grade.max_points != gchange.max_points:
if last_grade.comment != gchange.comment:
string_concat(
"%(participation)s: %(updated)s ",
_("updated")
) % {
'participation': gchange.participation,
'updated': ", ".join(updated)})
result.append(gchange)
else:
@course_view
@transaction.atomic
def import_grades(pctx):
Andreas Klöckner
committed
if not pctx.has_permission(pperm.batch_import_grade):
raise PermissionDenied(_("may not batch-import grades"))
log_lines = []
request = pctx.request
if request.method == "POST":
form = ImportGradesForm(
pctx.course, request.POST, request.FILES)
is_import = "import" in request.POST
if form.is_valid():
try:
total_count, grade_changes = csv_to_grade_changes(
course=pctx.course,
grading_opportunity=form.cleaned_data["grading_opportunity"],
attempt_id=form.cleaned_data["attempt_id"],
file_contents=request.FILES["file"],
attr_type=form.cleaned_data["attr_type"],
attr_column=form.cleaned_data["attr_column"],
points_column=form.cleaned_data["points_column"],
feedback_column=form.cleaned_data["feedback_column"],
max_points=form.cleaned_data["max_points"],
creator=request.user,
grade_time=now(),
has_header=form.cleaned_data["format"] == "csvhead")
except Exception as e:
messages.add_message(pctx.request, messages.ERROR,
string_concat(
pgettext_lazy("Starting of Error message",
"Error"),
": %(err_type)s %(err_str)s")
% {
"err_type": type(e).__name__,
"err_str": str(e)})
if total_count != len(grade_changes):
messages.add_message(pctx.request, messages.INFO,
_("%(total)d grades found, %(unchaged)d unchanged.")
% {'total': total_count,
'unchaged': total_count - len(grade_changes)})
from django.template.loader import render_to_string
if is_import:
GradeChange.objects.bulk_create(grade_changes)
form_text = render_to_string(
"course/grade-import-preview.html", {
"show_grade_changes": False,
"log_lines": log_lines,
})
messages.add_message(pctx.request, messages.SUCCESS,
else:
form_text = render_to_string(
"course/grade-import-preview.html", {
"show_grade_changes": True,
"log_lines": log_lines,
})
else:
form = ImportGradesForm(pctx.course)
return render_course_page(pctx, "course/generic-course-form.html", {
"form": form,
"form_text": form_text,
})
# }}}
# {{{ download all submissions
class DownloadAllSubmissionsForm(StyledForm):
def __init__(self, page_ids, session_tag_choices, *args, **kwargs):
super(DownloadAllSubmissionsForm, self).__init__(*args, **kwargs)
self.fields["page_id"] = forms.ChoiceField(
choices=tuple(
(pid, pid)
for pid in page_ids),
label=_("Page ID"))
self.fields["which_attempt"] = forms.ChoiceField(
choices=(
("first", _("Least recent attempt")),
("last", _("Most recent attempt")),
label=_("Attempts to include."),
help_text=_(
"Every submission to the page counts as an attempt."),
initial="last")
self.fields["restrict_to_rules_tag"] = forms.ChoiceField(
choices=session_tag_choices,
help_text=_("Only download sessions tagged with this rules tag."),
label=_("Restrict to rules tag"))
self.fields["non_in_progress_only"] = forms.BooleanField(
required=False,
initial=True,
help_text=_("Only download submissions from non-in-progress "
"sessions"),
label=_("Non-in-progress only"))
self.fields["include_feedback"] = forms.BooleanField(
required=False,
initial=True,
help_text=_("Include provided feedback as text file in zip"),
label=_("Include feedback"))
self.fields["extra_file"] = forms.FileField(
label=_("Additional File"),
help_text=_(
"If given, the uploaded file will be included "
"in the zip archive. "
"If the produced archive is to be used for plagiarism "
"detection, then this may be used to include the reference "
"solution."),
required=False)
self.helper.add_input(
Submit("download", _("Download")))
@course_view
def download_all_submissions(pctx, flow_id):
Andreas Klöckner
committed
if not pctx.has_permission(pperm.batch_download_submission):
raise PermissionDenied(_("may not batch-download submissions"))
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
from course.content import get_flow_desc
flow_desc = get_flow_desc(pctx.repo, pctx.course, flow_id,
pctx.course_commit_sha)
# {{{ find access rules tag
if hasattr(flow_desc, "rules"):
access_rules_tags = getattr(flow_desc.rules, "tags", [])
else:
access_rules_tags = []
ALL_SESSION_TAG = string_concat("<<<", _("ALL"), ">>>") # noqa
session_tag_choices = [
(tag, tag)
for tag in access_rules_tags] + [(ALL_SESSION_TAG,
string_concat("(", _("ALL"), ")"))]
# }}}
page_ids = [
"%s/%s" % (group_desc.id, page_desc.id)
for group_desc in flow_desc.groups
for page_desc in group_desc.pages]
from course.page.base import AnswerFeedback
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
request = pctx.request
if request.method == "POST":
form = DownloadAllSubmissionsForm(page_ids, session_tag_choices,
request.POST)
if form.is_valid():
which_attempt = form.cleaned_data["which_attempt"]
slash_index = form.cleaned_data["page_id"].index("/")
group_id = form.cleaned_data["page_id"][:slash_index]
page_id = form.cleaned_data["page_id"][slash_index+1:]
from course.utils import PageInstanceCache
page_cache = PageInstanceCache(pctx.repo, pctx.course, flow_id)
visits = (FlowPageVisit.objects
.filter(
flow_session__course=pctx.course,
flow_session__flow_id=flow_id,
page_data__group_id=group_id,
page_data__page_id=page_id,
is_submitted_answer=True,
)
.select_related("flow_session")
.select_related("flow_session__participation__user")
.select_related("page_data")
# We overwrite earlier submissions with later ones
# in a dictionary below.
.order_by("visit_time"))
if form.cleaned_data["non_in_progress_only"]:
visits = visits.filter(flow_session__in_progress=False)
if form.cleaned_data["restrict_to_rules_tag"] != ALL_SESSION_TAG:
visits = (visits
.filter(
flow_session__access_rules_tag=(
form.cleaned_data["restrict_to_rules_tag"]))
.select_related("grades"))
submissions = {}
for visit in visits:
page = page_cache.get_page(group_id, page_id,
pctx.course_commit_sha)
from course.page import PageContext
grading_page_context = PageContext(
course=pctx.course,
repo=pctx.repo,
commit_sha=pctx.course_commit_sha,
flow_session=visit.flow_session)
bytes_answer = page.normalized_bytes_answer(
grading_page_context, visit.page_data.data,
visit.answer)
if which_attempt in ["first", "last"]:
key = (visit.flow_session.participation.user.username,)
elif which_attempt == "all":
key = (visit.flow_session.participation.user.username,
str(visit.flow_session.id))
else:
raise NotImplementedError()
if bytes_answer is not None:
if (which_attempt == "first"
and key in submissions):
# Already there, disregard further ones
continue
submissions[key] = (
bytes_answer, list(visit.grades.all()))
from six import BytesIO
from zipfile import ZipFile
bio = BytesIO()
with ZipFile(bio, "w") as subm_zip:
for key, ((extension, bytes_answer), visit_grades) in \
basename = "-".join(key)
basename + extension,
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
if form.cleaned_data["include_feedback"]:
feedback_lines = []
feedback_lines.append(
"scores: %s" % (
", ".join(
str(g.correctness)
for g in visit_grades)))
for i, grade in enumerate(visit_grades):
feedback_lines.append(75*"-")
feedback_lines.append(
"grade %i: score: %s" % (i+1, grade.correctness))
feedback_lines.append(
AnswerFeedback.from_json(
grade.feedback, None).feedback)
subm_zip.writestr(
basename + "-feedback.txt",
"\n".join(feedback_lines))
extra_file = request.FILES.get("extra_file")
if extra_file is not None:
subm_zip.writestr(extra_file.name, extra_file.read())
response = http.HttpResponse(
bio.getvalue(),
content_type="application/zip")
response['Content-Disposition'] = (
'attachment; filename="submissions_%s_%s_%s_%s_%s.zip"'
% (pctx.course.identifier, flow_id, group_id, page_id,
now().date().strftime("%Y-%m-%d")))
return response
else:
form = DownloadAllSubmissionsForm(page_ids, session_tag_choices)
return render_course_page(pctx, "course/generic-course-form.html", {
"form": form,
"form_description": _("Download All Submissions in Zip file")
})
# }}}
Andreas Klöckner
committed
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
# {{{ edit_grading_opportunity
class EditGradingOpportunityForm(StyledModelForm):
def __init__(self, *args, **kwargs):
# type: (*Any, **Any) -> None
super(EditGradingOpportunityForm, self).__init__(*args, **kwargs)
self.fields["identifier"].disabled = True
self.fields["flow_id"].disabled = True
self.fields["creation_time"].disabled = True
self.helper.add_input(
Submit("submit", _("Update")))
class Meta:
model = GradingOpportunity
exclude = (
"course",
# not used
"due_time",
)
widgets = {
"hide_superseded_grade_history_before":
DateTimePicker(
options={"format": "YYYY-MM-DD HH:mm", "sideBySide": True}),
}
@course_view
def edit_grading_opportunity(pctx, opportunity_id):
# type: (CoursePageContext, int) -> http.HttpResponse
if not pctx.has_permission(pperm.edit_grading_opportunity):
raise PermissionDenied()
request = pctx.request
gopp = get_object_or_404(GradingOpportunity, id=int(opportunity_id))
if gopp.course.id != pctx.course.id:
raise SuspiciousOperation(
"may not edit grading opportunity in different course")
if request.method == 'POST':
form = EditGradingOpportunityForm(request.POST, instance=gopp)
if form.is_valid():
form.save()
else:
form = EditGradingOpportunityForm(instance=gopp)
return render_course_page(pctx, "course/generic-course-form.html", {
"form_description": _("Edit Grading Opportunity"),
"form": form
})
# }}}