Newer
Older
# {{{ import grades
class ImportGradesForm(StyledForm):
def __init__(self, course, *args, **kwargs):
super(ImportGradesForm, self).__init__(*args, **kwargs)
self.fields["grading_opportunity"] = forms.ModelChoiceField(
queryset=(GradingOpportunity.objects
.filter(course=course)
.order_by("identifier")),
help_text=_("Click to <a href='%s' target='_blank'>create</a> "
"a new grading opportunity. Reload this form when done.")
% reverse("admin:course_gradingopportunity_add"),
label=pgettext_lazy("Field name in Import grades form",
"Grading opportunity"))
self.fields["attempt_id"] = forms.CharField(
initial="main",
required=True,
label=_("Attempt ID"))
self.fields["file"] = forms.FileField(
label=_("File"))
self.fields["format"] = forms.ChoiceField(
choices=(
self.fields["attr_type"] = forms.ChoiceField(
choices=(
("email_or_id", _("Email or NetID")),
("inst_id", _("Institutional ID")),
),
label=_("User attribute"))
self.fields["attr_column"] = forms.IntegerField(
# Translators: the following strings are for the format
# informatioin for a CSV file to be imported.
help_text=_("1-based column index for the user attribute "
"selected above to locate student record"),
label=_("User attribute column"))
self.fields["points_column"] = forms.IntegerField(
help_text=_("1-based column index for the (numerical) grade"),
self.fields["feedback_column"] = forms.IntegerField(
help_text=_("1-based column index for further (textual) feedback"),
min_value=1, required=False,
label=_("Feedback column"))
self.fields["max_points"] = forms.DecimalField(
initial=100,
# Translators: "Max point" refers to full credit in points.
label=_("Max points"))
self.helper.add_input(Submit("preview", _("Preview")))
self.helper.add_input(Submit("import", _("Import")))
def clean(self):
data = super(ImportGradesForm, self).clean()
if file_contents:
data["attr_column"],
data["points_column"],
data["feedback_column"]
]
header_count = 1 if has_header else 0
from course.utils import csv_data_importable
importable, err_msg = csv_data_importable(
six.StringIO(
file_contents.read().decode("utf-8", errors="replace")),
column_idx_list,
header_count)
if not importable:
self.add_error('file', err_msg)
class ParticipantNotFound(ValueError):
pass
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
def find_participant_from_inst_id(course, inst_id_str):
inst_id_str = inst_id_str.strip()
matches = (Participation.objects
.filter(
course=course,
status=participation_status.active,
user__institutional_id__exact=inst_id_str)
.select_related("user"))
if not matches:
raise ParticipantNotFound(
# Translators: use institutional_id_string to find user
# (participant).
_("no participant found with institutional ID "
"'%(inst_id_string)s'") % {
"inst_id_string": inst_id_str})
if len(matches) > 1:
raise ParticipantNotFound(
_("more than one participant found with institutional ID "
"'%(inst_id_string)s'") % {
"inst_id_string": inst_id_str})
return matches[0]
def find_participant_from_id(course, id_str):
matches = (Participation.objects
.filter(
course=course,
.select_related("user"))
surviving_matches = []
for match in matches:
surviving_matches.append(match)
continue
at_index = email.index("@")
assert at_index > 0
uid = email[:at_index]
if uid == id_str:
surviving_matches.append(match)
continue
if not surviving_matches:
raise ParticipantNotFound(
# Translators: use id_string to find user (participant).
_("no participant found for '%(id_string)s'") % {
"id_string": id_str})
raise ParticipantNotFound(
_("more than one participant found for '%(id_string)s'") % {
"id_string": id_str})
def fix_decimal(s):
if "," in s and "." not in s:
comma_count = len([c for c in s if c == ","])
if comma_count == 1:
return s.replace(",", ".")
else:
return s
else:
return s
def csv_to_grade_changes(
log_lines,
course, grading_opportunity, attempt_id, file_contents,
attr_type, attr_column, points_column, feedback_column,
max_points, creator, grade_time, has_header):
spamreader = csv.reader(file_contents)
for row in spamreader:
if has_header:
has_header = False
continue
gchange = GradeChange()
gchange.opportunity = grading_opportunity
if attr_type == "email_or_id":
gchange.participation = find_participant_from_id(
course, row[attr_column-1])
elif attr_type == "inst_id":
gchange.participation = find_participant_from_inst_id(
course, row[attr_column-1])
else:
raise ParticipantNotFound(
_("Unknown user attribute '%(attr_type)s'") % {
"attr_type": attr_type})
except ParticipantNotFound as e:
log_lines.append(e)
gchange.state = grade_state_change_types.graded
gchange.attempt_id = attempt_id
points_str = row[points_column-1].strip()
# Moodle's "NULL" grades look like this.
if points_str in ["-", ""]:
gchange.points = None
else:
gchange.points = float(fix_decimal(points_str))
gchange.max_points = max_points
if feedback_column is not None:
gchange.comment = row[feedback_column-1]
gchange.creator = creator
gchange.grade_time = grade_time
last_grades = (GradeChange.objects
.filter(
opportunity=grading_opportunity,
participation=gchange.participation,
attempt_id=gchange.attempt_id)
.order_by("-grade_time")[:1])
if last_grades.count():
last_grade, = last_grades
if last_grade.state == grade_state_change_types.graded:
updated = []
if last_grade.points != gchange.points:
if last_grade.max_points != gchange.max_points:
if last_grade.comment != gchange.comment:
string_concat(
"%(participation)s: %(updated)s ",
_("updated")
) % {
'participation': gchange.participation,
'updated': ", ".join(updated)})
result.append(gchange)
else:
@course_view
@transaction.atomic
def import_grades(pctx):
Andreas Klöckner
committed
if not pctx.has_permission(pperm.batch_import_grade):
raise PermissionDenied(_("may not batch-import grades"))
log_lines = []
request = pctx.request
if request.method == "POST":
form = ImportGradesForm(
pctx.course, request.POST, request.FILES)
is_import = "import" in request.POST
if form.is_valid():
try:
f = request.FILES["file"]
f.seek(0)
data = f.read().decode("utf-8", errors="replace")
total_count, grade_changes = csv_to_grade_changes(
course=pctx.course,
grading_opportunity=form.cleaned_data["grading_opportunity"],
attempt_id=form.cleaned_data["attempt_id"],
attr_type=form.cleaned_data["attr_type"],
attr_column=form.cleaned_data["attr_column"],
points_column=form.cleaned_data["points_column"],
feedback_column=form.cleaned_data["feedback_column"],
max_points=form.cleaned_data["max_points"],
creator=request.user,
grade_time=now(),
has_header=form.cleaned_data["format"] == "csvhead")
except Exception as e:
messages.add_message(pctx.request, messages.ERROR,
string_concat(
pgettext_lazy("Starting of Error message",
"Error"),
": %(err_type)s %(err_str)s")
% {
"err_type": type(e).__name__,
"err_str": str(e)})
if total_count != len(grade_changes):
messages.add_message(pctx.request, messages.INFO,
_("%(total)d grades found, %(unchaged)d unchanged.")
% {'total': total_count,
'unchaged': total_count - len(grade_changes)})
from django.template.loader import render_to_string
if is_import:
GradeChange.objects.bulk_create(grade_changes)
form_text = render_to_string(
"course/grade-import-preview.html", {
"show_grade_changes": False,
"log_lines": log_lines,
})
messages.add_message(pctx.request, messages.SUCCESS,
else:
form_text = render_to_string(
"course/grade-import-preview.html", {
"show_grade_changes": True,
"log_lines": log_lines,
})
else:
form = ImportGradesForm(pctx.course)
return render_course_page(pctx, "course/generic-course-form.html", {
"form": form,
"form_text": form_text,
})
# }}}
# {{{ download all submissions
class DownloadAllSubmissionsForm(StyledForm):
def __init__(self, page_ids, session_tag_choices, *args, **kwargs):
super(DownloadAllSubmissionsForm, self).__init__(*args, **kwargs)
self.fields["page_id"] = forms.ChoiceField(
choices=tuple(
(pid, pid)
for pid in page_ids),
label=_("Page ID"))
self.fields["which_attempt"] = forms.ChoiceField(
choices=(
("first", _("Least recent attempt")),
("last", _("Most recent attempt")),
label=_("Attempts to include."),
help_text=_(
"Every submission to the page counts as an attempt."),
initial="last")
self.fields["restrict_to_rules_tag"] = forms.ChoiceField(
choices=session_tag_choices,
help_text=_("Only download sessions tagged with this rules tag."),
label=_("Restrict to rules tag"))
self.fields["non_in_progress_only"] = forms.BooleanField(
required=False,
initial=True,
help_text=_("Only download submissions from non-in-progress "
"sessions"),
label=_("Non-in-progress only"))
self.fields["include_feedback"] = forms.BooleanField(
required=False,
help_text=_("Include provided feedback as text file in zip"),
label=_("Include feedback"))
self.fields["extra_file"] = forms.FileField(
label=_("Additional File"),
help_text=_(
"If given, the uploaded file will be included "
"in the zip archive. "
"If the produced archive is to be used for plagiarism "
"detection, then this may be used to include the reference "
"solution."),
required=False)
self.helper.add_input(
Submit("download", _("Download")))
@course_view
def download_all_submissions(pctx, flow_id):
Andreas Klöckner
committed
if not pctx.has_permission(pperm.batch_download_submission):
raise PermissionDenied(_("may not batch-download submissions"))
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
from course.content import get_flow_desc
flow_desc = get_flow_desc(pctx.repo, pctx.course, flow_id,
pctx.course_commit_sha)
# {{{ find access rules tag
if hasattr(flow_desc, "rules"):
access_rules_tags = getattr(flow_desc.rules, "tags", [])
else:
access_rules_tags = []
ALL_SESSION_TAG = string_concat("<<<", _("ALL"), ">>>") # noqa
session_tag_choices = [
(tag, tag)
for tag in access_rules_tags] + [(ALL_SESSION_TAG,
string_concat("(", _("ALL"), ")"))]
# }}}
page_ids = [
"%s/%s" % (group_desc.id, page_desc.id)
for group_desc in flow_desc.groups
for page_desc in group_desc.pages]
from course.page.base import AnswerFeedback
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
request = pctx.request
if request.method == "POST":
form = DownloadAllSubmissionsForm(page_ids, session_tag_choices,
request.POST)
if form.is_valid():
which_attempt = form.cleaned_data["which_attempt"]
slash_index = form.cleaned_data["page_id"].index("/")
group_id = form.cleaned_data["page_id"][:slash_index]
page_id = form.cleaned_data["page_id"][slash_index+1:]
from course.utils import PageInstanceCache
page_cache = PageInstanceCache(pctx.repo, pctx.course, flow_id)
visits = (FlowPageVisit.objects
.filter(
flow_session__course=pctx.course,
flow_session__flow_id=flow_id,
page_data__group_id=group_id,
page_data__page_id=page_id,
is_submitted_answer=True,
)
.select_related("flow_session")
.select_related("flow_session__participation__user")
.select_related("page_data")
# We overwrite earlier submissions with later ones
# in a dictionary below.
.order_by("visit_time"))
if form.cleaned_data["non_in_progress_only"]:
visits = visits.filter(flow_session__in_progress=False)
if form.cleaned_data["restrict_to_rules_tag"] != ALL_SESSION_TAG:
visits = (visits
.filter(
flow_session__access_rules_tag=(
form.cleaned_data["restrict_to_rules_tag"])))
submissions = {}
for visit in visits:
page = page_cache.get_page(group_id, page_id,
pctx.course_commit_sha)
from course.page import PageContext
grading_page_context = PageContext(
course=pctx.course,
repo=pctx.repo,
commit_sha=pctx.course_commit_sha,
flow_session=visit.flow_session)
bytes_answer = page.normalized_bytes_answer(
grading_page_context, visit.page_data.data,
visit.answer)
if which_attempt in ["first", "last"]:
key = (visit.flow_session.participation.user.username,)
elif which_attempt == "all":
key = (visit.flow_session.participation.user.username,
str(visit.flow_session.id))
else:
raise NotImplementedError()
if bytes_answer is not None:
if (which_attempt == "first"
and key in submissions):
# Already there, disregard further ones
continue
submissions[key] = (
bytes_answer, list(visit.grades.all()))
from six import BytesIO
from zipfile import ZipFile
bio = BytesIO()
with ZipFile(bio, "w") as subm_zip:
for key, ((extension, bytes_answer), visit_grades) in \
basename = "-".join(key)
basename + extension,
if form.cleaned_data["include_feedback"]:
feedback_lines = []
feedback_lines.append(
"scores: %s" % (
", ".join(
str(g.correctness)
for g in visit_grades)))
for i, grade in enumerate(visit_grades):
feedback_lines.append(75*"-")
feedback_lines.append(
"grade %i: score: %s" % (i+1, grade.correctness))
afb = AnswerFeedback.from_json(grade.feedback, None)
if afb is not None:
feedback_lines.append(afb.feedback)
subm_zip.writestr(
basename + "-feedback.txt",
"\n".join(feedback_lines))
extra_file = request.FILES.get("extra_file")
if extra_file is not None:
subm_zip.writestr(extra_file.name, extra_file.read())
response = http.HttpResponse(
bio.getvalue(),
content_type="application/zip")
response['Content-Disposition'] = (
'attachment; filename="submissions_%s_%s_%s_%s_%s.zip"'
% (pctx.course.identifier, flow_id, group_id, page_id,
now().date().strftime("%Y-%m-%d")))
return response
else:
form = DownloadAllSubmissionsForm(page_ids, session_tag_choices)
return render_course_page(pctx, "course/generic-course-form.html", {
"form": form,
"form_description": _("Download All Submissions in Zip file")
})
# }}}
Andreas Klöckner
committed
# {{{ edit_grading_opportunity
class EditGradingOpportunityForm(StyledModelForm):
def __init__(self, add_new, *args, **kwargs):
Andreas Klöckner
committed
super(EditGradingOpportunityForm, self).__init__(*args, **kwargs)
if not add_new:
self.fields["identifier"].disabled = True
Andreas Klöckner
committed
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
self.fields["flow_id"].disabled = True
self.fields["creation_time"].disabled = True
self.helper.add_input(
Submit("submit", _("Update")))
class Meta:
model = GradingOpportunity
exclude = (
"course",
# not used
"due_time",
)
widgets = {
"hide_superseded_grade_history_before":
DateTimePicker(
options={"format": "YYYY-MM-DD HH:mm", "sideBySide": True}),
}
@course_view
def edit_grading_opportunity(pctx, opportunity_id):
# type: (CoursePageContext, int) -> http.HttpResponse
if not pctx.has_permission(pperm.edit_grading_opportunity):
raise PermissionDenied()
request = pctx.request
num_opportunity_id = int(opportunity_id)
if num_opportunity_id == -1:
gopp = GradingOpportunity(course=pctx.course)
add_new = True
else:
gopp = get_object_or_404(GradingOpportunity, id=num_opportunity_id)
add_new = False
Andreas Klöckner
committed
if gopp.course.id != pctx.course.id:
raise SuspiciousOperation(
"may not edit grading opportunity in different course")
if request.method == 'POST':
form = EditGradingOpportunityForm(add_new, request.POST, instance=gopp)
Andreas Klöckner
committed
if form.is_valid():
form.save()
return redirect("relate-edit_grading_opportunity",
pctx.course.identifier, form.instance.id)
Andreas Klöckner
committed
else:
form = EditGradingOpportunityForm(add_new, instance=gopp)
Andreas Klöckner
committed
return render_course_page(pctx, "course/generic-course-form.html", {
"form_description": _("Edit Grading Opportunity"),
"form": form
})
# }}}