Skip to content
auth.py 51.1 KiB
Newer Older
Dong Zhuang's avatar
Dong Zhuang committed
            request.GET.get("first_login")
            or (request.GET.get("set_inst_id")
                and request.GET.get("referer")))

    if request.method == "POST":
        if "submit_user" in request.POST:
            user_form = UserForm(
                    request.POST,
                    instance=request.user,
                    is_inst_id_locked=is_inst_id_locked(request.user),
            if user_form.is_valid():
Dong Zhuang's avatar
Dong Zhuang committed
                if user_form.has_changed():
                    user_form.save()
                    messages.add_message(request, messages.SUCCESS,
                            _("Profile data updated."))
                    request.user.refresh_from_db()

                else:
                    messages.add_message(request, messages.INFO,
                            _("No change was made on your profile."))
                if request.GET.get("first_login"):
                    return redirect("relate-home")
                if (request.GET.get("set_inst_id")
Dong Zhuang's avatar
Dong Zhuang committed
                        and request.GET.get("referer")):
                    return redirect(request.GET["referer"])

                user_form = UserForm(
Dong Zhuang's avatar
Dong Zhuang committed
                    is_inst_id_locked=is_inst_id_locked(request.user),
                )

    if user_form is None:
Dong Zhuang's avatar
Dong Zhuang committed
        request.user.refresh_from_db()
        user_form = UserForm(
            instance=request.user,
            is_inst_id_locked=is_inst_id_locked(request.user),
        )
    return render(request, "user-profile-form.html", {
Dong Zhuang's avatar
Dong Zhuang committed
        "form": user_form,
        "form_description": _("User Profile"),
Dong Zhuang's avatar
Dong Zhuang committed
        "is_requesting_inst_id": is_requesting_inst_id(),
        "enable_profile_form_js": (
Andreas Klöckner's avatar
Andreas Klöckner committed
            not is_inst_id_locked(request.user)
            and getattr(settings, "RELATE_SHOW_INST_ID_FORM", True))
# {{{ SAML auth backend

# This ticks the 'verified' boxes once we've received attribute assertions
# through SAML2.

class RelateSaml2Backend(Saml2Backend):
    """SAML2 backend that flags user data as verified when the IdP asserts it.

    After the parent backend has looked up (or created) the user, the
    attributes delivered by the identity provider are inspected and the
    matching "verified" flags / account status on the user are updated.
    """

    def get_or_create_user(self,
            user_lookup_key, user_lookup_value, create_unknown_user,
            idp_entityid, attributes, attribute_mapping, request):
        """Delegate to the parent backend, then update verification flags.

        :returns: the ``(user, created)`` pair produced by the parent
            implementation, with *user* updated by :meth:`_rl_update_user`.
        """
        user, created = super().get_or_create_user(
                user_lookup_key, user_lookup_value, create_unknown_user,
                idp_entityid, attributes, attribute_mapping, request)

        # NOTE(review): presumably the parent can hand back no user (e.g.
        # lookup failed and creation is disabled); guard so that we never
        # dereference None below.
        if user is not None:
            user = self._rl_update_user(user, attributes, attribute_mapping)

        return user, created

    def _rl_update_user(self, user, attributes, attribute_mapping):
        """Mark *user* fields as verified based on the asserted attributes.

        :arg attributes: mapping of IdP attribute name to value.
        :arg attribute_mapping: mapping of IdP attribute name to an iterable
            of user field names it populates.
        :returns: *user*, saved if (and only if) anything was changed.
        """
        mod = False

        # Only the names of the user fields that received a value matter.
        mapped_attributes = {
                mapped_key
                for key in attributes
                for mapped_key in attribute_mapping.get(key, ())}

        if "institutional_id" in mapped_attributes:
            if not user.institutional_id_verified:
                user.institutional_id_verified = True
                mod = True

        if "first_name" in mapped_attributes and "last_name" in mapped_attributes:
            if not user.name_verified:
                user.name_verified = True
                mod = True

        if "email" in mapped_attributes:
            from course.constants import user_status
            if user.status != user_status.active:
                user.status = user_status.active
                mod = True

        # Avoid a database write when nothing changed.
        if mod:
            user.save()

        return user
Andreas Klöckner's avatar
Andreas Klöckner committed
# {{{ social auth

def social_set_user_email_verified(backend, details, user=None, *args, **kwargs):
    """Social-auth pipeline step: adopt the provider's email and activate user.

    If *details* carries an email, it is copied onto *user* (when different)
    and the user's status is set to active. The user is saved only if one of
    those changed.

    :arg details: dict of user details supplied by the social-auth pipeline.
    :arg user: the user being signed in, or *None* if the pipeline has not
        produced one; in that case this step is a no-op.
    :returns: *None*, which lets the pipeline continue.
    """
    email = details.get("email")

    # Nothing to update without both a user and an asserted email. (``user``
    # defaults to None, so it must not be dereferenced unconditionally.)
    if user is None or not email:
        return None

    modified = False

    if email != user.email:
        user.email = email
        modified = True

    from course.constants import user_status
    if user.status != user_status.active:
        user.status = user_status.active
        modified = True

    if modified:
        user.save()

    # continue the social auth pipeline
    return None


def social_auth_check_domain_against_blacklist(backend, details, *args, **kwargs):
    """Social-auth pipeline step: reject emails from blacklisted domains.

    The blacklist is read from the
    ``RELATE_SOCIAL_AUTH_BLACKLIST_EMAIL_DOMAINS`` setting, a mapping from
    domain to the error message raised for that domain.

    :arg details: dict of user details supplied by the social-auth pipeline.
    :raises social_core.exceptions.SocialAuthBaseException: if the domain of
        the email in *details* is blacklisted.
    :returns: *None*, which lets the pipeline continue.
    """
    email = details.get("email")

    domain_blacklist = getattr(
            settings, "RELATE_SOCIAL_AUTH_BLACKLIST_EMAIL_DOMAINS", {})
    if domain_blacklist and email:
        # Everything after the first "@". partition() never raises, so an
        # address without "@" yields an empty domain instead of IndexError.
        _local_part, _sep, domain = email.partition("@")
        if domain and domain in domain_blacklist:
            from social_core.exceptions import SocialAuthBaseException
            raise SocialAuthBaseException(domain_blacklist[domain])

    # continue the social auth pipeline
    return None

# {{{ sign-out

def sign_out_confirmation(request, redirect_field_name=REDIRECT_FIELD_NAME):
    """Render the page asking the user to confirm signing out.

    :arg redirect_field_name: name of the POST/GET parameter carrying the
        URL to continue to afterwards (POST takes precedence over GET).
    :returns: a redirect to ``relate-home`` (with an error message) if the
        user is not signed in, otherwise the rendered confirmation page.
    """
    if not request.user.is_authenticated:
        messages.add_message(request, messages.ERROR,
                             _("You've already signed out."))
        return redirect("relate-home")

    redirect_to = request.POST.get(redirect_field_name,
                                   request.GET.get(redirect_field_name, ""))

    # Query string handed to the template; empty when there is no redirect
    # target so that the template variable is always defined.
    # NOTE(review): redirect_to is interpolated verbatim (not URL-encoded).
    next_uri = ""
    if redirect_to:
        next_uri = f"?{redirect_field_name}={redirect_to}"

    return render(request, "sign-out-confirmation.html",
                  {"next_uri": next_uri})


@never_cache
def sign_out(request, redirect_field_name=REDIRECT_FIELD_NAME):
    """Sign the current user out and redirect.

    :arg redirect_field_name: name of the POST/GET parameter carrying the
        URL to continue to afterwards (POST takes precedence over GET).
    :returns: a redirect response. Users who are not signed in are sent to
        ``relate-home`` with an error message. If SAML2 sign-in is enabled
        and the session carries a SAML2 subject id, the user is sent to
        ``saml2_logout``. Otherwise the user is logged out and redirected to
        the requested target when it is safe, else to ``relate-home``.
    """
    if not request.user.is_authenticated:
        messages.add_message(request, messages.ERROR,
                             _("You've already signed out."))
        return redirect("relate-home")

    redirect_to = request.POST.get(redirect_field_name,
                                   request.GET.get(redirect_field_name, ""))

    if settings.RELATE_SIGN_IN_BY_SAML2_ENABLED:
        from djangosaml2.views import _get_subject_id
        if _get_subject_id(request.session) is not None:
            # skip auth_logout below, rely on djangosaml2 to complete logout
            return redirect("saml2_logout")

    auth_logout(request)

    if redirect_to:
        # The target comes straight from the request, so only follow it when
        # it stays on this host (prevents open redirects).
        from django.utils.http import url_has_allowed_host_and_scheme
        if url_has_allowed_host_and_scheme(
                redirect_to,
                allowed_hosts={request.get_host()},
                require_https=request.is_secure()):
            return http.HttpResponseRedirect(redirect_to)

    return redirect("relate-home")
class APIError(Exception):
    """Error in an API request.

    ``auth_course_with_token`` catches this and answers with an HTTP 400
    response containing the exception's message.
    """


def find_matching_token(
        course_identifier: str | None = None,
        token_id: int | None = None,
        token_hash_str: str | None = None,
        now_datetime: datetime.datetime | None = None
        ) -> AuthenticationToken | None:
    """Return the usable authentication token matching the given credentials.

    :arg course_identifier: identifier of the course the token's
        participation must belong to.
    :arg token_id: database id of the token.
    :arg token_hash_str: the secret part of the token, checked against the
        stored hash.
    :arg now_datetime: the time against which ``valid_until`` is checked;
        defaults to the current time if *None*.
    :returns: the token, or *None* if no such token exists, the secret does
        not match, the token has been revoked, or it has expired.
    """
    try:
        token = AuthenticationToken.objects.get(
                id=token_id,
                participation__course__identifier=course_identifier)
    except AuthenticationToken.DoesNotExist:
        return None

    from django.contrib.auth.hashers import check_password
    if not check_password(token_hash_str, token.token_hash):
        return None

    if token.revocation_time is not None:
        return None

    if token.valid_until is not None:
        if now_datetime is None:
            # Comparing None with a datetime would raise TypeError.
            from django.utils.timezone import now
            now_datetime = now()

        if now_datetime > token.valid_until:
            return None

    return token
    def authenticate(self, request, course_identifier=None, token_id=None,
            token_hash_str=None, now_datetime=None):
        """Authenticate by API token.

        Looks the token up via ``find_matching_token``. On success, records
        *now_datetime* as the token's last use time, saves the token and
        returns the token's user. Returns *None* if no token matches.
        """
        matched = find_matching_token(
                course_identifier=course_identifier,
                token_id=token_id,
                token_hash_str=token_hash_str,
                now_datetime=now_datetime)

        if matched is not None:
            matched.last_use_time = now_datetime
            matched.save()
            return matched.user

        return None

    def get_user(self, user_id):
        """Return the user with primary key *user_id*, or *None* if absent."""
        user_model = get_user_model()

        try:
            found = user_model.objects.get(pk=user_id)
        except user_model.DoesNotExist:
            found = None

        return found


    def __init__(self, request, token):
        """Bind *request* and *token* and validate the token's role restriction.

        Stores the request, the token, the token's participation and that
        participation's course on the instance.

        If the token restricts access to a participation role, that role must
        either be one of the participation's own roles, or the participation
        must hold the ``impersonate_role`` permission for it.

        :raises PermissionDenied: if the role restriction is not allowed.
        """
        self.request = request
        self.token = token

        self.participation = token.participation
        self.course = self.participation.course

        role = token.restrict_to_participation_role
        if role is not None:
            # ``or`` short-circuits: the permission check only runs when the
            # role is not among the participation's own roles.
            restriction_allowed = (
                    role in token.participation.roles.all()
                    or self.participation.has_permission(
                        pperm.impersonate_role, role.identifier))

            if not restriction_allowed:
                raise PermissionDenied(
                        "API token specifies invalid role restriction")

        self.restrict_to_role = role

    def has_permission(self, perm: str, argument: str | None = None) -> bool:
        if self.restrict_to_role is None:
            return self.participation.has_permission(perm, argument)
        else:
            return self.restrict_to_role.has_permission(perm, argument)


# Payload of a "Token" Authorization header: "<token id>_<token secret>",
# where the secret consists of lowercase letters and digits.
TOKEN_AUTH_DATA_RE = re.compile(r"^(?P<token_id>[0-9]+)_(?P<token_hash>[a-z0-9]+)$")

# Base64-decoded payload of a "Basic" Authorization header:
# "<username>:<token id>_<token secret>". The username may be anything that
# does not contain the ":" separator (e.g. an email address); it is compared
# against the authenticated user's username by the caller.
BASIC_AUTH_DATA_RE = re.compile(
    r"^(?P<username>[^:]+):(?P<token_id>[0-9]+)_(?P<token_hash>[a-z0-9]+)$")
def auth_course_with_token(method, func, request,
        course_identifier, *args, **kwargs):
    """Authenticate *request* by API token and call *func* on success.

    :arg method: the expected Authorization scheme, ``"Token"`` or
        ``"Basic"``. For ``"Basic"``, the payload is base64-encoded
        ``username:tokenid_secret`` and the username must match the token's
        user.
    :arg func: the view to call as
        ``func(APIContext(request, token), course_identifier, *args, **kwargs)``.
    :returns: the response of *func*; a 401 (``"Basic"``) or 403
        (``"Token"``) response if authentication fails with
        :class:`PermissionDenied`; a 400 response if :class:`APIError` is
        raised.
    :raises AssertionError: if *method* is neither ``"Token"`` nor
        ``"Basic"``.
    """
    from django.utils.timezone import now
    now_datetime = now()

    try:
        auth_header = request.headers.get("authorization", None)
        if auth_header is None:
            raise PermissionDenied("No Authorization header provided")

        # Expect exactly "<scheme> <data>".
        auth_values = auth_header.split(" ")
        if len(auth_values) != 2:
            raise PermissionDenied("ill-formed Authorization header")

        auth_method, auth_data = auth_values
        if auth_method != method:
            raise PermissionDenied("ill-formed Authorization header")

        if method == "Token":
            match = TOKEN_AUTH_DATA_RE.match(auth_data)
        elif method == "Basic":
            import binascii
            from base64 import b64decode
            try:
                auth_data = b64decode(auth_data.strip()).decode(
                        "utf-8", errors="replace")
            except binascii.Error:
                raise PermissionDenied(
                        "ill-formed Authorization header") from None
            match = BASIC_AUTH_DATA_RE.match(auth_data)
        else:
            raise AssertionError()

        if match is None:
            raise PermissionDenied("invalid authentication token")

        token_id = int(match.group("token_id"))
        token_hash_str = match.group("token_hash")

        auth_data_dict = {
                "course_identifier": course_identifier,
                "token_id": token_id,
                "token_hash_str": token_hash_str,
                "now_datetime": now_datetime}

        # FIXME: Redundant db roundtrip
        token = find_matching_token(**auth_data_dict)
        if token is None:
            raise PermissionDenied("invalid authentication token")

        from django.contrib.auth import authenticate, login
        user = authenticate(**auth_data_dict)
        # Explicit check rather than ``assert``: assertions are stripped
        # under ``python -O``.
        if user is None:
            raise PermissionDenied("invalid authentication token")

        if method == "Basic" and match.group("username") != user.username:
            raise PermissionDenied("invalid authentication token")

        login(request, user)

        response = func(
                APIContext(request, token),
                course_identifier, *args, **kwargs)

    except PermissionDenied as e:
        if method == "Basic":
            # Interpolate after translation: an f-string inside _() produces
            # a msgid that can never be found in a translation catalog.
            realm = _("Relate direct git access for %(course_identifier)s") % {
                    "course_identifier": course_identifier}
            response = http.HttpResponse("Forbidden: " + str(e),
                        content_type="text/plain")
            response["WWW-Authenticate"] = 'Basic realm="%s"' % (realm)
            response.status_code = 401
            return response

        elif method == "Token":
            return http.HttpResponseForbidden(
                    "403 Forbidden: " + str(e))

        else:
            raise AssertionError()

    except APIError as e:
        return http.HttpResponseBadRequest(
                "400 Bad Request: " + str(e))

    return response


def with_course_api_auth(method: str) -> Any:
    """Return a decorator that guards a view with course API token auth.

    The decorated view is invoked through ``auth_course_with_token`` using
    the given Authorization *method*, with the positional and keyword
    arguments of the call passed through.

    :arg method: the Authorization scheme forwarded to
        ``auth_course_with_token``.
    """
    def wrapper_with_method(func):
        from functools import wraps

        @wraps(func)
        def wrapper(*args, **kwargs):
            return auth_course_with_token(method, func, *args, **kwargs)

        # Without this return the decorator would replace the view by None.
        return wrapper

    return wrapper_with_method

# }}}


# {{{ manage API auth tokens

class AuthenticationTokenForm(StyledModelForm):
    """Form for creating an :class:`AuthenticationToken`.

    The choices for the role restriction are limited to the roles the given
    participation holds itself plus the roles of its course that it has
    permission to impersonate.
    """

    class Meta:
        model = AuthenticationToken
        fields = (
                "restrict_to_participation_role",
                "description",
                "valid_until",
                )

        widgets = {
                "valid_until": HTML5DateTimeInput()
                }

    def __init__(
            self, participation: Participation, *args: Any, **kwargs: Any) -> None:
        """
        :arg participation: the participation for which a token is being
            created; determines the selectable roles.
        """
        super().__init__(*args, **kwargs)

        # Own roles, plus every role of the course that may be impersonated.
        allowable_role_ids = (
                {role.id for role in participation.roles.all()}
                | {
                        prole.id
                        for prole in ParticipationRole.objects.filter(
                            course=participation.course)
                        if participation.has_permission(
                            pperm.impersonate_role, prole.identifier)}
                )

        self.fields["restrict_to_participation_role"].queryset = (
                ParticipationRole.objects.filter(
                    id__in=list(allowable_role_ids)
                    ))

        self.helper.add_input(Submit("create", _("Create")))


def manage_authentication_tokens(pctx: http.HttpRequest) -> http.HttpResponse:
    """View for listing, creating and revoking the user's API tokens.

    A POST containing a ``revoke_<id>`` key revokes that token of the current
    user; a POST containing ``create`` creates a new token from the submitted
    form and shows it to the user once. The page lists the user's tokens for
    the course that are unrevoked or were revoked within the last week.

    NOTE(review): *pctx* is annotated as ``HttpRequest``, but the body uses
    its ``request``, ``participation``, ``course`` and ``has_permission``
    attributes, so it looks like a course page context -- confirm and adjust
    the annotation.

    :raises PermissionDenied: if the user is not signed in or lacks the
        ``view_analytics`` permission.
    """
    request = pctx.request

    if not request.user.is_authenticated:
        raise PermissionDenied()

    if not pctx.has_permission(pperm.view_analytics):
        raise PermissionDenied()

    from course.views import get_now_or_fake_time
    now_datetime = get_now_or_fake_time(request)

    if request.method == "POST":
        form = AuthenticationTokenForm(pctx.participation, request.POST)

        revoke_prefix = "revoke_"
        revoke_post_args = [
                key for key in request.POST if key.startswith(revoke_prefix)]

        if revoke_post_args:
            try:
                token_id = int(revoke_post_args[0][len(revoke_prefix):])
            except ValueError:
                # A malformed id cannot name an existing token.
                raise http.Http404() from None

            auth_token = get_object_or_404(AuthenticationToken,
                    id=token_id,
                    user=request.user)

            auth_token.revocation_time = now_datetime
            auth_token.save()

            # Show a fresh, unbound form after revoking.
            form = AuthenticationTokenForm(pctx.participation)

        elif "create" in request.POST:
            if form.is_valid():
                token = make_sign_in_key(request.user)

                # Only the hash is stored; the clear-text token is shown to
                # the user once, in the message below.
                from django.contrib.auth.hashers import make_password
                auth_token = AuthenticationToken(
                        user=pctx.request.user,
                        participation=pctx.participation,
                        restrict_to_participation_role=form.cleaned_data[
                            "restrict_to_participation_role"],
                        description=form.cleaned_data["description"],
                        valid_until=form.cleaned_data["valid_until"],
                        token_hash=make_password(token))
                auth_token.save()

                user_token = "%d_%s" % (auth_token.id, token)

                messages.add_message(request, messages.SUCCESS,
                        _("A new authentication token has been created: %s. "
                            "Please save this token, as you will not be able "
                            "to retrieve it later.")
                        % user_token)
        else:
            messages.add_message(request, messages.ERROR,
                    _("Could not find which button was pressed."))

    else:
        form = AuthenticationTokenForm(pctx.participation)

    from datetime import timedelta

    from django.db.models import Q
    tokens = AuthenticationToken.objects.filter(
            user=request.user,
            participation__course=pctx.course,
            ).filter(
                Q(revocation_time=None)
                | Q(revocation_time__gt=now_datetime - timedelta(weeks=1)))

    return render_course_page(pctx, "course/manage-auth-tokens.html", {
        "form": form,
        "new_token_message": "",
        "tokens": tokens,
        })

# }}}

# vim: foldmethod=marker