Newer
Older
if ext in [".yml", ".yaml"]:
source = process_yaml_for_expansion(source)
# Expand Jinja macros in the YAML source *yaml_str* and return the resulting
# text. Templates are loaded from *repo* at *commit_sha* (see the loader below).
# NOTE(review): this view appears to be missing several statements, among them
# the construction of `jinja_env` and the conditions guarding the decode and
# the first `return` -- confirm against the full file.
# NOTE(review): *yaml_str* is annotated `str`, yet `.decode` is called on it
# and get_yaml_from_repo passes a byte string; the annotation probably wants
# to be `str | bytes` -- confirm.
def expand_yaml_macros(repo: Repo_ish, commit_sha: bytes, yaml_str: str) -> str:
Andreas Klöckner
committed
yaml_str = yaml_str.decode("utf-8")
loader=YamlBlockEscapingGitTemplateLoader(repo, commit_sha),
undefined_behavior="strict",
auto_escape_callback=lambda fn: False)
# {{{ process explicit [JINJA] tags (deprecated)
def compute_replacement(match): # pragma: no cover # deprecated
return jinja_env.render_str(match.group(1))
# `count` is the number of substitutions that JINJA_YAML_RE.subn performed.
yaml_str, count = JINJA_YAML_RE.subn(compute_replacement, yaml_str)
# The file uses explicit [JINJA] tags. Assume that it doesn't
# want anything else processed through YAML.
return yaml_str
# }}}
# Regular path: preprocess the YAML text, then render the result as a template.
jinja_str = process_yaml_for_expansion(yaml_str)
yaml_str = jinja_env.render_str(jinja_str)
return yaml_str
# }}}
# {{{ repo yaml getting
# Load a YAML file from the repository via load_yaml, using a cache keyed on
# repository, file name and commit.
# NOTE(review): the docstring below has no closing triple-quote in this view,
# and the lines that define `def_cache` and `yaml_str` are not visible.
def get_raw_yaml_from_repo(
repo: Repo_ish, full_name: str, commit_sha: bytes) -> Any:
"""Return decoded YAML data structure from
the given file in *repo* at *commit_sha*.
:arg commit_sha: A byte string containing the commit hash
from urllib.parse import quote_plus
# Key parts: CACHE_KEY_ROOT, URL-quoted repo control dir, URL-quoted file
# name and the decoded commit hash, joined by the literal "%RAW%%2".
cache_key = "%RAW%%2".join((
CACHE_KEY_ROOT,
quote_plus(repo.controldir()), quote_plus(full_name), commit_sha.decode(),
))
result: Any | None = None
# Memcache is apparently limited to 250 characters.
# Keys of 240 characters or more bypass the cache lookup.
if len(cache_key) < 240:
result = def_cache.get(cache_key)
get_repo_blob(repo, full_name, commit_sha).data)
result = load_yaml(yaml_str) # type: ignore
# Third argument is presumably the cache timeout (None) -- TODO confirm.
def_cache.add(cache_key, result, None)
return result
# Matches a line start followed by whitespace that contains at least one tab.
LINE_HAS_INDENTING_TABS_RE = re.compile(r"^\s*\t\s*", re.MULTILINE)
# Load a YAML file from the repository, expand its macros, and convert the
# parsed data with dict_to_struct; results may be served from / stored in the
# "default" Django cache.
# NOTE(review): the docstring below has no closing triple-quote in this view,
# and neither the guard around the cache setup nor the final return is visible.
def get_yaml_from_repo(
repo: Repo_ish, full_name: str, commit_sha: bytes, cached: bool = True,
tolerate_tabs: bool = False) -> Any:
"""Return decoded, struct-ified YAML data structure from
the given file in *repo* at *commit_sha*.
:arg tolerate_tabs: At one point, Relate accepted tabs
in indentation, but it no longer does. In places where legacy compatibility
matters, you may set *tolerate_tabs* to *True*.
try:
import django.core.cache as cache
except ImproperlyConfigured:
# No usable cache configuration: fall back to uncached operation.
cached = False
else:
from urllib.parse import quote_plus
# Same key parts as get_raw_yaml_from_repo, but joined by "%%%2".
cache_key = "%%%2".join(
(CACHE_KEY_ROOT,
quote_plus(repo.controldir()), quote_plus(full_name),
commit_sha.decode()))
def_cache = cache.caches["default"]
result = None
# Memcache is apparently limited to 250 characters.
if len(cache_key) < 240:
result = def_cache.get(cache_key)
if result is not None:
return result
yaml_bytestream = get_repo_blob(
repo, full_name, commit_sha).data
yaml_text = yaml_bytestream.decode("utf-8")
# The tab check runs on the decoded text, before macro expansion.
if not tolerate_tabs and LINE_HAS_INDENTING_TABS_RE.search(yaml_text):
raise ValueError("File uses tabs in indentation. "
"This is not allowed.")
# Note that the undecoded byte string is what gets passed on for expansion.
expanded = expand_yaml_macros(repo, commit_sha, yaml_bytestream)
yaml_data = load_yaml(expanded) # type:ignore
result = dict_to_struct(yaml_data)
if cached:
def_cache.add(cache_key, result, None)
# }}}
# {{{ markup
# Render one HTML attribute as text: the bare key when *val* is None,
# otherwise key='val' or key="val".
# NOTE(review): the condition choosing between the single- and double-quoted
# form is not visible in this view.
def _attr_to_string(key, val):
if val is None:
return key
return f"{key}='{val}'"
return f'{key}="{val}"'
# HTML parser that re-emits what it parses to *out_file*, passing each
# start tag's attributes through *process_tag_func* first.
# NOTE(review): several handler `def` lines (and possibly a base-class
# __init__ call) are not visible in this view; the trailing write/raise
# statements belong to handlers whose headers are missing.
class TagProcessingHTMLParser(html_parser.HTMLParser):
def __init__(
self,
out_file,
process_tag_func: Callable[[str, Mapping[str, str]], Mapping[str, str]]
) -> None:
self.out_file = out_file
self.process_tag_func = process_tag_func
# Opening tag: merge the attribute changes returned by process_tag_func
# into the parsed attributes, then write the tag out.
def handle_starttag(self, tag, attrs):
attrs = dict(attrs)
attrs.update(self.process_tag_func(tag, attrs))
self.out_file.write("<{} {}>".format(tag, " ".join(
_attr_to_string(k, v) for k, v in attrs.items())))
# Self-closing tag: same treatment, written with a trailing "/>".
def handle_startendtag(self, tag, attrs):
attrs = dict(attrs)
attrs.update(self.process_tag_func(tag, attrs))
self.out_file.write("<{} {}/>".format(tag, " ".join(
_attr_to_string(k, v) for k, v in attrs.items())))
# Text content is copied through unchanged.
def handle_data(self, data):
self.out_file.write(data)
def handle_entityref(self, name):
self.out_file.write(f"<!--{data}-->")
raise NotImplementedError(
_("I have no idea what a processing instruction is."))
self.out_file.write(f"<![{data}]>")
# Wrapper around a string argument; reverse() checks for this type and reads
# its `.s` attribute.
# NOTE(review): the body of __init__ is not visible in this view.
class PreserveFragment:
def __init__(self, s: str) -> None:
# Constructor of the link-fixing tree processor.
# NOTE(review): the enclosing `class` line and the remaining attribute
# assignments (self.course and self.reverse_func are read by other methods)
# are not visible in this view.
def __init__(self, md, course, commit_sha, reverse_func):
self.commit_sha = commit_sha
# Resolve *viewname* with self.reverse_func. For PreserveFragment arguments the
# text from `frag_index` onward is cut off before reversing and appended to
# the reversed URL afterwards; if several arguments carry such a tail, the
# last one wins.
# NOTE(review): the assignment of `frag_index` is not visible in this view --
# presumably the position of "#" in the string; confirm.
def reverse(self, viewname: str, args: tuple[Any, ...]) -> str:
frag = None
new_args = []
for arg in args:
if isinstance(arg, PreserveFragment):
s = arg.s
if frag_index != -1:
frag = s[frag_index:]
s = s[:frag_index]
new_args.append(s)
else:
new_args.append(arg)
result = self.reverse_func(viewname, args=new_args)
if frag is not None:
result += frag
return result
def get_course_identifier(self) -> str:
    """Return the identifier of ``self.course``.

    When no course is attached (``self.course`` is ``None``), the fixed
    placeholder ``"bogus-course-identifier"`` is returned instead.
    """
    course = self.course
    return "bogus-course-identifier" if course is None else course.identifier
