Skip to content
grades.py 56.1 KiB
Newer Older

        self.fields["attempt_id"] = forms.CharField(
                initial="main",
ifaint's avatar
ifaint committed
                required=True,
                label=_("Attempt ID"))
        self.fields["file"] = forms.FileField(
                label=_("File"))

        self.fields["format"] = forms.ChoiceField(
                choices=(
ifaint's avatar
ifaint committed
                    ("csvhead", _("CSV with Header")),
                    ("csv", "CSV"),
ifaint's avatar
ifaint committed
                    ),
                label=_("Format"))
        self.fields["attr_type"] = forms.ChoiceField(
                choices=(
                    ("email_or_id", _("Email or NetID")),
                    ("inst_id", _("Institutional ID")),
                    ),
                label=_("User attribute"))

        self.fields["attr_column"] = forms.IntegerField(
Andreas Klöckner's avatar
Andreas Klöckner committed
                # Translators: the following strings are for the format
                # informatioin for a CSV file to be imported.
                help_text=_("1-based column index for the user attribute "
                "selected above to locate student record"),
ifaint's avatar
ifaint committed
                min_value=1,
                label=_("User attribute column"))
        self.fields["points_column"] = forms.IntegerField(
ifaint's avatar
ifaint committed
                help_text=_("1-based column index for the (numerical) grade"),
ifaint's avatar
ifaint committed
                min_value=1,
                label=_("Points column"))
        self.fields["feedback_column"] = forms.IntegerField(
ifaint's avatar
ifaint committed
                help_text=_("1-based column index for further (textual) feedback"),
ifaint's avatar
ifaint committed
                min_value=1, required=False,
                label=_("Feedback column"))
        self.fields["max_points"] = forms.DecimalField(
ifaint's avatar
ifaint committed
                initial=100,
                # Translators: "Max point" refers to full credit in points.
                label=_("Max points"))
        self.helper.add_input(Submit("preview", _("Preview")))
        self.helper.add_input(Submit("import", _("Import")))
    def clean(self):
        data = super(ImportGradesForm, self).clean()
Andreas Klöckner's avatar
Andreas Klöckner committed
        file_contents = data.get("file")
Dong Zhuang's avatar
Dong Zhuang committed
            column_idx_list = [
Dong Zhuang's avatar
Dong Zhuang committed
                data["points_column"],
                data["feedback_column"]
            ]
Andreas Klöckner's avatar
Andreas Klöckner committed
            has_header = data["format"] == "csvhead"
Dong Zhuang's avatar
Dong Zhuang committed
            header_count = 1 if has_header else 0

            from course.utils import csv_data_importable

            importable, err_msg = csv_data_importable(
                    file_contents,
                    column_idx_list,
                    header_count)

            if not importable:
                self.add_error('file', err_msg)
Dong Zhuang's avatar
Dong Zhuang committed

class ParticipantNotFound(ValueError):
    pass

def find_participant_from_inst_id(course, inst_id_str):
    inst_id_str = inst_id_str.strip()

    matches = (Participation.objects
            .filter(
                course=course,
                status=participation_status.active,
                user__institutional_id__exact=inst_id_str)
            .select_related("user"))

    if not matches:
        raise ParticipantNotFound(
                # Translators: use institutional_id_string to find user
                # (participant).
                _("no participant found with institutional ID "
                "'%(inst_id_string)s'") % {
                    "inst_id_string": inst_id_str})
    if len(matches) > 1:
        raise ParticipantNotFound(
                _("more than one participant found with institutional ID "
                "'%(inst_id_string)s'") % {
                    "inst_id_string": inst_id_str})

    return matches[0]

