Newer
Older
if user_form.has_changed():
user_form.save()
messages.add_message(request, messages.SUCCESS,
_("Profile data updated."))
request.user.refresh_from_db()
else:
messages.add_message(request, messages.INFO,
_("No change was made on your profile."))
if request.GET.get("first_login"):
return redirect("relate-home")
if (request.GET.get("set_inst_id")
return redirect(request.GET["referer"])
user_form = UserForm(
instance=request.user,
is_inst_id_locked=is_inst_id_locked(request.user),
)
request.user.refresh_from_db()
user_form = UserForm(
instance=request.user,
is_inst_id_locked=is_inst_id_locked(request.user),
)
return render(request, "user-profile-form.html", {
"form": user_form,
"form_description": _("User Profile"),
"is_requesting_inst_id": is_requesting_inst_id(),
"enable_profile_form_js": (
not is_inst_id_locked(request.user)
and getattr(settings, "RELATE_SHOW_INST_ID_FORM", True))
# {{{ SAML auth backend
# This ticks the 'verified' boxes once we've receive attribute assertions
# through SAML2.
class RelateSaml2Backend(Saml2Backend):
def get_or_create_user(self,
Andreas Klöckner
committed
user_lookup_key, user_lookup_value, create_unknown_user,
idp_entityid, attributes, attribute_mapping, request):
user, created = super().get_or_create_user(
user_lookup_key, user_lookup_value, create_unknown_user,
idp_entityid, attributes, attribute_mapping, request)
user = self._rl_update_user(user, attributes, attribute_mapping)
return user, created
def _rl_update_user(self, user, attributes, attribute_mapping):
mod = False
mapped_attributes = {
mapped_key: val
for key, val in attributes.items()
for mapped_key in attribute_mapping.get(key, ())}
if "institutional_id" in mapped_attributes:
if not user.institutional_id_verified:
user.institutional_id_verified = True
mod = True
if "first_name" in mapped_attributes and "last_name" in mapped_attributes:
if not user.name_verified:
user.name_verified = True
mod = True
if "email" in mapped_attributes:
from course.constants import user_status
if user.status != user_status.active:
user.status = user_status.active
mod = True
if mod:
user.save()
return user
# }}}
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
# {{{ social auth
def social_set_user_email_verified(backend, details, user=None, *args, **kwargs):
email = details.get("email")
modified = False
if email:
if email != user.email:
user.email = email
modified = True
from course.constants import user_status
if user.status != user_status.active:
user.status = user_status.active
modified = True
if modified:
user.save()
# continue the social auth pipeline
return None
Andreas Klöckner
committed
def social_auth_check_domain_against_blacklist(backend, details, *args, **kwargs):
email = details.get("email")
domain_blacklist = getattr(
settings, "RELATE_SOCIAL_AUTH_BLACKLIST_EMAIL_DOMAINS", {})
if domain_blacklist and email:
domain = email.split("@", 1)[1]
if domain in domain_blacklist:
from social_core.exceptions import SocialAuthBaseException
raise SocialAuthBaseException(domain_blacklist[domain])
# continue the social auth pipeline
return None
Dong Zhuang
committed
def sign_out_confirmation(request, redirect_field_name=REDIRECT_FIELD_NAME):
if not request.user.is_authenticated:
messages.add_message(request, messages.ERROR,
_("You've already signed out."))
return redirect("relate-home")
Dong Zhuang
committed
redirect_to = request.POST.get(redirect_field_name,
request.GET.get(redirect_field_name, ""))
Dong Zhuang
committed
next_uri = ""
if redirect_to:
next_uri = f"?{redirect_field_name}={redirect_to}"
Dong Zhuang
committed
return render(request, "sign-out-confirmation.html",
{"next_uri": next_uri})
Dong Zhuang
committed
def sign_out(request, redirect_field_name=REDIRECT_FIELD_NAME):
if not request.user.is_authenticated:
messages.add_message(request, messages.ERROR,
_("You've already signed out."))
return redirect("relate-home")
Dong Zhuang
committed
redirect_to = request.POST.get(redirect_field_name,
request.GET.get(redirect_field_name, ""))
if settings.RELATE_SIGN_IN_BY_SAML2_ENABLED:
from djangosaml2.views import _get_subject_id
if _get_subject_id(request.session) is not None:
# skip auth_logout below, rely on djangosaml2 to complete logout
return redirect("saml2_logout")
Dong Zhuang
committed
elif redirect_to:
return redirect(redirect_to)
class APIError(Exception):
pass
def find_matching_token(
course_identifier: str | None = None,
token_id: int | None = None,
token_hash_str: str | None = None,
now_datetime: datetime.datetime | None = None
) -> AuthenticationToken | None:
if token_id is None:
return None
token = AuthenticationToken.objects.get(
id=token_id,
participation__course__identifier=course_identifier)
except AuthenticationToken.DoesNotExist:
return None
if token.token_hash is None:
return None
from django.contrib.auth.hashers import check_password
if not check_password(token_hash_str, token.token_hash):
return None
if token.revocation_time is not None:
return None
if token.valid_until is not None:
if now_datetime is None:
return None
if now_datetime > token.valid_until:
return None
class APIBearerTokenBackend:
Andreas Klöckner
committed
def authenticate(self, request, course_identifier=None, token_id=None,
token_hash_str=None, now_datetime=None):
token = find_matching_token(course_identifier, token_id, token_hash_str,
now_datetime)
if token is None:
return None
token.last_use_time = now_datetime
token.save()
return token.user
def get_user(self, user_id):
try:
return get_user_model().objects.get(pk=user_id)
except get_user_model().DoesNotExist:
return None
class APIContext:
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
def __init__(self, request, token):
self.request = request
self.token = token
self.participation = token.participation
self.course = self.participation.course
restrict_to_role = token.restrict_to_participation_role
if restrict_to_role is not None:
role_restriction_ok = False
if restrict_to_role in token.participation.roles.all():
role_restriction_ok = True
if not role_restriction_ok and self.participation.has_permission(
pperm.impersonate_role, restrict_to_role.identifier):
role_restriction_ok = True
if not role_restriction_ok:
raise PermissionDenied(
"API token specifies invalid role restriction")
self.restrict_to_role = restrict_to_role
def has_permission(self, perm: str, argument: str | None = None) -> bool:
if self.restrict_to_role is None:
return self.participation.has_permission(perm, argument)
else:
return self.restrict_to_role.has_permission(perm, argument)
TOKEN_AUTH_DATA_RE = re.compile(r"^(?P<token_id>[0-9]+)_(?P<token_hash>[a-z0-9]+)$")
BASIC_AUTH_DATA_RE = re.compile(
r"^(?P<username>\w+):(?P<token_id>[0-9]+)_(?P<token_hash>[a-z0-9]+)$")
def auth_course_with_token(method, func, request,
course_identifier, *args, **kwargs):
from django.utils.timezone import now
now_datetime = now()
auth_header = request.headers.get("authorization", None)
if auth_header is None:
raise PermissionDenied("No Authorization header provided")
auth_values = auth_header.split(" ")
if len(auth_values) != 2:
raise PermissionDenied("ill-formed Authorization header")
auth_method, auth_data = auth_values
if auth_method != method:
raise PermissionDenied("ill-formed Authorization header")
if method == "Token":
match = TOKEN_AUTH_DATA_RE.match(auth_data)
elif method == "Basic":
import binascii
try:
auth_data = b64decode(auth_data.strip()).decode(
"utf-8", errors="replace")
except binascii.Error:
raise PermissionDenied("ill-formed Authorization header")
match = BASIC_AUTH_DATA_RE.match(auth_data)
else:
if match is None:
raise PermissionDenied("invalid authentication token")
token_id = int(match.group("token_id"))
token_hash_str = match.group("token_hash")
auth_data_dict = {
"course_identifier": course_identifier,
"token_id": token_id,
"token_hash_str": token_hash_str,
"now_datetime": now_datetime}
# FIXME: Redundant db roundtrip
token = find_matching_token(**auth_data_dict)
if token is None:
raise PermissionDenied("invalid authentication token")
from django.contrib.auth import authenticate, login
assert user is not None
if method == "Basic" and match.group("username") != user.username:
raise PermissionDenied("invalid authentication token")
login(request, user)
response = func(
APIContext(request, token),
course_identifier, *args, **kwargs)
except PermissionDenied as e:
if method == "Basic":
realm = _(f"Relate direct git access for {course_identifier}")
response = http.HttpResponse("Forbidden: " + str(e),
content_type="text/plain")
response["WWW-Authenticate"] = f'Basic realm="{realm}"'
response.status_code = 401
return response
elif method == "Token":
return http.HttpResponseForbidden(
"403 Forbidden: " + str(e))
except APIError as e:
return http.HttpResponseBadRequest(
"400 Bad Request: " + str(e))
return response
def with_course_api_auth(method: str) -> Any:
def wrapper(*args, **kwargs):
return auth_course_with_token(method, func, *args, **kwargs)
from functools import update_wrapper
update_wrapper(wrapper, func)
# }}}
# {{{ manage API auth tokens
class AuthenticationTokenForm(StyledModelForm):
class Meta:
model = AuthenticationToken
fields = (
"restrict_to_participation_role",
"description",
"valid_until",
)
widgets = {
"valid_until": HTML5DateTimeInput()
}
def __init__(
self, participation: Participation, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.participation = participation
allowable_role_ids = (
{role.id for role in participation.roles.all()}
| {
prole.id
for prole in ParticipationRole.objects.filter(
course=participation.course)
if participation.has_permission(
pperm.impersonate_role, prole.identifier)}
self.fields["restrict_to_participation_role"].queryset = ( # type:ignore[attr-defined]
ParticipationRole.objects.filter(
id__in=list(allowable_role_ids)
))
self.helper.add_input(Submit("create", _("Create")))
@course_view
def manage_authentication_tokens(pctx: CoursePageContext) -> http.HttpResponse:
request = pctx.request
if not request.user.is_authenticated:
raise PermissionDenied()
if not pctx.has_permission(pperm.view_analytics):
raise PermissionDenied()
assert pctx.participation is not None
from course.views import get_now_or_fake_time
now_datetime = get_now_or_fake_time(request)
if request.method == "POST":
form = AuthenticationTokenForm(pctx.participation, request.POST)
revoke_prefix = "revoke_"
revoke_post_args = [key for key in request.POST if key.startswith("revoke_")]
if revoke_post_args:
token_id = int(revoke_post_args[0][len(revoke_prefix):])
auth_token = get_object_or_404(AuthenticationToken,
id=token_id,
user=request.user)
auth_token.revocation_time = now_datetime
auth_token.save()
form = AuthenticationTokenForm(pctx.participation)
elif "create" in request.POST:
if form.is_valid():
token = make_sign_in_key(request.user)
from django.contrib.auth.hashers import make_password
auth_token = AuthenticationToken(
participation=pctx.participation,
restrict_to_participation_role=form.cleaned_data[
"restrict_to_participation_role"],
description=form.cleaned_data["description"],
valid_until=form.cleaned_data["valid_until"],
token_hash=make_password(token))
auth_token.save()
user_token = "%d_%s" % (auth_token.id, token)
messages.add_message(request, messages.SUCCESS,
_("A new authentication token has been created: %s. "
"Please save this token, as you will not be able "
"to retrieve it later.")
% user_token)
else:
messages.add_message(request, messages.ERROR,
_("Could not find which button was pressed."))
form = AuthenticationTokenForm(pctx.participation)
from datetime import timedelta
tokens = AuthenticationToken.objects.filter(
user=request.user,
participation__course=pctx.course,
).filter(
Q(revocation_time=None)
| Q(revocation_time__gt=now_datetime - timedelta(weeks=1)))
return render_course_page(pctx, "course/manage-auth-tokens.html", {
"form": form,
"new_token_message": "",
"tokens": tokens,
})
# }}}