# Translate a RELATE pseudo-URL into a real URL via self.reverse. The scheme
# prefix selects the view: "course:", "flow:", "staticpage:", "media:",
# "repo:", "repocur:" and "calendar:". Path-like arguments are wrapped in
# PreserveFragment before being handed to self.reverse. If reversing raises
# NoReverseMatch, a base64 "data:" URL carrying an error message is returned.
# NOTE(review): the argument tuples of the last three branches are cut off in
# this view, and the fall-through for unrecognized URLs is not visible (the
# annotation allows None).
def process_url(self, url: str) -> str | None:
try:
if url.startswith("course:"):
course_id = url[7:]
if course_id:
return self.reverse("relate-course_page",
args=(course_id,))
else:
return self.reverse("relate-course_page",
args=(self.get_course_identifier(),))
elif url.startswith("flow:"):
flow_id = url[5:]
return self.reverse("relate-view_start_flow",
args=(self.get_course_identifier(), flow_id))
elif url.startswith("staticpage:"):
page_path = url[11:]
return self.reverse("relate-content_page",
args=(
self.get_course_identifier(),
PreserveFragment(page_path)))
elif url.startswith("media:"):
media_path = url[6:]
return self.reverse("relate-get_media",
args=(
self.get_course_identifier(),
PreserveFragment(media_path)))
elif url.startswith("repo:"):
path = url[5:]
return self.reverse("relate-get_repo_file",
args=(
self.get_course_identifier(),
elif url.startswith("repocur:"):
path = url[8:]
return self.reverse("relate-get_current_repo_file",
args=(
self.get_course_identifier(),
elif url.strip() == "calendar:":
return self.reverse("relate-view_calendar",
except NoReverseMatch:
from base64 import b64encode
message = ("Invalid character in RELATE URL: " + url).encode("utf-8")
return "data:text/plain;base64,"+b64encode(message).decode()
# Collect the attribute changes for one tag in `changed_attrs`:
#  - <table> without bootstrap="no" gets class "table table-condensed";
#  - href of <a>/<link>, src of <img> and data of <object> are passed through
#    process_url and replaced when it returns something other than None.
# NOTE(review): the initialization and the return of `changed_attrs` are not
# visible in this view.
def process_tag(self, tag_name: str, attrs: Mapping[str, str]) -> Mapping[str, str]:
if tag_name == "table" and attrs.get("bootstrap") != "no":
changed_attrs["class"] = "table table-condensed"
if tag_name in ["a", "link"] and "href" in attrs:
new_href = self.process_url(attrs["href"])
if new_href is not None:
changed_attrs["href"] = new_href
elif tag_name == "img" and "src" in attrs:
new_src = self.process_url(attrs["src"])
if new_src is not None:
changed_attrs["src"] = new_src
elif tag_name == "object" and "data" in attrs:
new_data = self.process_url(attrs["data"])
if new_data is not None:
changed_attrs["data"] = new_data
# Run process_tag on one element tree node and iterate over the changes.
# NOTE(review): the loop body is not visible in this view.
def process_etree_element(self, element: Element) -> None:
changed_attrs = self.process_tag(element.tag, element.attrib)
for key, val in changed_attrs.items():
# NOTE(review): the body of walk_and_process_tree is not visible in this view.
def walk_and_process_tree(self, root: Element) -> None:
# Process the element tree, then every raw HTML block in Markdown's HTML
# stash, writing each processed block back into the stash.
# NOTE(review): the creation of `outf` and the call that feeds `html` to the
# parser are not visible in this view.
def run(self, root: Element) -> None:
self.walk_and_process_tree(root)
# root through and process Markdown's HTML stash (gross!)
for i, html in enumerate(self.md.htmlStash.rawHtmlBlocks):
parser = TagProcessingHTMLParser(outf, self.process_tag)
# According to
# https://github.com/python/typeshed/blob/61ba4de28f1469d6a642c983d5a7674479c12444/stubs/Markdown/markdown/util.pyi#L44
# this should not happen, but... *shrug*
if isinstance(html, Element):
html = tostring(html).decode("utf-8")
self.md.htmlStash.rawHtmlBlocks[i] = outf.getvalue()
# Markdown extension that registers a LinkFixerTreeprocessor under the name
# "relate_link_fixer" with priority 0.
# NOTE(review): the assignment of self.reverse_func (read in extendMarkdown)
# is not visible in this view.
class LinkFixerExtension(Extension):
def __init__(
self, course: Course | None,
commit_sha: bytes, reverse_func: Callable | None) -> None:
self.course = course
self.commit_sha = commit_sha
def extendMarkdown(self, md): # noqa
md.treeprocessors.register(
LinkFixerTreeprocessor(md, self.course, self.commit_sha,
reverse_func=self.reverse_func),
"relate_link_fixer", 0)
def remove_prefix(prefix: str, s: str) -> str:
    """Return *s* without a leading *prefix*.

    :arg prefix: the text to strip from the start of *s*.
    :arg s: the string to process.
    :returns: *s* with one leading occurrence of *prefix* removed, or *s*
        unchanged if it does not start with *prefix*.
    """
    # str.removeprefix (Python 3.9+) has exactly these semantics, including
    # for an empty prefix.
    return s.removeprefix(prefix)
# Marker that markup_to_html strips from the start of markup text.
JINJA_PREFIX = "[JINJA]"
# NOTE(review): the `def` line for the parameter list below is not visible in
# this view; the call in markup_to_html suggests this is expand_markup.
# The visible body coerces *text* to str and, when *use_jinja* is set, renders
# it through a template environment with *jinja_env* as keyword arguments.
course: Course | None,
repo: Repo_ish,
commit_sha: bytes,
text: str,
use_jinja: bool = True,
jinja_env: dict | None = None,
) -> str:
if jinja_env is None:
jinja_env = {}
if not isinstance(text, str):
text = str(text)
# {{{ process through Jinja
if use_jinja:
env = Environment(
loader=GitTemplateLoader(repo, commit_sha),
# Stub registered under the old function name; it only returns a notice.
def render_notebook_cells(*args, **kwargs):
return "[The ability to render notebooks was removed.]"
env.add_function("render_notebook_cells", render_notebook_cells)
text = env.render_str(text, **jinja_env)
# }}}
return text
def filter_html_attributes(tag, name, value):
    """Decide whether attribute *name* with *value* may stay on *tag*.

    Attributes listed for *tag* in bleach's ``ALLOWED_ATTRIBUTES`` are always
    accepted. Beyond those, the following are accepted:

    * ``a``: ``role="button"`` and ``class`` values starting with ``btn btn-``
    * ``img``: ``src``
    * ``div``: ``class="well"``
    * ``i``: ``class`` values starting with ``bi bi-``
    * ``table``: ``class`` and ``bootstrap``

    :returns: *True* if the attribute is to be kept, *False* otherwise.
    """
    from bleach.sanitizer import ALLOWED_ATTRIBUTES

    # Anything on bleach's own list for this tag passes.
    if name in ALLOWED_ATTRIBUTES.get(tag, []):
        return True

    # Per-tag extras on top of bleach's list.
    if tag == "a":
        return ((name == "role" and value == "button")
                or (name == "class" and value.startswith("btn btn-")))
    if tag == "img":
        return name == "src"
    if tag == "div":
        return name == "class" and value == "well"
    if tag == "i":
        return name == "class" and value.startswith("bi bi-")
    if tag == "table":
        return name in ("class", "bootstrap")

    return False
Andreas Klöckner
committed
# Convert course markup *text* to HTML: expand it via expand_markup, run it
# through Markdown with the link-fixer and MathJax extensions, sanitize it
# with bleach unless the course is trusted for markup, and wrap the result in
# a <div class='relate-markup'>. With *validate_only* set, "" is returned
# after expansion.
# NOTE(review): several statements are not visible in this view (among them
# the return on a cache hit and the Markdown call that produces `result`), so
# the exact nesting of the [JINJA]-prefix handling cannot be determined here.
def markup_to_html(
course: Course | None,
repo: Repo_ish,
commit_sha: bytes,
text: str,
reverse_func: Callable | None = None,
validate_only: bool = False,
use_jinja: bool = True,
jinja_env: dict | None = None,
) -> str:
if jinja_env is None:
jinja_env = {}
# The setting is treated as enabled (True) when it is absent.
disable_codehilite = bool(
getattr(settings,
"RELATE_DISABLE_CODEHILITE_MARKDOWN_EXTENSION", True))
# A cache key is only built when a course is given and no extra Jinja
# variables are passed.
if course is not None and not jinja_env:
try:
import django.core.cache as cache
except ImproperlyConfigured:
cache_key = None
else:
import hashlib
# Key parts: cache root, course id, trust flag, commit, MD5 of the text,
# and a suffix recording the codehilite setting.
cache_key = ("markup:v9:%s:%d:%s:%s:%s%s"
% (CACHE_KEY_ROOT,
course.id, course.trusted_for_markup, str(commit_sha),
hashlib.md5(text.encode("utf-8")).hexdigest(),
":NOCODEHILITE" if disable_codehilite else ""
))
def_cache = cache.caches["default"]
result = def_cache.get(cache_key)
if result is not None:
if text.lstrip().startswith(JINJA_PREFIX):
text = remove_prefix(JINJA_PREFIX, text.lstrip())
else:
cache_key = None
text = expand_markup(
course, repo, commit_sha, text, use_jinja=use_jinja, jinja_env=jinja_env)
# Default to Django's URL reverser.
if reverse_func is None:
from django.urls import reverse
reverse_func = reverse
Andreas Klöckner
committed
if validate_only:
Andreas Klöckner
committed
return ""
Andreas Klöckner
committed
from course.mdx_mathjax import MathJaxExtension
extensions: list[markdown.Extension | str] = [
LinkFixerExtension(course, commit_sha, reverse_func=reverse_func),
MathJaxExtension(),
"markdown.extensions.extra",
]
extensions=extensions,
# Output for a missing or untrusted course is sanitized: bleach's default
# tags plus the ones listed here, attributes vetted by filter_html_attributes.
if course is None or not course.trusted_for_markup:
import bleach
result = bleach.clean(result,
tags=[*bleach.ALLOWED_TAGS, "div", "span", "p", "img",
"h1", "h2", "h3", "h4", "h5", "h6",
"pre", "details", "summary", "thead", "tbody"],
attributes=filter_html_attributes)
result = f"<div class='relate-markup'>{result}</div>"
if cache_key is not None:
def_cache.add(cache_key, result, None)
return result
# One or more "#" at the start of a line, optional whitespace, then the title.
TITLE_RE = re.compile(r"^\#+\s*(.+)", re.UNICODE)


def extract_title_from_markup(markup_text: str) -> str | None:
    """Return the text of the first ``#`` heading near the top of the markup.

    Only the first ten lines of *markup_text* are examined.

    :arg markup_text: the markup source to search.
    :returns: the heading text (group 1 of ``TITLE_RE``), or *None* if none
        of the first ten lines is a heading.
    """
    # Cap the split: only ten lines are inspected, so there is no need to
    # split the rest of a potentially long document. The first ten pieces
    # are the same as for an unbounded split.
    for line in markup_text.split("\n", 10)[:10]:
        match = TITLE_RE.match(line)
        if match is not None:
            return match.group(1)

    return None
# {{{ datespec processing
# A plain calendar date: digits for the year, then two-digit month and day,
# separated by "-".
DATE_RE = re.compile(r"^([0-9]+)\-([01][0-9])\-([0-3][0-9])$")
# Text followed by whitespace and a trailing integer (group 1: text,
# group 2: the integer).
TRAILING_NUMERAL_RE = re.compile(r"^(.*)\s+([0-9]+)$")
# Prefix that parse_date_spec strips from an event reference; it selects the
# event's end time.
END_PREFIX = "end:"
class InvalidDatespec(ValueError):
    """Raised for a date specification that cannot be interpreted.

    The exception message is ``str(datespec)``; the offending value itself is
    kept in the ``datespec`` attribute.
    """

    def __init__(self, datespec):
        super().__init__(str(datespec))
        self.datespec = datespec
class DatespecPostprocessor:
    """Interface for suffix handlers applied to a parsed date specification.

    Both methods raise :exc:`NotImplementedError` here; subclasses supply the
    actual behavior.
    """

    @classmethod
    def parse(cls, s: str) -> tuple[str, DatespecPostprocessor | None]:
        """Return the remaining text and a postprocessor (or *None*)."""
        raise NotImplementedError()

    def apply(self, dtm: datetime.datetime) -> datetime.datetime:
        """Return *dtm* adjusted by this postprocessor."""
        raise NotImplementedError()
# Trailing "@ H:MM" / "@ HH:MM"; group 1 is everything before the "@".
AT_TIME_RE = re.compile(r"^(.*)\s*@\s*([0-2]?[0-9])\:([0-9][0-9])\s*$")


class AtTimePostprocessor(DatespecPostprocessor):
    """Postprocessor for a trailing ``@ HH:MM``: sets the time of day."""

    def __init__(self, hour: int, minute: int, second: int = 0) -> None:
        self.hour = hour
        self.minute = minute
        self.second = second

    @classmethod
    def parse(cls, s):
        """Split a trailing ``@ HH:MM`` off *s*.

        :returns: ``(rest, postprocessor)`` when *s* ends in a time, else
            ``(s, None)``.
        :raises InvalidDatespec: if the hour is not in 0..23 or the minute
            is not in 0..59.
        """
        found = AT_TIME_RE.match(s)
        if found is None:
            return s, None

        rest, hour_text, minute_text = found.groups()
        hour = int(hour_text)
        minute = int(minute_text)

        # The regex admits values such as 29:99, so range-check explicitly.
        if hour not in range(24) or minute not in range(60):
            raise InvalidDatespec(s)

        return rest, AtTimePostprocessor(hour, minute)

    def apply(self, dtm: datetime.datetime) -> datetime.datetime:
        """Return *dtm*, converted to the zone named by ``settings.TIME_ZONE``,
        with hour, minute and second replaced by the stored values.
        """
        from zoneinfo import ZoneInfo

        local_dtm = dtm.astimezone(ZoneInfo(settings.TIME_ZONE))
        return local_dtm.replace(
                hour=self.hour,
                minute=self.minute,
                second=self.second)
# Trailing "+ N unit" / "- N unit" with unit in weeks/days/hours/minutes
# (singular or plural); group 1 is everything before the sign.
PLUS_DELTA_RE = re.compile(r"^(.*)\s*([+-])\s*([0-9]+)\s+"
                           r"(weeks?|days?|hours?|minutes?)$")


class PlusDeltaPostprocessor(DatespecPostprocessor):
    """Postprocessor for a trailing ``+ N unit`` / ``- N unit`` offset."""

    def __init__(self, count: int, period: str) -> None:
        self.count = count
        self.period = period

    @classmethod
    def parse(cls, s):
        """Split a trailing signed offset off *s*.

        :returns: ``(rest, postprocessor)`` when *s* ends in an offset, else
            ``(s, None)``. The stored count is negative for a ``-`` sign.
        """
        found = PLUS_DELTA_RE.match(s)
        if found is None:
            return s, None

        rest, sign, digits, period = found.groups()
        count = -int(digits) if sign == "-" else int(digits)
        return rest, PlusDeltaPostprocessor(count, period)

    def apply(self, dtm):
        """Return *dtm* shifted by ``count`` units of ``period``."""
        # Checked in the same order as the units are listed here.
        for unit in ("week", "day", "hour"):
            if self.period.startswith(unit):
                return dtm + datetime.timedelta(**{unit + "s": self.count})

        assert self.period.startswith("minute")
        return dtm + datetime.timedelta(minutes=self.count)
# Postprocessor classes that parse_date_spec tries, in this order, on the
# tail of a date specification.
# NOTE(review): the closing bracket of this list is not visible in this view.
DATESPEC_POSTPROCESSORS: list[Any] = [
AtTimePostprocessor,
PlusDeltaPostprocessor,
Andreas Klöckner
committed
Andreas Klöckner
committed
# Turn *datespec* into a datetime.
#  - datetime / date objects are returned directly (dates at midnight),
#    localized if they carry no tzinfo;
#  - strings first have postprocessor suffixes (see DATESPEC_POSTPROCESSORS)
#    peeled off, then are read either as a plain date (DATE_RE) or as an event
#    reference "[end:]<kind> [<ordinal>]" looked up in course.models.Event.
# When *course* is None, or no matching event exists, now() is returned; in
# the latter case a warning is added to *vctx* if one is given.
# NOTE(review): several control-flow lines (`if match:`, `if is_end:`, `try:`,
# the `ordinal = None` default and an early return for the plain-date case)
# are not visible in this view -- confirm against the full file.
def parse_date_spec(
course: Course | None,
datespec: str | datetime.date | datetime.datetime,
vctx: ValidationContext | None = None,
location: str | None = None,
) -> datetime.datetime:
Andreas Klöckner
committed
# Kept for the warning messages further down.
orig_datespec = datespec
def localize_if_needed(d: datetime.datetime) -> datetime.datetime:
if d.tzinfo is None:
from relate.utils import localize_datetime
return localize_datetime(d)
return d
# datetime is tested before date because datetime is a subclass of date.
if isinstance(datespec, datetime.datetime):
return localize_if_needed(datespec)
if isinstance(datespec, datetime.date):
return localize_if_needed(
datetime.datetime.combine(datespec, datetime.time.min))
datespec_str = cast(str, datespec).strip()
# {{{ parse postprocessors
# Each pass strips one suffix from the end of the string. Inserting at the
# front keeps `postprocs` in the order the suffixes were written.
postprocs: list[DatespecPostprocessor] = []
while True:
parsed_one = False
for pp_class in DATESPEC_POSTPROCESSORS:
Andreas Klöckner
committed
datespec_str, postproc = pp_class.parse(datespec_str)
if postproc is not None:
parsed_one = True
Andreas Klöckner
committed
postprocs.insert(0, cast(DatespecPostprocessor, postproc))
break
Andreas Klöckner
committed
datespec_str = datespec_str.strip()
if not parsed_one:
break
# }}}
def apply_postprocs(dtime: datetime.datetime) -> datetime.datetime:
for postproc in postprocs:
dtime = postproc.apply(dtime)
return dtime
Andreas Klöckner
committed
# Plain calendar date: midnight of that day, localized.
match = DATE_RE.match(datespec_str)
Andreas Klöckner
committed
res_date = datetime.date(
int(match.group(1)),
int(match.group(2)),
int(match.group(3)))
result = localize_if_needed(
Andreas Klöckner
committed
datetime.datetime.combine(res_date, datetime.time.min))
Andreas Klöckner
committed
# Event reference: an "end:" prefix selects the event's end time.
is_end = datespec_str.startswith(END_PREFIX)
Andreas Klöckner
committed
datespec_str = datespec_str[len(END_PREFIX):]
Andreas Klöckner
committed
match = TRAILING_NUMERAL_RE.match(datespec_str)
# event with numeral
Andreas Klöckner
committed
event_kind = match.group(1)
ordinal: int | None = int(match.group(2))
Andreas Klöckner
committed
else:
# event without numeral
Andreas Klöckner
committed
event_kind = datespec_str
Andreas Klöckner
committed
if vctx is not None:
from course.validation import validate_identifier
validate_identifier(vctx, f"{location}: event kind", event_kind)
Andreas Klöckner
committed
# Without a course there is nothing to look events up in.
if course is None:
return now()
from course.models import Event
event_obj = Event.objects.get(
course=course,
kind=event_kind,
ordinal=ordinal)
except ObjectDoesNotExist:
Andreas Klöckner
committed
if vctx is not None:
vctx.add_warning(
location,
_("Unrecognized date/time specification: '%s' "
"(interpreted as 'now'). "
"You should add an event with this name.")
Andreas Klöckner
committed
% orig_datespec)
return now()
# "end:" falls back to the start time (with a warning) when the event has
# no end time.
if is_end:
if event_obj.end_time is not None:
result = event_obj.end_time
else:
result = event_obj.time
if vctx is not None:
vctx.add_warning(
location,
_("event '%s' has no end time, using start time instead")
% orig_datespec)
else:
result = event_obj.time
return apply_postprocs(result)
# }}}
# {{{ page chunks
Andreas Klöckner
committed
# Determine the (weight, shown) pair for *chunk* from its rules; a chunk
# without a `rules` attribute gets (0, True). The rule conditions visible here
# test role membership (if_has_role), a time window (if_after / if_before) and
# facility membership (if_in_facility), plus deprecated spellings (roles,
# start, end).
# NOTE(review): the loop over the rules, most `continue` statements and the
# fall-through result are not visible in this view. `facilities` is used below
# and passed by get_processed_page_chunks but is not among the visible
# parameters -- confirm against the full file.
def compute_chunk_weight_and_shown(
course: Course,
chunk: ChunkDesc,
roles: list[str],
now_datetime: datetime.datetime,
) -> tuple[float, bool]:
if not hasattr(chunk, "rules"):
return 0, True
if hasattr(rule, "if_has_role"):
Andreas Klöckner
committed
# True when none of the given roles is listed in the rule.
if all(role not in rule.if_has_role for role in roles):
if hasattr(rule, "if_after"):
start_date = parse_date_spec(course, rule.if_after)
if now_datetime < start_date:
continue
if hasattr(rule, "if_before"):
end_date = parse_date_spec(course, rule.if_before)
if end_date < now_datetime:
continue
if hasattr(rule, "if_in_facility"):
if rule.if_in_facility not in facilities:
# {{{ deprecated
if hasattr(rule, "roles"): # pragma: no cover # deprecated
Andreas Klöckner
committed
if all(role not in rule.roles for role in roles):
if hasattr(rule, "start"): # pragma: no cover # deprecated
start_date = parse_date_spec(course, rule.start)
if hasattr(rule, "end"): # pragma: no cover # deprecated
end_date = parse_date_spec(course, rule.end)
# A rule shows the chunk unless it carries an explicit `shown` value.
shown = True
if hasattr(rule, "shown"):
shown = rule.shown
return rule.weight, shown
Andreas Klöckner
committed
# For every chunk of *page_desc*: compute weight and visibility, render the
# content to HTML and, if the chunk has no title, derive one from its markup.
# The chunks are then sorted in place by descending weight.
# NOTE(review): the returned list comprehension is cut off in this view, and
# `facilities` is used without a visible definition.
def get_processed_page_chunks(
course: Course,
repo: Repo_ish,
commit_sha: bytes,
page_desc: StaticPageDesc,
roles: list[str],
now_datetime: datetime.datetime,
) -> list[ChunkDesc]:
for chunk in page_desc.chunks:
chunk.weight, chunk.shown = \
compute_chunk_weight_and_shown(
Andreas Klöckner
committed
course, chunk, roles, now_datetime,
facilities)
chunk.html_content = markup_to_html(course, repo, commit_sha, chunk.content)
if not hasattr(chunk, "title"):
chunk.title = extract_title_from_markup(chunk.content)
page_desc.chunks.sort(key=lambda chunk: chunk.weight, reverse=True)
return [chunk for chunk in page_desc.chunks
# }}}
# {{{ repo desc getting
def normalize_page_desc(page_desc: StaticPageDesc) -> StaticPageDesc:
    """Bring a static page description into its chunk-based form.

    A description carrying a top-level ``content`` attribute is rebuilt as a
    new struct in which ``content`` is replaced by a ``chunks`` list holding
    one chunk with id ``"main"`` and that content. Descriptions without
    ``content`` are returned as they are.
    """
    if not hasattr(page_desc, "content"):
        return page_desc

    main_content = page_desc.content

    fields = struct_to_dict(page_desc)
    del fields["content"]
    fields["chunks"] = [Struct({"id": "main", "content": main_content})]

    return cast(StaticPageDesc, Struct(fields))
def get_staticpage_desc(
        repo: Repo_ish, course: Course, commit_sha: bytes, filename: str
        ) -> StaticPageDesc:
    """Load *filename* from *repo* at *commit_sha* via get_yaml_from_repo and
    return it after normalize_page_desc.

    *course* is accepted but not used here.
    """
    return normalize_page_desc(
            get_yaml_from_repo(repo, filename, commit_sha))
def get_course_desc(repo: Repo_ish, course: Course, commit_sha: bytes) -> CourseDesc:
    """Return the description loaded from ``course.course_file``.

    The file is read through get_staticpage_desc; the result is only re-typed
    as ``CourseDesc`` for the type checker.
    """
    page_desc = get_staticpage_desc(repo, course, commit_sha, course.course_file)
    return cast(CourseDesc, page_desc)
# Bring a flow description into its current form:
#  - a top-level `pages` list is wrapped into a single group with id "main"
#    and the rebuilt struct is returned;
#  - for legacy rules lacking `grade_identifier`, the first non-None
#    grade_identifier found among the grading rules (and the matching
#    grade_aggregation_strategy) is copied up to the rules object.
# NOTE(review): the final return for the second case is not visible in this
# view.
def normalize_flow_desc(flow_desc: FlowDesc) -> FlowDesc:
Andreas Klöckner
committed
if hasattr(flow_desc, "pages"):
pages = flow_desc.pages
d = struct_to_dict(flow_desc)
del d["pages"]
d["groups"] = [Struct({"id": "main", "pages": pages})]
Andreas Klöckner
committed
return cast(FlowDesc, Struct(d))
Andreas Klöckner
committed
if hasattr(flow_desc, "rules"):
rules = flow_desc.rules
if not hasattr(rules, "grade_identifier"): # pragma: no cover # deprecated
Andreas Klöckner
committed
# Legacy content with grade_identifier in grading rule,
# move first found grade_identifier up to rules.
Andreas Klöckner
committed
# Both default to None; they stay None if no grading rule sets them.
rules.grade_identifier = None
rules.grade_aggregation_strategy = None
Andreas Klöckner
committed
for grule in rules.grading:
if grule.grade_identifier is not None: # type: ignore
rules.grade_identifier = grule.grade_identifier # type: ignore
rules.grade_aggregation_strategy = ( # type: ignore
grule.grade_aggregation_strategy) # type: ignore
Andreas Klöckner
committed
break
# Load "flows/<flow_id>.yml" from *repo* at *commit_sha* and normalize it
# with normalize_flow_desc. *course* is not used in the visible lines.
# NOTE(review): the return statement is not visible in this view.
def get_flow_desc(
repo: Repo_ish, course: Course, flow_id: str,
commit_sha: bytes, tolerate_tabs: bool = False) -> FlowDesc:
"""
:arg tolerate_tabs: At one point, Relate accepted tabs
in indentation, but it no longer does. In places where legacy
compatibility matters, you may set *tolerate_tabs* to *True*.
"""
Andreas Klöckner
committed
flow_desc = get_yaml_from_repo(repo, f"flows/{flow_id}.yml", commit_sha,
tolerate_tabs=tolerate_tabs)
flow_desc = normalize_flow_desc(flow_desc)
# Return the page with id *page_id* from the group with id *group_id*; raise
# ObjectDoesNotExist naming group, page and flow when there is no such page.
# *flow_id* is only used in that error message.
# NOTE(review): the loop that binds `grp` and the end of the raise statement
# are not visible in this view.
def get_flow_page_desc(flow_id: str, flow_desc: FlowDesc,
group_id: str, page_id: str) -> FlowPageDesc:
if grp.id == group_id:
for page in grp.pages:
if page.id == page_id:
return page
raise ObjectDoesNotExist(
_("page '%(group_id)s/%(page_id)s' in flow '%(flow_id)s'") % {
"group_id": group_id,
"page_id": page_id,
"flow_id": flow_id
# }}}
# {{{ flow page handling
# Raised by import_class when a dotted name cannot be resolved to a class.
# NOTE(review): the class body is not visible in this view.
class ClassNotFoundError(RuntimeError):
# Resolve the dotted *name* to a class object.
# The longest possible module prefix is imported first; after each ImportError
# the prefix is shortened by one component. The remaining components are then
# looked up with getattr, starting from the imported module.
# Raises ClassNotFoundError if *name* has fewer than two components, if an
# attribute lookup fails, or if the resolved object is not a type.
# NOTE(review): the `try:` line around import_module and whatever follows the
# while loop are not visible in this view.
def import_class(name: str) -> type:
components = name.split(".")
if len(components) < 2:
# need at least one module plus class name
raise ClassNotFoundError(name)
from importlib import import_module
# Number of leading components currently treated as the module path.
mod_components = len(components) - 1
while mod_components:
module_name = ".".join(components[:mod_components])
mod = import_module(module_name)
except ImportError:
mod_components -= 1
continue
sym = mod
for cls_comp in components[mod_components:]:
try:
sym = getattr(sym, cls_comp)
except AttributeError:
raise ClassNotFoundError(name)
if isinstance(sym, type):
return sym
else:
raise ClassNotFoundError(f"'{name}' does not name a type")
def get_flow_page_class(repo: Repo_ish, typename: str, commit_sha: bytes) -> type:
Andreas Klöckner
committed
# look among default page types
import course.page
try:
return getattr(course.page, typename)
except AttributeError:
pass
# try a global dotted-name import
try:
return import_class(typename)
except ClassNotFoundError:
pass