Skip to content
auth.py 48.6 KiB
Newer Older
                raise forms.ValidationError(_("This field is required."))
Dong Zhuang's avatar
Dong Zhuang committed
            if any([inst_id, inst_id_confirmed]) and inst_id != inst_id_confirmed:
                raise forms.ValidationError(_("Inputs do not match."))
        return inst_id_confirmed
def user_profile(request):
    if not request.user.is_authenticated:
    user_form = None
Dong Zhuang's avatar
Dong Zhuang committed
        if getattr(settings,
                   "RELATE_EDITABLE_INST_ID_BEFORE_VERIFICATION", True):
            return True if (user.institutional_id
                    and user.institutional_id_verified) else False
        else:
            return True if user.institutional_id else False
    if request.method == "POST":
        if "submit_user" in request.POST:
            user_form = UserForm(
                    request.POST,
                    instance=request.user,
                    is_inst_id_locked=is_inst_id_locked(request.user),
            if user_form.is_valid():
Dong Zhuang's avatar
Dong Zhuang committed
                if user_form.has_changed():
                    user_form.save()
                    messages.add_message(request, messages.SUCCESS,
                            _("Profile data updated."))
                    request.user.refresh_from_db()

                else:
                    messages.add_message(request, messages.INFO,
                            _("No change was made on your profile."))
                if request.GET.get("first_login"):
                    return redirect("relate-home")
                if (request.GET.get("set_inst_id")
Dong Zhuang's avatar
Dong Zhuang committed
                        and request.GET.get("referer")):
                    return redirect(request.GET["referer"])

                user_form = UserForm(
                        instance=request.user,
                        is_inst_id_locked=is_inst_id_locked(request.user))

    if user_form is None:
Dong Zhuang's avatar
Dong Zhuang committed
        request.user.refresh_from_db()
        user_form = UserForm(
            instance=request.user,
            is_inst_id_locked=is_inst_id_locked(request.user),
        )
    if is_inst_id_locked(request.user):
        user_form.fields['institutional_id'].widget.attrs['disabled'] = True
        user_form.fields['institutional_id_confirm'].widget.attrs['disabled'] = True
    return render(request, "user-profile-form.html", {
Dong Zhuang's avatar
Dong Zhuang committed
        "form": user_form,
        "form_description": _("User Profile"),
        "is_inst_id_locked": is_inst_id_locked(request.user),
        "enable_inst_id_if_not_locked": (
            request.GET.get("first_login")
            or (request.GET.get("set_inst_id")
Dong Zhuang's avatar
Dong Zhuang committed
                and request.GET.get("referer"))
# {{{ SAML auth backend

# This ticks the 'verified' boxes once we've received attribute assertions
# through SAML2.

class Saml2Backend(Saml2BackendBase):
    """SAML2 backend that flags user data as verified when it is set
    from a SAML2 attribute assertion."""

    def _set_attribute(self, obj, attr, value):
        """Set *attr* on *obj* through the base class and update the
        matching verification/status flags.

        :param obj: the user object being updated.
        :param attr: name of the attribute being set.
        :param value: value to assign; handled entirely by the base class.
        :returns: the base class result, forced to ``True`` when one of the
            flags below was changed here (presumably a "was modified"
            indicator -- confirm against the base class).
        """
        mod = super(Saml2Backend, self)._set_attribute(obj, attr, value)

        if attr == "institutional_id":
            if not obj.institutional_id_verified:
                obj.institutional_id_verified = True
                mod = True

        if attr in ("first_name", "last_name"):
            if not obj.name_verified:
                obj.name_verified = True
                mod = True

        if attr == "email":
            from course.constants import user_status
            if obj.status != user_status.active:
                obj.status = user_status.active
                mod = True

        # Without this return the computed flag is discarded and the caller
        # always receives None.
        return mod

# {{{ sign-out

def sign_out_confirmation(request, redirect_field_name=REDIRECT_FIELD_NAME):
    """Render the sign-out confirmation page.

    :param request: the current HTTP request.
    :param redirect_field_name: name of the POST/GET parameter that carries
        the post-sign-out redirect target.
    :returns: a redirect to ``relate-home`` (with an error message) if the
        user is not authenticated; otherwise the rendered
        ``sign-out-confirmation.html`` page, whose ``next_uri`` context
        value is either empty or a query string carrying the redirect
        target.
    """
    if not request.user.is_authenticated:
        messages.add_message(request, messages.ERROR,
                             _("You've already signed out."))
        return redirect("relate-home")

    # POST takes precedence over GET for the redirect target.
    redirect_to = request.POST.get(redirect_field_name,
                                   request.GET.get(redirect_field_name, ''))

    next_uri = ""
    if redirect_to:
        # The target is user-supplied: percent-encode it so characters such
        # as '&', '#' or spaces cannot truncate or alter the query string.
        from django.utils.http import urlencode
        next_uri = "?" + urlencode({redirect_field_name: redirect_to})

    return render(request, "sign-out-confirmation.html",
                  {"next_uri": next_uri})


@never_cache
def sign_out(request, redirect_field_name=REDIRECT_FIELD_NAME):
    """Sign the current user out.

    If the user is not authenticated, an error message is queued and the
    user is sent to ``relate-home``. Otherwise, when SAML2 sign-in is
    enabled and the session carries a SAML2 subject id, a SAML2 logout is
    started first; the local session is then ended in every case.

    :returns: the SAML2 logout response if one was produced, else a
        redirect to ``relate-home``.
    """
    if not request.user.is_authenticated:
        messages.add_message(request, messages.ERROR,
                             _("You've already signed out."))
        return redirect("relate-home")

    # NOTE(review): the redirect target is read here (POST first, then GET)
    # but nothing in this block uses it -- confirm whether a redirect to
    # this target is intended after sign-out.
    redirect_to = request.POST.get(
            redirect_field_name,
            request.GET.get(redirect_field_name, ''))

    saml2_response = None
    if settings.RELATE_SIGN_IN_BY_SAML2_ENABLED:
        from djangosaml2.views import _get_subject_id, logout as saml2_logout
        has_saml2_subject = _get_subject_id(request.session) is not None
        if has_saml2_subject:
            saml2_response = saml2_logout(request)

    auth_logout(request)

    if saml2_response is None:
        return redirect("relate-home")
    return saml2_response
class APIError(Exception):
    """Error that the course API auth wrapper reports to the client as a
    "400 Bad Request" response carrying the exception's message."""


def find_matching_token(
        course_identifier=None, token_id=None, token_hash_str=None,
        now_datetime=None):
    """Return the usable authentication token matching the credentials.

    :param course_identifier: identifier of the course the token's
        participation must belong to.
    :param token_id: database id of the token.
    :param token_hash_str: secret part of the token, checked against the
        stored hash.
    :param now_datetime: point in time used for the expiry check; the
        current time is used if this is ``None``.
    :returns: the matching ``AuthenticationToken``, or ``None`` if no such
        token exists, the secret does not match, the token was revoked, or
        it has expired.
    """
    try:
        token = AuthenticationToken.objects.get(
                id=token_id,
                participation__course__identifier=course_identifier)
    except AuthenticationToken.DoesNotExist:
        return None

    from django.contrib.auth.hashers import check_password
    if not check_password(token_hash_str, token.token_hash):
        return None

    if token.revocation_time is not None:
        return None

    if token.valid_until is not None:
        if now_datetime is None:
            # Comparing None against a datetime raises TypeError; fall back
            # to the current time so that the default argument is usable.
            from django.utils.timezone import now
            now_datetime = now()

        if now_datetime > token.valid_until:
            return None

    return token


class APIBearerTokenBackend(object):
    """Authentication backend that maps an API token to the token's user."""

    def authenticate(self, course_identifier=None, token_id=None,
            token_hash_str=None, now_datetime=None):
        """Return the user owning the matching token, or ``None``.

        On success the token's ``last_use_time`` is set to *now_datetime*
        and the token is saved.
        """
        matched = find_matching_token(
                course_identifier, token_id, token_hash_str, now_datetime)

        if matched is not None:
            # Record the use before handing back the owner.
            matched.last_use_time = now_datetime
            matched.save()
            return matched.user

        return None

    def get_user(self, user_id):
        """Return the user with primary key *user_id*, or ``None`` if no
        such user exists."""
        user_model = get_user_model()
        try:
            found = user_model.objects.get(pk=user_id)
        except user_model.DoesNotExist:
            found = None
        return found


# Authorization header value: "Token <numeric token id>_<token secret>",
# where the secret consists of lowercase letters and digits.
# Group 1 is the token id, group 2 the secret.
AUTH_HEADER_RE = re.compile(
        r"^Token ([0-9]+)_([a-z0-9]+)$")


class APIContext(object):
    """Per-request context handed to API views: the request, the token used
    to authenticate, and the participation/course/role derived from it."""

    def __init__(self, request, token):
        """Build the context for *request* authenticated by *token*.

        :raises PermissionDenied: if the token is restricted to a role that
            its participation neither holds nor may impersonate.
        """
        self.request = request

        self.token = token
        self.participation = token.participation
        self.course = self.participation.course

        role = token.restrict_to_participation_role
        if role is not None:
            holds_role = role in token.participation.roles.all()

            # The impersonation permission is only consulted when the
            # participation does not hold the role itself.
            if not holds_role and not self.participation.has_permission(
                    pperm.impersonate_role, role.identifier):
                raise PermissionDenied(
                        "API token specifies invalid role restriction")

        self.restrict_to_role = role

    def has_permission(self, perm, argument=None):
        """Check *perm* against the restricting role if the token has one,
        otherwise against the participation."""
        if self.restrict_to_role is not None:
            return self.restrict_to_role.has_permission(perm, argument)

        return self.participation.has_permission(perm, argument)


def with_course_api_auth(f):
    """Decorate the API view *f* with token-based authentication.

    The returned view takes ``(request, course_identifier, *args,
    **kwargs)``, validates the ``Authorization`` header and calls *f* with
    an :class:`APIContext` in place of the request. ``PermissionDenied``
    raised along the way becomes a 403 response and ``APIError`` a 400
    response, each carrying the exception's message.
    """
    from functools import update_wrapper

    def wrapper(request, course_identifier, *args, **kwargs):
        from django.utils.timezone import now
        now_datetime = now()

        try:
            header_value = request.META.get("HTTP_AUTHORIZATION")
            if header_value is None:
                raise PermissionDenied("No Authorization header provided")

            parsed = AUTH_HEADER_RE.match(header_value)
            if parsed is None:
                raise PermissionDenied("ill-formed Authorization header")

            token_id_str, token_secret = parsed.group(1), parsed.group(2)
            credentials = {
                    "course_identifier": course_identifier,
                    "token_id": int(token_id_str),
                    "token_hash_str": token_secret,
                    "now_datetime": now_datetime,
                    }

            # FIXME: Redundant db roundtrip
            token = find_matching_token(**credentials)
            if token is None:
                raise PermissionDenied("invalid authentication token")

            # NOTE(review): `login` is imported and the authenticate()
            # result is stored, but neither is used in this block --
            # confirm whether a login step / result check is intended.
            from django.contrib.auth import authenticate, login
            user = authenticate(**credentials)

            return f(
                    APIContext(request, token),
                    course_identifier, *args, **kwargs)

        except PermissionDenied as e:
            return http.HttpResponseForbidden(
                    "403 Forbidden: " + str(e))
        except APIError as e:
            return http.HttpResponseBadRequest(
                    "400 Bad Request: " + str(e))

    # update_wrapper returns the wrapper after copying f's metadata onto it.
    return update_wrapper(wrapper, f)

# }}}


# {{{ manage API auth tokens

class AuthenticationTokenForm(StyledModelForm):
    """Form for creating an API authentication token for a participation."""

    class Meta:
        model = AuthenticationToken
        fields = (
                "restrict_to_participation_role",
                "description",
                "valid_until",
                )

        widgets = {
                "valid_until": DateTimePicker(options={"format": "YYYY-MM-DD"})
                }

    def __init__(self, participation, *args, **kwargs):
        # type: (Participation, *Any, **Any) -> None
        """Limit the selectable roles to those *participation* holds plus
        the roles of its course that it is permitted to impersonate."""
        super(AuthenticationTokenForm, self).__init__(*args, **kwargs)
        self.participation = participation

        own_roles = participation.roles.all()

        # has_permission() is evaluated in Python, so the impersonable
        # roles are collected as a list of ids first.
        course_roles = ParticipationRole.objects.filter(
                course=participation.course)
        impersonable_role_ids = [
                role.id
                for role in course_roles
                if participation.has_permission(
                    pperm.impersonate_role, role.identifier)]

        role_field = self.fields["restrict_to_participation_role"]
        role_field.queryset = (
                own_roles
                | ParticipationRole.objects.filter(
                    id__in=impersonable_role_ids))

        self.helper.add_input(Submit("create", _("Create")))


@course_view
def manage_authentication_tokens(pctx):
    # type: (http.HttpRequest) -> http.HttpResponse
    # NOTE(review): the type comment above names HttpRequest, but the body
    # uses pctx.request and pctx.participation -- confirm the intended type.
    """List, create and revoke the current user's API authentication tokens.

    A POST carrying a ``revoke_<id>`` key revokes that token (which must
    belong to the requesting user); a POST carrying ``create`` validates the
    form and creates a new token, showing its secret once in a success
    message. The page lists the user's tokens that are unrevoked or were
    revoked within the last week.

    :param pctx: course page context providing ``request`` and
        ``participation``.
    :raises PermissionDenied: if the user is not authenticated.
    """
    # The body works on the request throughout; bind it from the context.
    request = pctx.request

    if not request.user.is_authenticated:
        raise PermissionDenied()

    from course.views import get_now_or_fake_time
    now_datetime = get_now_or_fake_time(request)

    # Default for every path (including non-POST requests): a fresh form.
    form = AuthenticationTokenForm(pctx.participation)

    if request.method == 'POST':
        revoke_prefix = "revoke_"
        revoke_post_args = [
                key for key in request.POST if key.startswith(revoke_prefix)]

        if revoke_post_args:
            try:
                token_id = int(revoke_post_args[0][len(revoke_prefix):])
            except ValueError:
                # A malformed id cannot refer to any token; answer like a
                # missing one instead of failing with a server error.
                raise http.Http404()

            auth_token = get_object_or_404(AuthenticationToken,
                    id=token_id,
                    user=request.user)

            auth_token.revocation_time = now_datetime
            auth_token.save()

        elif "create" in request.POST:
            bound_form = AuthenticationTokenForm(
                    pctx.participation, request.POST)

            if bound_form.is_valid():
                token = make_sign_in_key(request.user)

                # Only the hash is stored; the plain token is shown once.
                from django.contrib.auth.hashers import make_password
                auth_token = AuthenticationToken(
                        user=pctx.request.user,
                        participation=pctx.participation,
                        restrict_to_participation_role=bound_form.cleaned_data[
                            "restrict_to_participation_role"],
                        description=bound_form.cleaned_data["description"],
                        valid_until=bound_form.cleaned_data["valid_until"],
                        token_hash=make_password(token))
                auth_token.save()

                user_token = "%d_%s" % (auth_token.id, token)

                messages.add_message(request, messages.SUCCESS,
                        _("A new authentication token has been created: %s. "
                            "Please save this token, as you will not be able "
                            "to retrieve it later.")
                        % user_token)
            else:
                # Keep the bound form so its validation errors are shown.
                form = bound_form

        else:
            messages.add_message(request, messages.ERROR,
                    _("Could not find which button was pressed."))

    from django.db.models import Q
    tokens = AuthenticationToken.objects.filter(
            user=request.user,
            ).filter(
                Q(revocation_time=None)
                | Q(revocation_time__gt=now_datetime - timedelta(weeks=1)))

    return render_course_page(pctx, "course/manage-auth-tokens.html", {
        "form": form,
        "new_token_message": "",
        "tokens": tokens,
        })

# }}}

# vim: foldmethod=marker