def find_participant_from_id(course, id_str):
Andreas Klöckner's avatar
Andreas Klöckner committed
    id_str = id_str.strip().lower()

    matches = (Participation.objects
            .filter(
                course=course,
Andreas Klöckner's avatar
Andreas Klöckner committed
                status=participation_status.active,
                user__email__istartswith=id_str)

    surviving_matches = []
    for match in matches:
Andreas Klöckner's avatar
Andreas Klöckner committed
        if match.user.email.lower() == id_str:
            surviving_matches.append(match)
            continue

Andreas Klöckner's avatar
Andreas Klöckner committed
        email = match.user.email.lower()
        at_index = email.index("@")
        assert at_index > 0
        uid = email[:at_index]

        if uid == id_str:
            surviving_matches.append(match)
            continue

    if not surviving_matches:
        raise ParticipantNotFound(
ifaint's avatar
ifaint committed
                # Translators: use id_string to find user (participant).
                _("no participant found for '%(id_string)s'") % {
                    "id_string": id_str})
    if len(surviving_matches) > 1:
        raise ParticipantNotFound(
                _("more than one participant found for '%(id_string)s'") % {
                    "id_string": id_str})

    return surviving_matches[0]


def fix_decimal(s):
    if "," in s and "." not in s:
        comma_count = len([c for c in s if c == ","])
        if comma_count == 1:
            return s.replace(",", ".")
        else:
            return s

    else:
        return s


def csv_to_grade_changes(
        log_lines,
        course, grading_opportunity, attempt_id, file_contents,
        attr_type, attr_column, points_column, feedback_column,
        max_points, creator, grade_time, has_header):
    result = []

    import csv

    total_count = 0
    spamreader = csv.reader(file_contents)
    for row in spamreader:
        if has_header:
            has_header = False
            continue

        gchange = GradeChange()
        gchange.opportunity = grading_opportunity
            if attr_type == "email_or_id":
                gchange.participation = find_participant_from_id(
                        course, row[attr_column-1])
            elif attr_type == "inst_id":
                gchange.participation = find_participant_from_inst_id(
                        course, row[attr_column-1])
            else:
                raise ParticipantNotFound(
                    _("Unknown user attribute '%(attr_type)s'") % {
                        "attr_type": attr_type})
        except ParticipantNotFound as e:
            log_lines.append(e)
        gchange.state = grade_state_change_types.graded
        gchange.attempt_id = attempt_id

        points_str = row[points_column-1].strip()
        # Moodle's "NULL" grades look like this.
        if points_str in ["-", ""]:
            gchange.points = None
        else:
            gchange.points = float(fix_decimal(points_str))
        gchange.max_points = max_points
        if feedback_column is not None:
            gchange.comment = row[feedback_column-1]

        gchange.creator = creator
        gchange.grade_time = grade_time

        last_grades = (GradeChange.objects
                .filter(
                    opportunity=grading_opportunity,
                    participation=gchange.participation,
                    attempt_id=gchange.attempt_id)
                .order_by("-grade_time")[:1])

        if last_grades.count():
            last_grade, = last_grades

            if last_grade.state == grade_state_change_types.graded:

                updated = []
                if last_grade.points != gchange.points:
Dong Zhuang's avatar
Dong Zhuang committed
                    updated.append(ugettext("points"))
                if last_grade.max_points != gchange.max_points:
Dong Zhuang's avatar
Dong Zhuang committed
                    updated.append(ugettext("max_points"))
                if last_grade.comment != gchange.comment:
Dong Zhuang's avatar
Dong Zhuang committed
                    updated.append(ugettext("comment"))
Andreas Klöckner's avatar
Andreas Klöckner committed
                    log_lines.append(
                            string_concat(
                                "%(participation)s: %(updated)s ",
                                _("updated")
                                ) % {
                                    'participation': gchange.participation,
                                    'updated': ", ".join(updated)})
                result.append(gchange)
            result.append(gchange)

    return total_count, result


@course_view
@transaction.atomic
def import_grades(pctx):
    if not pctx.has_permission(pperm.batch_import_grade):
        raise PermissionDenied(_("may not batch-import grades"))

    form_text = ""
    request = pctx.request
    if request.method == "POST":
        form = ImportGradesForm(
                pctx.course, request.POST, request.FILES)

        is_import = "import" in request.POST
        if form.is_valid():
            try:
                total_count, grade_changes = csv_to_grade_changes(
                        log_lines=log_lines,
                        course=pctx.course,
                        grading_opportunity=form.cleaned_data["grading_opportunity"],
                        attempt_id=form.cleaned_data["attempt_id"],
                        file_contents=request.FILES["file"],
                        attr_type=form.cleaned_data["attr_type"],
                        attr_column=form.cleaned_data["attr_column"],
                        points_column=form.cleaned_data["points_column"],
                        feedback_column=form.cleaned_data["feedback_column"],
                        max_points=form.cleaned_data["max_points"],
                        creator=request.user,
                        grade_time=now(),
                        has_header=form.cleaned_data["format"] == "csvhead")
            except Exception as e:
                messages.add_message(pctx.request, messages.ERROR,
                        string_concat(
                            pgettext_lazy("Starting of Error message",
                                "Error"),
                            ": %(err_type)s %(err_str)s")
                        % {
                            "err_type": type(e).__name__,
                            "err_str": str(e)})
            else:
                if total_count != len(grade_changes):
                    messages.add_message(pctx.request, messages.INFO,
                            _("%(total)d grades found, %(unchaged)d unchanged.")
                            % {'total': total_count,
                               'unchaged': total_count - len(grade_changes)})
                from django.template.loader import render_to_string

                if is_import:
                    GradeChange.objects.bulk_create(grade_changes)
                    form_text = render_to_string(
                            "course/grade-import-preview.html", {
                                "show_grade_changes": False,
                                "log_lines": log_lines,
                                })
                    messages.add_message(pctx.request, messages.SUCCESS,
ifaint's avatar
ifaint committed
                            _("%d grades imported.") % len(grade_changes))
                else:
                    form_text = render_to_string(
                            "course/grade-import-preview.html", {
                                "show_grade_changes": True,
                                "grade_changes": grade_changes,
                                "log_lines": log_lines,
                                })

    else:
        form = ImportGradesForm(pctx.course)

    return render_course_page(pctx, "course/generic-course-form.html", {
ifaint's avatar
ifaint committed
        "form_description": _("Import Grade Data"),
        "form": form,
        "form_text": form_text,
        })

# }}}


# {{{ download all submissions

class DownloadAllSubmissionsForm(StyledForm):
    def __init__(self, page_ids, session_tag_choices, *args, **kwargs):
        super(DownloadAllSubmissionsForm, self).__init__(*args, **kwargs)

        self.fields["page_id"] = forms.ChoiceField(
                choices=tuple(
                    (pid, pid)
                    for pid in page_ids),
                label=_("Page ID"))
        self.fields["which_attempt"] = forms.ChoiceField(
                choices=(
                    ("first", _("Least recent attempt")),
                    ("last", _("Most recent attempt")),
                    ("all", _("All attempts")),
                    ),
                label=_("Attempts to include."),
                help_text=_(
                    "Every submission to the page counts as an attempt."),
                initial="last")
        self.fields["restrict_to_rules_tag"] = forms.ChoiceField(
                choices=session_tag_choices,
                help_text=_("Only download sessions tagged with this rules tag."),
                label=_("Restrict to rules tag"))
        self.fields["non_in_progress_only"] = forms.BooleanField(
                required=False,
                initial=True,
                help_text=_("Only download submissions from non-in-progress "
                    "sessions"),
                label=_("Non-in-progress only"))
        self.fields["include_feedback"] = forms.BooleanField(
                required=False,
                initial=True,
                help_text=_("Include provided feedback as text file in zip"),
                label=_("Include feedback"))
        self.fields["extra_file"] = forms.FileField(
                label=_("Additional File"),
                help_text=_(
                    "If given, the uploaded file will be included "
                    "in the zip archive. "
                    "If the produced archive is to be used for plagiarism "
                    "detection, then this may be used to include the reference "
                    "solution."),
                required=False)

        self.helper.add_input(
                Submit("download", _("Download")))


@course_view
def download_all_submissions(pctx, flow_id):
    if not pctx.has_permission(pperm.batch_download_submission):
        raise PermissionDenied(_("may not batch-download submissions"))

    from course.content import get_flow_desc
    flow_desc = get_flow_desc(pctx.repo, pctx.course, flow_id,
            pctx.course_commit_sha)

    # {{{ find access rules tag

    if hasattr(flow_desc, "rules"):
        access_rules_tags = getattr(flow_desc.rules, "tags", [])
    else:
        access_rules_tags = []

    ALL_SESSION_TAG = string_concat("<<<", _("ALL"), ">>>")  # noqa
    session_tag_choices = [
            (tag, tag)
            for tag in access_rules_tags] + [(ALL_SESSION_TAG,
                    string_concat("(", _("ALL"), ")"))]

    # }}}

    page_ids = [
            "%s/%s" % (group_desc.id, page_desc.id)
            for group_desc in flow_desc.groups
            for page_desc in group_desc.pages]

    from course.page.base import AnswerFeedback

    request = pctx.request
    if request.method == "POST":
        form = DownloadAllSubmissionsForm(page_ids, session_tag_choices,
                request.POST)

        if form.is_valid():
            which_attempt = form.cleaned_data["which_attempt"]

            slash_index = form.cleaned_data["page_id"].index("/")
            group_id = form.cleaned_data["page_id"][:slash_index]
            page_id = form.cleaned_data["page_id"][slash_index+1:]

            from course.utils import PageInstanceCache
            page_cache = PageInstanceCache(pctx.repo, pctx.course, flow_id)

            visits = (FlowPageVisit.objects
                    .filter(
                        flow_session__course=pctx.course,
                        flow_session__flow_id=flow_id,
                        page_data__group_id=group_id,
                        page_data__page_id=page_id,
                        is_submitted_answer=True,
                        )
                    .select_related("flow_session")
                    .select_related("flow_session__participation__user")
                    .select_related("page_data")

                    # We overwrite earlier submissions with later ones
                    # in a dictionary below.
                    .order_by("visit_time"))

            if form.cleaned_data["non_in_progress_only"]:
                visits = visits.filter(flow_session__in_progress=False)

            if form.cleaned_data["restrict_to_rules_tag"] != ALL_SESSION_TAG:
                visits = (visits
                        .filter(
                            flow_session__access_rules_tag=(
                                form.cleaned_data["restrict_to_rules_tag"]))
                        .select_related("grades"))

            submissions = {}

            for visit in visits:
                page = page_cache.get_page(group_id, page_id,
                        pctx.course_commit_sha)

                from course.page import PageContext
                grading_page_context = PageContext(
                        course=pctx.course,
                        repo=pctx.repo,
                        commit_sha=pctx.course_commit_sha,
                        flow_session=visit.flow_session)

                bytes_answer = page.normalized_bytes_answer(
                        grading_page_context, visit.page_data.data,
                        visit.answer)

                if which_attempt in ["first", "last"]:
                    key = (visit.flow_session.participation.user.username,)
                elif which_attempt == "all":
                    key = (visit.flow_session.participation.user.username,
                            str(visit.flow_session.id))
                else:
                    raise NotImplementedError()

                if bytes_answer is not None:
                    if (which_attempt == "first"
                            and key in submissions):
                        # Already there, disregard further ones
                        continue

                    submissions[key] = (
                            bytes_answer, list(visit.grades.all()))

            from six import BytesIO
            from zipfile import ZipFile
            bio = BytesIO()
            with ZipFile(bio, "w") as subm_zip:
                for key, ((extension, bytes_answer), visit_grades) in \
                        six.iteritems(submissions):
                    subm_zip.writestr(
                    if form.cleaned_data["include_feedback"]:
                        feedback_lines = []

                        feedback_lines.append(
                            "scores: %s" % (
                                ", ".join(
                                    str(g.correctness)
                                    for g in visit_grades)))

                        for i, grade in enumerate(visit_grades):
                            feedback_lines.append(75*"-")
                            feedback_lines.append(
                                "grade %i: score: %s" % (i+1, grade.correctness))
                            feedback_lines.append(
                                AnswerFeedback.from_json(
                                    grade.feedback, None).feedback)

                        subm_zip.writestr(
                                basename + "-feedback.txt",
                                "\n".join(feedback_lines))

                extra_file = request.FILES.get("extra_file")
                if extra_file is not None:
                    subm_zip.writestr(extra_file.name, extra_file.read())

            response = http.HttpResponse(
                    bio.getvalue(),
                    content_type="application/zip")
            response['Content-Disposition'] = (
                    'attachment; filename="submissions_%s_%s_%s_%s_%s.zip"'
                    % (pctx.course.identifier, flow_id, group_id, page_id,
                        now().date().strftime("%Y-%m-%d")))
            return response

    else:
        form = DownloadAllSubmissionsForm(page_ids, session_tag_choices)

    return render_course_page(pctx, "course/generic-course-form.html", {
        "form": form,
        "form_description": _("Download All Submissions in Zip file")
        })

# }}}


# {{{ edit_grading_opportunity

class EditGradingOpportunityForm(StyledModelForm):
    def __init__(self, add_new, *args, **kwargs):
        # type: (bool, *Any, **Any) -> None
        super(EditGradingOpportunityForm, self).__init__(*args, **kwargs)

        if not add_new:
            self.fields["identifier"].disabled = True

        self.fields["flow_id"].disabled = True
        self.fields["creation_time"].disabled = True

        self.helper.add_input(
                Submit("submit", _("Update")))

    class Meta:
        model = GradingOpportunity
        exclude = (
                "course",
                # not used
                "due_time",
                )
        widgets = {
                "hide_superseded_grade_history_before":
                DateTimePicker(
                    options={"format": "YYYY-MM-DD HH:mm", "sideBySide": True}),
                }


@course_view
def edit_grading_opportunity(pctx, opportunity_id):
    # type: (CoursePageContext, int) -> http.HttpResponse
    if not pctx.has_permission(pperm.edit_grading_opportunity):
        raise PermissionDenied()

    request = pctx.request

    num_opportunity_id = int(opportunity_id)
    if num_opportunity_id == -1:
        gopp = GradingOpportunity(course=pctx.course)
        add_new = True
    else:
        gopp = get_object_or_404(GradingOpportunity, id=num_opportunity_id)
        add_new = False

    if gopp.course.id != pctx.course.id:
        raise SuspiciousOperation(
                "may not edit grading opportunity in different course")

    if request.method == 'POST':
        form = EditGradingOpportunityForm(add_new, request.POST, instance=gopp)
            return redirect("relate-edit_grading_opportunity",
                    pctx.course.identifier, form.instance.id)
        form = EditGradingOpportunityForm(add_new, instance=gopp)

    return render_course_page(pctx, "course/generic-course-form.html", {
        "form_description": _("Edit Grading Opportunity"),
        "form": form
        })

# }}}

# vim: foldmethod=marker