Newer
Older
if permission not in dict(FLOW_PERMISSION_CHOICES):
raise ValidationError(
string_concat("%(location)s: ",
_("invalid flow permission '%(permission)s'"))
% {"location": location, "permission": permission})
def validate_flow_desc(vctx, location, flow_desc):
location,
flow_desc,
required_attrs=[
("title", str),
("description", "markup"),
("groups", list),
("pages", list),
# deprecated (moved to grading rule)
("max_points", (int, float)),
("max_points_enforced_cap", (int, float)),
if hasattr(flow_desc, "rules"):
validate_flow_rules(vctx, location, flow_desc.rules)
# {{{ check for presence of 'groups' or 'pages'
if (
(not hasattr(flow_desc, "groups") and not hasattr(flow_desc, "pages"))
or (hasattr(flow_desc, "groups") and hasattr(flow_desc, "pages"))):
raise ValidationError(
string_concat("%(location)s: ",
_("must have either 'groups' or 'pages'"))
% {"location": location})
# }}}
if hasattr(flow_desc, "pages"):
from course.content import normalize_flow_desc
flow_desc = normalize_flow_desc(flow_desc)
assert not hasattr(flow_desc, "pages")
assert hasattr(flow_desc, "groups")
# {{{ check for non-emptiness
flow_has_page = False
for i, grp in enumerate(flow_desc.groups):
group_has_page = False
if not hasattr(grp, "pages"):
raise ValidationError(
string_concat(
"%(location)s, ",
_("group %(group_index)d ('%(group_id)s'): "
"'pages' attribute is required"))
% {
"location": location,
"group_index": i+1,
"group_id": grp.id})
if not isinstance(grp.pages, list):
raise ValidationError(
string_concat(
"%(location)s, ",
_("group %(group_index)d ('%(group_id)s'): "
"'pages' is not a list"))
% {
"location": location,
"group_index": i+1,
"group_id": grp.id})
for page in grp.pages:
group_has_page = flow_has_page = True
break
if not group_has_page:
raise ValidationError(
string_concat(
"%(location)s, ",
_("group %(group_index)d ('%(group_id)s'): "
"no pages found"))
"location": location,
"group_index": i+1,
"group_id": grp.id})
raise ValidationError(_("%s: no pages found")
% location)
# }}}
# {{{ check group id uniqueness
group_ids = set()
for grp in flow_desc.groups:
if grp.id in group_ids:
raise ValidationError(
string_concat("%(location)s: ",
_("group id '%(group_id)s' not unique"))
% {"location": location, "group_id": grp.id})
group_ids.add(grp.id)
# }}}
for i, grp in enumerate(flow_desc.groups):
validate_flow_group(vctx, "%s, group %d ('%s')"
% (location, i+1, grp.id),
grp)
validate_markup(vctx, location, flow_desc.description)
if hasattr(flow_desc, "completion_text"):
validate_markup(vctx, location, flow_desc.completion_text)
if hasattr(flow_desc, "notify_on_submit"):
for i, item in enumerate(flow_desc.notify_on_submit):
string_concat(
"%s, ",
_("notify_on_submit: item %d is not a string"))
for attr in ["max_points", "max_points_enforced_cap", "bonus_points"]:
if hasattr(flow_desc, attr):
vctx.add_warning(location,
_("Attribute '%s' is deprecated as part of a flow. "
"Specify it as part of a grading rule instead.")
% attr)
def validate_calendar_desc_struct(vctx, location, events_desc):
location,
events_desc,
required_attrs=[
],
allowed_attrs=[
("event_kinds", Struct),
("events", Struct),
]
)
if hasattr(events_desc, "event_kinds"):
for event_kind_name in events_desc.event_kinds._field_names:
event_kind = getattr(events_desc.event_kinds, event_kind_name)
f"{location}, event kind '{event_kind_name}'",
event_kind,
required_attrs=[
],
allowed_attrs=[
("color", str),
("title", str),
]
)
if hasattr(events_desc, "events"):
for event_name in events_desc.events._field_names:
event_desc = getattr(events_desc.events, event_name)
validate_struct(
vctx,
f"{location}, event '{event_name}'",
event_desc,
required_attrs=[
],
allowed_attrs=[
("color", str),
("title", str),
("description", "markup"),
("show_description_from", datespec_types),
("show_description_until", datespec_types),
if hasattr(event_desc, "show_description_from"):
vctx.encounter_datespec(location, event_desc.show_description_from)
if hasattr(event_desc, "show_description_until"):
vctx.encounter_datespec(location, event_desc.show_description_until)
def get_yaml_from_repo_safely(repo, full_name, commit_sha):
from course.content import get_yaml_from_repo
return get_yaml_from_repo(
repo=repo, full_name=full_name, commit_sha=commit_sha,
cached=False)
from traceback import print_exc
print_exc()
raise ValidationError(
"{fullname}: {err_type}: {err_str}".format(
fullname=full_name,
err_type=tp.__name__,
err_str=str(e)))
def check_attributes_yml(
vctx: ValidationContext,
repo: Repo_ish,
path: str, tree: Any,
access_kinds: list[str]) -> None:
Andreas Klöckner
committed
"""
This function reads the .attributes.yml file and checks
that each item for each header is a string
Example::
# this validates
unenrolled:
- test1.pdf
student:
- test2.pdf
# this does not validate
unenrolled:
- test1.pdf
student:
- test2.pdf
- 42
Andreas Klöckner
committed
"""
from course.content import get_true_repo_and_path
true_repo, path = get_true_repo_and_path(repo, path)
# {{{ analyze attributes file
dummy, attr_blob_sha = tree[ATTRIBUTES_FILENAME.encode()]
except KeyError:
# no .attributes.yml here
pass
# the path root only contains a directory
else:
from relate.utils import dict_to_struct
yaml_data = load_yaml(true_repo[attr_blob_sha].data) # type: ignore
att_yml = dict_to_struct(yaml_data)
if path:
loc = path + "/" + ATTRIBUTES_FILENAME
else:
loc = ATTRIBUTES_FILENAME
Andreas Klöckner
committed
validate_struct(vctx, loc, att_yml,
required_attrs=[],
allowed_attrs=[(role, list) for role in access_kinds])
Andreas Klöckner
committed
if hasattr(att_yml, "public"):
vctx.add_warning(loc,
_("Access class 'public' is deprecated. Use 'unenrolled' "
"instead."))
if hasattr(att_yml, "public") and hasattr(att_yml, "unenrolled"):
raise ValidationError(
_("%s: access classes 'public' and 'unenrolled' may not "
"exist simultaneously.")
% (loc))
Andreas Klöckner
committed
for i, l in enumerate(getattr(att_yml, access_kind)):
Andreas Klöckner
committed
"%s: entry %d in '%s' is not a string"
% (loc, i+1, access_kind))
# }}}
# {{{ analyze gitignore
gitignore_lines: list[str] = []
try:
dummy, gitignore_sha = tree[b".gitignore"]
except KeyError:
# no .gitignore here
pass
except ValueError:
# the path root only contains a directory
pass
else:
gitignore_lines = true_repo[gitignore_sha].data.decode("utf-8").split("\n")
# }}}
entry_name = entry.path.decode("utf-8")
if any(fnmatchcase(entry_name, line) for line in gitignore_lines):
continue
if path:
subpath = path+"/"+entry_name
else:
subpath = entry_name
dummy, blob_sha = tree[entry.path]
subtree = true_repo[blob_sha]
check_attributes_yml(vctx, true_repo, subpath, subtree, access_kinds)
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
# {{{ check whether flow grade identifiers were changed in sketchy ways
def check_grade_identifier_link(
vctx, location, course, flow_id, flow_grade_identifier):
from course.models import GradingOpportunity
for bad_gopp in (
GradingOpportunity.objects
.filter(
course=course,
identifier=flow_grade_identifier)
.exclude(flow_id=flow_id)):
# 0 or 1 trips through this loop because of uniqueness
raise ValidationError(
_(
"{location}: existing grading opportunity with identifier "
"'{grade_identifier}' refers to flow '{other_flow_id}', however "
"flow code in this flow ('{new_flow_id}') specifies the same "
"grade identifier. "
"(Have you renamed the flow? If so, edit the grading "
"opportunity to match.)")
.format(
location=location,
grade_identifier=flow_grade_identifier,
other_flow_id=bad_gopp.flow_id,
new_flow_id=flow_id,
new_grade_identifier=flow_grade_identifier))
# }}}
# {{{ check whether page types were changed
def check_for_page_type_changes(vctx, location, course, flow_id, flow_desc):
from course.content import normalize_flow_desc
n_flow_desc = normalize_flow_desc(flow_desc)
from course.models import FlowPageData
for grp in n_flow_desc.groups: # pragma: no branch
for page_desc in grp.pages: # pragma: no branch
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
fpd_with_mismatched_page_types = list(
FlowPageData.objects
.filter(
flow_session__course=course,
flow_session__flow_id=flow_id,
group_id=grp.id,
page_id=page_desc.id)
.exclude(page_type=None)
.exclude(page_type=page_desc.type)
[0:1])
if fpd_with_mismatched_page_types:
mismatched_fpd, = fpd_with_mismatched_page_types
raise ValidationError(
_("%(loc)s, group '%(group)s', page '%(page)s': "
"page type ('%(type_new)s') differs from "
"type used in database ('%(type_old)s')")
% {"loc": location, "group": grp.id,
"page": page_desc.id,
"type_new": page_desc.type,
"type_old": mismatched_fpd.page_type})
# }}}
def validate_flow_id(vctx: ValidationContext, location: str, flow_id: str) -> None:
from course.constants import FLOW_ID_REGEX
match = re.match("^" + FLOW_ID_REGEX + "$", flow_id)
if match is None:
raise ValidationError(
string_concat("%s: ",
_("invalid flow name. "
"Flow names may only contain (roman) "
"letters, numbers, "
"dashes and underscores."))
% location)
def validate_static_page_name(
vctx: ValidationContext, location: str, page_name: str) -> None:
from course.constants import STATICPAGE_PATH_REGEX
match = re.match("^" + STATICPAGE_PATH_REGEX + "$", page_name)
if match is None:
raise ValidationError(
string_concat("%s: ",
_(
"invalid page name. "
"Page names may only contain "
"alphanumeric characters (any language) "
"and hyphens."
))
% location)
def validate_course_content(repo, course_file, events_file,
Andreas Klöckner
committed
validate_sha, course=None):
repo=repo,
commit_sha=validate_sha,
Andreas Klöckner
committed
course=course)
course_desc = get_yaml_from_repo_safely(repo, course_file,
commit_sha=validate_sha)
validate_staticpage_desc(vctx, course_file, course_desc)
try:
from course.content import get_yaml_from_repo
events_desc = get_yaml_from_repo(repo, events_file,
commit_sha=validate_sha, cached=False)
Andreas Klöckner
committed
if events_file != "events.yml":
vctx.add_warning(
Andreas Klöckner
committed
_("Events file"),
_("Your course repository does not have an events "
"file named '%s'.")
% events_file)
else:
# That's OK--no calendar info.
pass
validate_calendar_desc_struct(vctx, events_file, events_desc)
if vctx.course is not None:
from course.models import (
ParticipationPermission,
ParticipationRolePermission)
access_kinds = frozenset(
ParticipationPermission.objects
.filter(
participation__course=vctx.course,
permission=pperm.access_files_for,
)
.values_list("argument", flat=True)) | frozenset(
ParticipationRolePermission.objects
.filter(
permission=pperm.access_files_for,
)
.values_list("argument", flat=True))
Andreas Klöckner
committed
access_kinds = frozenset(k for k in access_kinds if k is not None)
access_kinds = DEFAULT_ACCESS_KINDS
vctx, repo, "",
get_repo_blob(repo, "", validate_sha),
access_kinds)
try:
flows_tree = get_repo_blob(repo, "media", validate_sha)
except ObjectDoesNotExist:
# That's great--no media directory.
pass
else:
vctx.add_warning(
"Your course repository has a 'media/' directory. "
"Linking to media files using 'media:' is discouraged. "
"Use the 'repo:' and 'repocur:' linkng schemes instead."))
try:
flows_tree = get_repo_blob(repo, "flows", validate_sha)
except ObjectDoesNotExist:
# That's OK--no flows yet.
pass
else:
used_grade_identifiers = set()
for entry in flows_tree.items():
entry_path = entry.path.decode("utf-8")
if not entry_path.endswith(".yml"):
location = entry_path
validate_flow_id(vctx, location, flow_id)
flow_desc = get_yaml_from_repo_safely(repo, location,
validate_flow_desc(vctx, location, flow_desc)
# {{{ check grade_identifier
Andreas Klöckner
committed
flow_grade_identifier = None
Andreas Klöckner
committed
flow_grade_identifier = getattr(
flow_desc.rules, "grade_identifier", None)
if (
flow_grade_identifier is not None
and {flow_grade_identifier} & used_grade_identifiers):
Andreas Klöckner
committed
string_concat("%s: ",
_("flow uses the same grade_identifier "
Andreas Klöckner
committed
"as another flow"))
% location)
used_grade_identifiers.add(flow_grade_identifier)
if (course is not None
and flow_grade_identifier is not None):
check_grade_identifier_link(
vctx, location, course, flow_id, flow_grade_identifier)
Andreas Klöckner
committed
# }}}
if course is not None:
check_for_page_type_changes(
vctx, location, course, flow_id, flow_desc)
# }}}
# {{{ static pages
try:
pages_tree = get_repo_blob(repo, "staticpages", validate_sha)
except ObjectDoesNotExist:
# That's OK--no flows yet.
pass
else:
for entry in pages_tree.items():
entry_path = entry.path.decode("utf-8")
if not entry_path.endswith(".yml"):
continue
page_name = entry_path[:-4]
location = entry_path
validate_static_page_name(vctx, location, page_name)
Andreas Klöckner
committed
location = "staticpages/%s" % entry_path
page_desc = get_yaml_from_repo_safely(repo, location,
commit_sha=validate_sha)
Andreas Klöckner
committed
validate_staticpage_desc(vctx, location, page_desc)
Andreas Klöckner
committed
# {{{ validation script support
class FileSystemFakeRepo: # pragma: no cover
def __init__(self, root):
self.root = root
assert isinstance(self.root, bytes)
def controldir(self):
return self.root
def __getitem__(self, sha):
return sha
def __str__(self):
return "<FAKEREPO:%s>" % self.root
def decode(self):
return self
@property
def tree(self):
return FileSystemFakeRepoTree(self.root)
class FileSystemFakeRepoTreeEntry: # pragma: no cover
class FileSystemFakeRepoTree: # pragma: no cover
def __init__(self, root):
self.root = root
assert isinstance(self.root, bytes)
if not name:
raise KeyError("<empty filename>")
from os.path import join, exists
name = join(self.root, name)
if not exists(name):
from os import stat
from stat import S_ISDIR
stat_result = stat(name)
if S_ISDIR(stat_result.st_mode):
return stat_result.st_mode, FileSystemFakeRepoTree(name)
return stat_result.st_mode, FileSystemFakeRepoFile(name)
def items(self):
import os
return [
FileSystemFakeRepoTreeEntry(
path=n,
mode=os.stat(os.path.join(self.root, n)).st_mode)
for n in os.listdir(self.root)]
class FileSystemFakeRepoFile: # pragma: no cover
def __init__(self, name):
self.name = name
@property
def data(self):
with open(self.name, "rb") as inf:
return inf.read()
def validate_course_on_filesystem(
root, course_file, events_file): # pragma: no cover
fake_repo = FileSystemFakeRepo(root.encode("utf-8"))
warnings = validate_course_content(
course_file, events_file,
Andreas Klöckner
committed
validate_sha=fake_repo, course=None)
print(_("WARNINGS: "))
for w in warnings:
print("***", w.location, w.text)
return bool(warnings)