Newer
Older
# }}}
# {{{ repo yaml getting
def get_raw_yaml_from_repo(repo, full_name, commit_sha):
Andreas Klöckner
committed
# type: (Repo_ish, Text, bytes) -> Any
"""Return decoded YAML data structure from
the given file in *repo* at *commit_sha*.
:arg commit_sha: A byte string containing the commit hash
from urllib.parse import quote_plus
cache_key = "%RAW%%2".join((
CACHE_KEY_ROOT,
quote_plus(repo.controldir()), quote_plus(full_name), commit_sha.decode(),
))
result = None # type: Optional[Any]
# Memcache is apparently limited to 250 characters.
if len(cache_key) < 240:
result = def_cache.get(cache_key)
allow_tree=False).data)
result = load_yaml(yaml_str) # type: ignore
def_cache.add(cache_key, result, None)
return result
LINE_HAS_INDENTING_TABS_RE = re.compile(r"^\s*\t\s*", re.MULTILINE)
def get_yaml_from_repo(repo, full_name, commit_sha, cached=True,
tolerate_tabs=False):
# type: (Repo_ish, Text, bytes, bool, bool) -> Any
Andreas Klöckner
committed
"""Return decoded, struct-ified YAML data structure from
the given file in *repo* at *commit_sha*.
:arg tolerate_tabs: At one point, Relate accepted tabs
in indentation, but it no longer does. In places where legacy compatibility
matters, you may set *tolerate_tabs* to *True*.
try:
import django.core.cache as cache
except ImproperlyConfigured:
cached = False
else:
from urllib.parse import quote_plus
cache_key = "%%%2".join(
(CACHE_KEY_ROOT,
quote_plus(repo.controldir()), quote_plus(full_name),
commit_sha.decode()))
def_cache = cache.caches["default"]
result = None
# Memcache is apparently limited to 250 characters.
if len(cache_key) < 240:
result = def_cache.get(cache_key)
if result is not None:
return result
yaml_bytestream = get_repo_blob(
repo, full_name, commit_sha, allow_tree=False).data
yaml_text = yaml_bytestream.decode("utf-8")
if not tolerate_tabs and LINE_HAS_INDENTING_TABS_RE.search(yaml_text):
raise ValueError("File uses tabs in indentation. "
"This is not allowed.")
repo, commit_sha, yaml_bytestream)
yaml_data = load_yaml(expanded) # type:ignore
result = dict_to_struct(yaml_data)
if cached:
def_cache.add(cache_key, result, None)
# }}}
# {{{ markup
def _attr_to_string(key, val):
if val is None:
return key
return '%s="%s"' % (key, val)
class TagProcessingHTMLParser(html_parser.HTMLParser):
def __init__(self, out_file, process_tag_func):
self.out_file = out_file
self.process_tag_func = process_tag_func
def handle_starttag(self, tag, attrs):
attrs = dict(attrs)
attrs.update(self.process_tag_func(tag, attrs))
self.out_file.write("<%s %s>" % (tag, " ".join(
_attr_to_string(k, v) for k, v in attrs.items())))
def handle_endtag(self, tag):
self.out_file.write("</%s>" % tag)
def handle_startendtag(self, tag, attrs):
attrs = dict(attrs)
attrs.update(self.process_tag_func(tag, attrs))
self.out_file.write("<%s %s/>" % (tag, " ".join(
_attr_to_string(k, v) for k, v in attrs.items())))
def handle_data(self, data):
self.out_file.write(data)
def handle_entityref(self, name):
self.out_file.write("&%s;" % name)
def handle_charref(self, name):
self.out_file.write("&#%s;" % name)
def handle_comment(self, data):
self.out_file.write("<!--%s-->" % data)
def handle_decl(self, decl):
self.out_file.write("<!%s>" % decl)
def handle_pi(self, data):
raise NotImplementedError(
_("I have no idea what a processing instruction is."))
def unknown_decl(self, data):
self.out_file.write("<![%s]>" % data)
class PreserveFragment(object):
def __init__(self, s):
self.s = s
def __init__(self, md, course, commit_sha, reverse_func):
self.commit_sha = commit_sha
def reverse(self, viewname, args):
frag = None
new_args = []
for arg in args:
if isinstance(arg, PreserveFragment):
s = arg.s
if frag_index != -1:
frag = s[frag_index:]
s = s[:frag_index]
new_args.append(s)
else:
new_args.append(arg)
result = self.reverse_func(viewname, args=new_args)
if frag is not None:
result += frag
return result
def get_course_identifier(self):
if self.course is None:
return "bogus-course-identifier"
else:
return self.course.identifier
try:
if url.startswith("course:"):
course_id = url[7:]
if course_id:
return self.reverse("relate-course_page",
args=(course_id,))
else:
return self.reverse("relate-course_page",
args=(self.get_course_identifier(),))
elif url.startswith("flow:"):
flow_id = url[5:]
return self.reverse("relate-view_start_flow",
args=(self.get_course_identifier(), flow_id))
elif url.startswith("staticpage:"):
page_path = url[11:]
return self.reverse("relate-content_page",
args=(
self.get_course_identifier(),
PreserveFragment(page_path)))
elif url.startswith("media:"):
media_path = url[6:]
return self.reverse("relate-get_media",
args=(
self.get_course_identifier(),
PreserveFragment(media_path)))
elif url.startswith("repo:"):
path = url[5:]
return self.reverse("relate-get_repo_file",
args=(
self.get_course_identifier(),
elif url.startswith("repocur:"):
path = url[8:]
return self.reverse("relate-get_current_repo_file",
args=(
self.get_course_identifier(),
elif url.strip() == "calendar:":
return self.reverse("relate-view_calendar",
except NoReverseMatch:
from base64 import b64encode
message = ("Invalid character in RELATE URL: " + url).encode("utf-8")
return "data:text/plain;base64,"+b64encode(message).decode()
def process_tag(self, tag_name, attrs):
changed_attrs = {}
if tag_name == "table" and attrs.get("bootstrap") != "no":
changed_attrs["class"] = "table table-condensed"
if tag_name in ["a", "link"] and "href" in attrs:
new_href = self.process_url(attrs["href"])
if new_href is not None:
changed_attrs["href"] = new_href
elif tag_name == "img" and "src" in attrs:
new_src = self.process_url(attrs["src"])
if new_src is not None:
changed_attrs["src"] = new_src
elif tag_name == "object" and "data" in attrs:
new_data = self.process_url(attrs["data"])
if new_data is not None:
changed_attrs["data"] = new_data
return changed_attrs
def process_etree_element(self, element):
changed_attrs = self.process_tag(element.tag, element.attrib)
for key, val in changed_attrs.items():
element.set(key, val)
def walk_and_process_tree(self, root):
self.process_etree_element(root)
self.walk_and_process_tree(child)
def run(self, root):
self.walk_and_process_tree(root)
# root through and process Markdown's HTML stash (gross!)
for i, (html, safe) in enumerate(self.md.htmlStash.rawHtmlBlocks):
parser = TagProcessingHTMLParser(outf, self.process_tag)
parser.feed(html)
self.md.htmlStash.rawHtmlBlocks[i] = (outf.getvalue(), safe)
class LinkFixerExtension(Extension):
def __init__(self, course, commit_sha, reverse_func):
Andreas Klöckner
committed
# type: (Optional[Course], bytes, Optional[Callable]) -> None
self.course = course
self.commit_sha = commit_sha
def extendMarkdown(self, md, md_globals): # noqa
LinkFixerTreeprocessor(md, self.course, self.commit_sha,
reverse_func=self.reverse_func)
def remove_prefix(prefix, s):
Andreas Klöckner
committed
# type: (Text, Text) -> Text
if s.startswith(prefix):
return s[len(prefix):]
else:
return s
JINJA_PREFIX = "[JINJA]"
def expand_markup(
course, # type: Optional[Course]
repo, # type: Repo_ish
commit_sha, # type: bytes
text, # type: Text
use_jinja=True, # type: bool
jinja_env={}, # type: Dict
):
# type: (...) -> Text
if not isinstance(text, str):
text = str(text)
# {{{ process through Jinja
if use_jinja:
from jinja2 import Environment, StrictUndefined
env = Environment(
loader=GitTemplateLoader(repo, commit_sha),
undefined=StrictUndefined)
template = env.from_string(text)
kwargs = {}
if jinja_env:
kwargs.update(jinja_env)
from course.utils import IpynbJinjaMacro
kwargs[IpynbJinjaMacro.name] = IpynbJinjaMacro(course, repo, commit_sha)
text = template.render(**kwargs)
# }}}
return text
def filter_html_attributes(tag, name, value):
from bleach.sanitizer import ALLOWED_ATTRIBUTES
allowed_attrs = ALLOWED_ATTRIBUTES.get(tag, [])
result = name in allowed_attrs
if tag == "a":
result = (result
or (name == "role" and value == "button")
or (name == "class" and value.startswith("btn btn-")))
elif tag == "img":
result = result or name == "src"
elif tag == "div":
result = result or (name == "class" and value == "well")
elif tag == "i":
result = result or (name == "class" and value.startswith("fa fa-"))
return result
Andreas Klöckner
committed
def markup_to_html(
course, # type: Optional[Course]
repo, # type: Repo_ish
commit_sha, # type: bytes
text, # type: Text
reverse_func=None, # type: Callable
validate_only=False, # type: bool
use_jinja=True, # type: bool
Andreas Klöckner
committed
jinja_env={}, # type: Dict
):
# type: (...) -> Text
disable_codehilite = bool(
getattr(settings,
"RELATE_DISABLE_CODEHILITE_MARKDOWN_EXTENSION", True))
if course is not None and not jinja_env:
try:
import django.core.cache as cache
except ImproperlyConfigured:
cache_key = None
else:
import hashlib
cache_key = ("markup:v8:%s:%d:%s:%s:%s%s"
% (CACHE_KEY_ROOT,
course.id, course.trusted_for_markup, str(commit_sha),
hashlib.md5(text.encode("utf-8")).hexdigest(),
":NOCODEHILITE" if disable_codehilite else ""
))
def_cache = cache.caches["default"]
result = def_cache.get(cache_key)
if result is not None:
if text.lstrip().startswith(JINJA_PREFIX):
text = remove_prefix(JINJA_PREFIX, text.lstrip())
else:
cache_key = None
text = expand_markup(
course, repo, commit_sha, text, use_jinja=use_jinja, jinja_env=jinja_env)
if reverse_func is None:
from django.urls import reverse
reverse_func = reverse
Andreas Klöckner
committed
if validate_only:
Andreas Klöckner
committed
return ""
Andreas Klöckner
committed
from course.mdx_mathjax import MathJaxExtension
from course.utils import NBConvertExtension
extensions = [
LinkFixerExtension(course, commit_sha, reverse_func=reverse_func),
MathJaxExtension(),
"markdown.extensions.extra",
]
if not disable_codehilite:
# Note: no matter whether disable_codehilite, the code in
# the rendered ipython notebook will be highlighted.
# "css_class=highlight" is to ensure that, when codehilite extension
# is enabled, code out side of notebook uses the same html class
# attribute as the default highlight class (i.e., `highlight`)
# used by rendered ipynb notebook cells, Thus we don't need to
# make 2 copies of css for the highlight.
extensions += ["markdown.extensions.codehilite(css_class=highlight)"]
extensions=extensions,
output_format="html5")
if course is None or not course.trusted_for_markup:
import bleach
result = bleach.clean(result,
tags=bleach.ALLOWED_TAGS + [
"div", "span", "p", "img",
"h1", "h2", "h3", "h4", "h5", "h6",
],
attributes=filter_html_attributes)
if cache_key is not None:
def_cache.add(cache_key, result, None)
return result
TITLE_RE = re.compile(r"^\#+\s*(.+)", re.UNICODE)
def extract_title_from_markup(markup_text):
Andreas Klöckner
committed
# type: (Text) -> Optional[Text]
lines = markup_text.split("\n")
for ln in lines[:10]:
match = TITLE_RE.match(ln)
if match is not None:
return match.group(1)
return None
# {{{ datespec processing
DATE_RE = re.compile(r"^([0-9]+)\-([01][0-9])\-([0-3][0-9])$")
TRAILING_NUMERAL_RE = re.compile(r"^(.*)\s+([0-9]+)$")
END_PREFIX = "end:"
class InvalidDatespec(ValueError):
def __init__(self, datespec):
ValueError.__init__(self, str(datespec))
self.datespec = datespec
Andreas Klöckner
committed
class DatespecPostprocessor(object):
@classmethod
def parse(cls, s):
# type: (Text) -> Tuple[Text, Optional[DatespecPostprocessor]]
raise NotImplementedError()
def apply(self, dtm):
# type: (datetime.datetime) -> datetime.datetime
raise NotImplementedError()
AT_TIME_RE = re.compile(r"^(.*)\s*@\s*([0-2]?[0-9])\:([0-9][0-9])\s*$")
Andreas Klöckner
committed
class AtTimePostprocessor(DatespecPostprocessor):
def __init__(self, hour, minute, second=0):
Andreas Klöckner
committed
# type: (int, int, int) -> None
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
self.hour = hour
self.minute = minute
self.second = second
@classmethod
def parse(cls, s):
match = AT_TIME_RE.match(s)
if match is not None:
hour = int(match.group(2))
minute = int(match.group(3))
if not (0 <= hour < 24):
raise InvalidDatespec(s)
if not (0 <= minute < 60):
raise InvalidDatespec(s)
return match.group(1), AtTimePostprocessor(hour, minute)
else:
return s, None
def apply(self, dtm):
from pytz import timezone
server_tz = timezone(settings.TIME_ZONE)
return dtm.astimezone(server_tz).replace(
hour=self.hour,
minute=self.minute,
second=self.second)
PLUS_DELTA_RE = re.compile(r"^(.*)\s*([+-])\s*([0-9]+)\s+"
"(weeks?|days?|hours?|minutes?)$")
Andreas Klöckner
committed
class PlusDeltaPostprocessor(DatespecPostprocessor):
def __init__(self, count, period):
Andreas Klöckner
committed
# type: (int, Text) -> None
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
self.count = count
self.period = period
@classmethod
def parse(cls, s):
match = PLUS_DELTA_RE.match(s)
if match is not None:
count = int(match.group(3))
if match.group(2) == "-":
count = -count
period = match.group(4)
return match.group(1), PlusDeltaPostprocessor(count, period)
else:
return s, None
def apply(self, dtm):
if self.period.startswith("week"):
d = datetime.timedelta(weeks=self.count)
elif self.period.startswith("day"):
d = datetime.timedelta(days=self.count)
elif self.period.startswith("hour"):
d = datetime.timedelta(hours=self.count)
else:
assert self.period.startswith("minute")
d = datetime.timedelta(minutes=self.count)
return dtm + d
DATESPEC_POSTPROCESSORS = [
AtTimePostprocessor,
PlusDeltaPostprocessor,
Andreas Klöckner
committed
] # type: List[Any]
Andreas Klöckner
committed
def parse_date_spec(
Andreas Klöckner
committed
datespec, # type: Union[Text, datetime.date, datetime.datetime]
vctx=None, # type: Optional[ValidationContext]
location=None, # type: Optional[Text]
):
# type: (...) -> datetime.datetime
Andreas Klöckner
committed
orig_datespec = datespec
def localize_if_needed(d):
Andreas Klöckner
committed
# type: (datetime.datetime) -> datetime.datetime
if d.tzinfo is None:
from relate.utils import localize_datetime
return localize_datetime(d)
return d
if isinstance(datespec, datetime.datetime):
return localize_if_needed(datespec)
if isinstance(datespec, datetime.date):
return localize_if_needed(
datetime.datetime.combine(datespec, datetime.time.min))
# {{{ parse postprocessors
Andreas Klöckner
committed
postprocs = [] # type: List[DatespecPostprocessor]
while True:
parsed_one = False
for pp_class in DATESPEC_POSTPROCESSORS:
Andreas Klöckner
committed
datespec_str, postproc = pp_class.parse(datespec_str)
if postproc is not None:
parsed_one = True
Andreas Klöckner
committed
postprocs.insert(0, cast(DatespecPostprocessor, postproc))
break
Andreas Klöckner
committed
datespec_str = datespec_str.strip()
if not parsed_one:
break
# }}}
def apply_postprocs(dtime):
Andreas Klöckner
committed
# type: (datetime.datetime) -> datetime.datetime
for postproc in postprocs:
dtime = postproc.apply(dtime)
return dtime
Andreas Klöckner
committed
match = DATE_RE.match(datespec_str)
Andreas Klöckner
committed
res_date = datetime.date(
int(match.group(1)),
int(match.group(2)),
int(match.group(3)))
result = localize_if_needed(
Andreas Klöckner
committed
datetime.datetime.combine(res_date, datetime.time.min))
Andreas Klöckner
committed
is_end = datespec_str.startswith(END_PREFIX)
Andreas Klöckner
committed
datespec_str = datespec_str[len(END_PREFIX):]
Andreas Klöckner
committed
match = TRAILING_NUMERAL_RE.match(datespec_str)
# event with numeral
Andreas Klöckner
committed
event_kind = match.group(1)
Andreas Klöckner
committed
ordinal = int(match.group(2)) # type: Optional[int]
Andreas Klöckner
committed
else:
# event without numeral
Andreas Klöckner
committed
event_kind = datespec_str
Andreas Klöckner
committed
if vctx is not None:
from course.validation import validate_identifier
validate_identifier(vctx, "%s: event kind" % location, event_kind)
Andreas Klöckner
committed
if course is None:
return now()
from course.models import Event
event_obj = Event.objects.get(
course=course,
kind=event_kind,
ordinal=ordinal)
except ObjectDoesNotExist:
Andreas Klöckner
committed
if vctx is not None:
vctx.add_warning(
location,
_("Unrecognized date/time specification: '%s' "
"(interpreted as 'now'). "
"You should add an event with this name.")
Andreas Klöckner
committed
% orig_datespec)
return now()
if is_end:
if event_obj.end_time is not None:
result = event_obj.end_time
else:
result = event_obj.time
if vctx is not None:
vctx.add_warning(
location,
_("event '%s' has no end time, using start time instead")
% orig_datespec)
else:
result = event_obj.time
return apply_postprocs(result)
# }}}
# {{{ page chunks
Andreas Klöckner
committed
def compute_chunk_weight_and_shown(
course, # type: Course
chunk, # type: ChunkDesc
roles, # type: List[Text]
Andreas Klöckner
committed
now_datetime, # type: datetime.datetime
facilities, # type: FrozenSet[Text]
Andreas Klöckner
committed
):
# type: (...) -> Tuple[float, bool]
if not hasattr(chunk, "rules"):
return 0, True
if hasattr(rule, "if_has_role"):
Andreas Klöckner
committed
if all(role not in rule.if_has_role for role in roles):
if hasattr(rule, "if_after"):
start_date = parse_date_spec(course, rule.if_after)
if now_datetime < start_date:
continue
if hasattr(rule, "if_before"):
end_date = parse_date_spec(course, rule.if_before)
if end_date < now_datetime:
continue
if hasattr(rule, "if_in_facility"):
if rule.if_in_facility not in facilities:
# {{{ deprecated
if hasattr(rule, "roles"): # pragma: no cover # deprecated
Andreas Klöckner
committed
if all(role not in rule.roles for role in roles):
if hasattr(rule, "start"): # pragma: no cover # deprecated
start_date = parse_date_spec(course, rule.start)
if hasattr(rule, "end"): # pragma: no cover # deprecated
end_date = parse_date_spec(course, rule.end)
shown = True
if hasattr(rule, "shown"):
shown = rule.shown
return rule.weight, shown
Andreas Klöckner
committed
def get_processed_page_chunks(
course, # type: Course
repo, # type: Repo_ish
commit_sha, # type: bytes
page_desc, # type: StaticPageDesc
roles, # type: List[Text]
now_datetime, # type: datetime.datetime
facilities, # type: FrozenSet[Text]
Andreas Klöckner
committed
):
# type: (...) -> List[ChunkDesc]
for chunk in page_desc.chunks:
chunk.weight, chunk.shown = \
compute_chunk_weight_and_shown(
Andreas Klöckner
committed
course, chunk, roles, now_datetime,
facilities)
chunk.html_content = markup_to_html(course, repo, commit_sha, chunk.content)
if not hasattr(chunk, "title"):
chunk.title = extract_title_from_markup(chunk.content)
page_desc.chunks.sort(key=lambda chunk: chunk.weight, reverse=True)
return [chunk for chunk in page_desc.chunks
# }}}
# {{{ repo desc getting
def normalize_page_desc(page_desc):
Andreas Klöckner
committed
# type: (StaticPageDesc) -> StaticPageDesc
if hasattr(page_desc, "content"):
content = page_desc.content
from relate.utils import struct_to_dict, Struct
d = struct_to_dict(page_desc)
del d["content"]
d["chunks"] = [Struct({"id": "main", "content": content})]
Andreas Klöckner
committed
return cast(StaticPageDesc, Struct(d))
return page_desc
def get_staticpage_desc(repo, course, commit_sha, filename):
Andreas Klöckner
committed
# type: (Repo_ish, Course, bytes, Text) -> StaticPageDesc
page_desc = get_yaml_from_repo(repo, filename, commit_sha)
page_desc = normalize_page_desc(page_desc)
return page_desc
def get_course_desc(repo, course, commit_sha):
Andreas Klöckner
committed
# type: (Repo_ish, Course, bytes) -> CourseDesc
return cast(
CourseDesc,
get_staticpage_desc(repo, course, commit_sha, course.course_file))
def normalize_flow_desc(flow_desc):
Andreas Klöckner
committed
# type: (FlowDesc) -> FlowDesc
if hasattr(flow_desc, "pages"):
pages = flow_desc.pages
from relate.utils import struct_to_dict, Struct
d = struct_to_dict(flow_desc)
del d["pages"]
d["groups"] = [Struct({"id": "main", "pages": pages})]
Andreas Klöckner
committed
return cast(FlowDesc, Struct(d))
Andreas Klöckner
committed
if hasattr(flow_desc, "rules"):
rules = flow_desc.rules
if not hasattr(rules, "grade_identifier"): # pragma: no cover # deprecated
Andreas Klöckner
committed
# Legacy content with grade_identifier in grading rule,
# move first found grade_identifier up to rules.
Andreas Klöckner
committed
rules.grade_identifier = None
rules.grade_aggregation_strategy = None
Andreas Klöckner
committed
for grule in rules.grading:
if grule.grade_identifier is not None: # type: ignore
rules.grade_identifier = grule.grade_identifier # type: ignore
rules.grade_aggregation_strategy = ( # type: ignore
grule.grade_aggregation_strategy) # type: ignore
Andreas Klöckner
committed
break
def get_flow_desc(repo, course, flow_id, commit_sha, tolerate_tabs=False):
# type: (Repo_ish, Course, Text, bytes, bool) -> FlowDesc
"""
:arg tolerate_tabs: At one point, Relate accepted tabs
in indentation, but it no longer does. In places where legacy
compatibility matters, you may set *tolerate_tabs* to *True*.
"""
Andreas Klöckner
committed
flow_desc = get_yaml_from_repo(repo, "flows/%s.yml" % flow_id, commit_sha,
tolerate_tabs=tolerate_tabs)
flow_desc = normalize_flow_desc(flow_desc)
flow_desc.description_html = markup_to_html(
course, repo, commit_sha, getattr(flow_desc, "description", None))
return flow_desc
def get_flow_page_desc(flow_id, flow_desc, group_id, page_id):
Andreas Klöckner
committed
# type: (Text, FlowDesc, Text, Text) -> FlowPageDesc
if grp.id == group_id:
for page in grp.pages:
if page.id == page_id:
return page
raise ObjectDoesNotExist(
_("page '%(group_id)s/%(page_id)s' in flow '%(flow_id)s'") % {
"group_id": group_id,
"page_id": page_id,
"flow_id": flow_id
# }}}
# {{{ flow page handling
class ClassNotFoundError(RuntimeError):
Andreas Klöckner
committed
# type: (Text) -> type
components = name.split(".")
if len(components) < 2:
# need at least one module plus class name
raise ClassNotFoundError(name)
module_name = ".".join(components[:-1])
try:
mod = __import__(module_name)
except ImportError:
raise ClassNotFoundError(name)
for comp in components[1:]:
try:
mod = getattr(mod, comp)
except AttributeError:
raise ClassNotFoundError(name)
def get_flow_page_class(repo, typename, commit_sha):
Andreas Klöckner
committed
# type: (Repo_ish, Text, bytes) -> type
# look among default page types
import course.page
try:
return getattr(course.page, typename)
except AttributeError:
pass
# try a global dotted-name import
try:
return import_class(typename)
except ClassNotFoundError:
pass
if typename.startswith("repo:"):
stripped_typename = typename[5:]
components = stripped_typename.split(".")
raise ClassNotFoundError(
_("repo page class must conist of two "
"dotted components (invalid: '%s')")
% typename)
module_name = "code/"+module+".py"
module_code = get_repo_blob(repo, module_name, commit_sha,
allow_tree=False).data
Andreas Klöckner
committed
module_dict = {} # type: Dict
exec(compile(module_code, module_name, "exec"), module_dict)
try:
return module_dict[classname]
raise ClassNotFoundError(typename)
else:
raise ClassNotFoundError(typename)
def instantiate_flow_page(location, repo, page_desc, commit_sha):
Andreas Klöckner
committed
# type: (Text, Repo_ish, FlowPageDesc, bytes) -> PageBase
class_ = get_flow_page_class(repo, page_desc.type, commit_sha)
return class_(None, location, page_desc)