Skip to content
auth.py 49.9 KiB
Newer Older
Dong Zhuang's avatar
Dong Zhuang committed
                    messages.add_message(request, messages.INFO,
                            _("No change was made on your profile."))
                if request.GET.get("first_login"):
                    return redirect("relate-home")
                if (request.GET.get("set_inst_id")
Dong Zhuang's avatar
Dong Zhuang committed
                        and request.GET.get("referer")):
                    return redirect(request.GET["referer"])

                user_form = UserForm(
Dong Zhuang's avatar
Dong Zhuang committed
                    is_inst_id_locked=is_inst_id_locked(request.user),
                )

    if user_form is None:
Dong Zhuang's avatar
Dong Zhuang committed
        request.user.refresh_from_db()
        user_form = UserForm(
            instance=request.user,
            is_inst_id_locked=is_inst_id_locked(request.user),
        )
    return render(request, "user-profile-form.html", {
Dong Zhuang's avatar
Dong Zhuang committed
        "form": user_form,
        "form_description": _("User Profile"),
Dong Zhuang's avatar
Dong Zhuang committed
        "is_requesting_inst_id": is_requesting_inst_id(),
        "enable_profile_form_js": (
Andreas Klöckner's avatar
Andreas Klöckner committed
            not is_inst_id_locked(request.user)
            and getattr(settings, "RELATE_SHOW_INST_ID_FORM", True))
# {{{ SAML auth backend

# This ticks the 'verified' boxes once we've receive attribute assertions
# through SAML2.

@receiver(saml2_pre_user_save)
def saml2_update_user_hook(sender, instance, attributes, user_modified, **kwargs):
    attr_mapping = getattr(settings, "SAML_ATTRIBUTE_MAPPING", {})

    mapped_attributes = {
            mapped_key: val
            for key, val in attributes.items()
            for mapped_key in attr_mapping.get(key, ())}
    if "institutional_id" in mapped_attributes:
        if not instance.institutional_id_verified:
            instance.institutional_id_verified = True
            mod = True
    if "first_name" in mapped_attributes and "last_name" in mapped_attributes:
        if not instance.name_verified:
            instance.name_verified = True
            mod = True
    if "email" in mapped_attributes:
        from course.constants import user_status
        if instance.status != user_status.active:
            instance.status = user_status.active
            mod = True
# {{{ sign-out

def sign_out_confirmation(request, redirect_field_name=REDIRECT_FIELD_NAME):
    if not request.user.is_authenticated:
        messages.add_message(request, messages.ERROR,
                             _("You've already signed out."))
        return redirect("relate-home")

    redirect_to = request.POST.get(redirect_field_name,
                                   request.GET.get(redirect_field_name, ""))

    next_uri = ""
    if redirect_to:
        next_uri = "?%s=%s" % (redirect_field_name, redirect_to)

    return render(request, "sign-out-confirmation.html",
                  {"next_uri": next_uri})


@never_cache
def sign_out(request, redirect_field_name=REDIRECT_FIELD_NAME):
    if not request.user.is_authenticated:
        messages.add_message(request, messages.ERROR,
                             _("You've already signed out."))
        return redirect("relate-home")
    redirect_to = request.POST.get(redirect_field_name,
                                   request.GET.get(redirect_field_name, ""))
Andreas Klöckner's avatar
Andreas Klöckner committed
    response = None
    if settings.RELATE_SIGN_IN_BY_SAML2_ENABLED:
        from djangosaml2.views import _get_subject_id, logout as saml2_logout
        if _get_subject_id(request.session) is not None:
Andreas Klöckner's avatar
Andreas Klöckner committed
            response = saml2_logout(request)

    auth_logout(request)
Andreas Klöckner's avatar
Andreas Klöckner committed
    if response is not None:
        return response
Andreas Klöckner's avatar
Andreas Klöckner committed
    else:
        return redirect("relate-home")
class APIError(Exception):
    pass


def find_matching_token(
        course_identifier=None, token_id=None, token_hash_str=None,
        now_datetime=None):
Isuru Fernando's avatar
Isuru Fernando committed
    # type: (Text, int, Text, datetime.datetime) -> Optional[AuthenticationToken]
        token = AuthenticationToken.objects.get(
                id=token_id,
                participation__course__identifier=course_identifier)
    except AuthenticationToken.DoesNotExist:
        return None

    from django.contrib.auth.hashers import check_password
    if not check_password(token_hash_str, token.token_hash):
        return None

    if token.revocation_time is not None:
        return None
    if token.valid_until is not None and now_datetime > token.valid_until:
        return None

    return token


class APIBearerTokenBackend(object):
    def authenticate(self, request, course_identifier=None, token_id=None,
            token_hash_str=None, now_datetime=None):
        token = find_matching_token(course_identifier, token_id, token_hash_str,
                now_datetime)
        if token is None:
            return None

        token.last_use_time = now_datetime
        token.save()

        return token.user

    def get_user(self, user_id):
        try:
            return get_user_model().objects.get(pk=user_id)
        except get_user_model().DoesNotExist:
            return None


class APIContext(object):
    def __init__(self, request, token):
        self.request = request

        self.token = token
        self.participation = token.participation
        self.course = self.participation.course

        restrict_to_role = token.restrict_to_participation_role
        if restrict_to_role is not None:
            role_restriction_ok = False

            if restrict_to_role in token.participation.roles.all():
                role_restriction_ok = True

            if not role_restriction_ok and self.participation.has_permission(
                    pperm.impersonate_role, restrict_to_role.identifier):
                role_restriction_ok = True

            if not role_restriction_ok:
                raise PermissionDenied(
                        "API token specifies invalid role restriction")

        self.restrict_to_role = restrict_to_role

    def has_permission(self, perm, argument=None):
Andreas Klöckner's avatar
Andreas Klöckner committed
        # type: (Text, Optional[Text]) -> bool
        if self.restrict_to_role is None:
            return self.participation.has_permission(perm, argument)
        else:
            return self.restrict_to_role.has_permission(perm, argument)


TOKEN_AUTH_DATA_RE = re.compile(r"^(?P<token_id>[0-9]+)_(?P<token_hash>[a-z0-9]+)$")
Isuru Fernando's avatar
Isuru Fernando committed
BASIC_AUTH_DATA_RE = re.compile(
    r"^(?P<username>\w+):(?P<token_id>[0-9]+)_(?P<token_hash>[a-z0-9]+)$")
Isuru Fernando's avatar
Isuru Fernando committed
def auth_course_with_token(method, func, request,
        course_identifier, *args, **kwargs):
    from django.utils.timezone import now
    now_datetime = now()
    try:
        auth_header = request.META.get("HTTP_AUTHORIZATION", None)
        if auth_header is None:
            raise PermissionDenied("No Authorization header provided")
        auth_values = auth_header.split(" ")
        if len(auth_values) != 2:
            raise PermissionDenied("ill-formed Authorization header")
        auth_method, auth_data = auth_values
        if auth_method != method:
            raise PermissionDenied("ill-formed Authorization header")
        if method == "Token":
            match = TOKEN_AUTH_DATA_RE.match(auth_data)
        elif method == "Basic":
            from base64 import b64decode
            import binascii
            try:
                auth_data = b64decode(auth_data.strip()).decode(
                        "utf-8", errors="replace")
            except binascii.Error:
                raise PermissionDenied("ill-formed Authorization header")
            match = BASIC_AUTH_DATA_RE.match(auth_data)

        else:
            assert False
        if match is None:
            raise PermissionDenied("invalid authentication token")

Isuru Fernando's avatar
Isuru Fernando committed
        token_id = int(match.group("token_id"))
        token_hash_str = match.group("token_hash")
Isuru Fernando's avatar
Isuru Fernando committed
        auth_data_dict = dict(course_identifier=course_identifier,
            token_id=token_id, token_hash_str=token_hash_str,
            now_datetime=now_datetime)
Isuru Fernando's avatar
Isuru Fernando committed

        # FIXME: Redundant db roundtrip
        token = find_matching_token(**auth_data_dict)
        if token is None:
            raise PermissionDenied("invalid authentication token")
        from django.contrib.auth import authenticate, login
Isuru Fernando's avatar
Isuru Fernando committed
        user = authenticate(**auth_data_dict)
        assert user is not None

        if method == "Basic" and match.group("username") != user.username:
            raise PermissionDenied("invalid authentication token")

        login(request, user)

        response = func(
                APIContext(request, token),
                course_identifier, *args, **kwargs)

    except PermissionDenied as e:
        if method == "Basic":
            realm = _("Relate direct git access for {}".format(course_identifier))
Isuru Fernando's avatar
Isuru Fernando committed
            response = http.HttpResponse("Forbidden: " + str(e),
                        content_type="text/plain")
            response["WWW-Authenticate"] = 'Basic realm="%s"' % (realm)
            response.status_code = 401
            return response

            return http.HttpResponseForbidden(
                    "403 Forbidden: " + str(e))
        else:
            assert False

    except APIError as e:
        return http.HttpResponseBadRequest(
                "400 Bad Request: " + str(e))

    return response


def with_course_api_auth(method: Text) -> Any:
    def wrapper_with_method(func):
        def wrapper(*args, **kwargs):
            return auth_course_with_token(method, func, *args, **kwargs)
        from functools import update_wrapper
        update_wrapper(wrapper, func)
    return wrapper_with_method

# }}}


# {{{ manage API auth tokens

class AuthenticationTokenForm(StyledModelForm):
    class Meta:
        model = AuthenticationToken
        fields = (
                "restrict_to_participation_role",
                "description",
                "valid_until",
                )

        widgets = {
                "valid_until": DateTimePicker(options={"format": "YYYY-MM-DD"})
                }

    def __init__(self, participation, *args, **kwargs):
        # type: (Participation, *Any, **Any) -> None
        super(AuthenticationTokenForm, self).__init__(*args, **kwargs)
        allowable_role_ids = (
                set(role.id for role in participation.roles.all())
                | set(
                        prole.id
                        for prole in ParticipationRole.objects.filter(
                            course=participation.course)
                        if participation.has_permission(
                            pperm.impersonate_role, prole.identifier))
                )

        self.fields["restrict_to_participation_role"].queryset = (
                ParticipationRole.objects.filter(
                    id__in=list(allowable_role_ids)
                    ))

        self.helper.add_input(Submit("create", _("Create")))


@course_view
def manage_authentication_tokens(pctx):
    # type: (http.HttpRequest) -> http.HttpResponse
    if not request.user.is_authenticated:
        raise PermissionDenied()

ifaint's avatar
ifaint committed
    if not pctx.has_permission(pperm.view_analytics):
        raise PermissionDenied()

    from course.views import get_now_or_fake_time
    now_datetime = get_now_or_fake_time(request)

    if request.method == "POST":
        form = AuthenticationTokenForm(pctx.participation, request.POST)

        revoke_prefix = "revoke_"
        revoke_post_args = [key for key in request.POST if key.startswith("revoke_")]

        if revoke_post_args:
            token_id = int(revoke_post_args[0][len(revoke_prefix):])
            auth_token = get_object_or_404(AuthenticationToken,
                    id=token_id,
                    user=request.user)

            auth_token.revocation_time = now_datetime
            auth_token.save()

            form = AuthenticationTokenForm(pctx.participation)

        elif "create" in request.POST:
            if form.is_valid():
                token = make_sign_in_key(request.user)

                from django.contrib.auth.hashers import make_password
                auth_token = AuthenticationToken(
                        user=pctx.request.user,
                        participation=pctx.participation,
                        restrict_to_participation_role=form.cleaned_data[
                            "restrict_to_participation_role"],
                        description=form.cleaned_data["description"],
                        valid_until=form.cleaned_data["valid_until"],
                        token_hash=make_password(token))
                auth_token.save()

                user_token = "%d_%s" % (auth_token.id, token)

                messages.add_message(request, messages.SUCCESS,
                        _("A new authentication token has been created: %s. "
                            "Please save this token, as you will not be able "
                            "to retrieve it later.")
                        % user_token)
        else:
            messages.add_message(request, messages.ERROR,
                    _("Could not find which button was pressed."))
        form = AuthenticationTokenForm(pctx.participation)

    from django.db.models import Q
    tokens = AuthenticationToken.objects.filter(
            user=request.user,
            participation__course=pctx.course,
            ).filter(
                Q(revocation_time=None)
                | Q(revocation_time__gt=now_datetime - timedelta(weeks=1)))
    return render_course_page(pctx, "course/manage-auth-tokens.html", {
        "form": form,
        "new_token_message": "",
        "tokens": tokens,
        })

# }}}

# vim: foldmethod=marker