Newer
Older
def extract_title_from_markup(markup_text):
Andreas Klöckner
committed
# type: (Text) -> Optional[Text]
lines = markup_text.split("\n")
for ln in lines[:10]:
match = TITLE_RE.match(ln)
if match is not None:
return match.group(1)
return None
# {{{ datespec processing
DATE_RE = re.compile(r"^([0-9]+)\-([01][0-9])\-([0-3][0-9])$")
TRAILING_NUMERAL_RE = re.compile(r"^(.*)\s+([0-9]+)$")
END_PREFIX = "end:"
class InvalidDatespec(ValueError):
def __init__(self, datespec):
ValueError.__init__(self, str(datespec))
self.datespec = datespec
Andreas Klöckner
committed
class DatespecPostprocessor(object):
@classmethod
def parse(cls, s):
# type: (Text) -> Tuple[Text, Optional[DatespecPostprocessor]]
raise NotImplementedError()
def apply(self, dtm):
# type: (datetime.datetime) -> datetime.datetime
raise NotImplementedError()
AT_TIME_RE = re.compile(r"^(.*)\s*@\s*([0-2]?[0-9])\:([0-9][0-9])\s*$")
Andreas Klöckner
committed
class AtTimePostprocessor(DatespecPostprocessor):
def __init__(self, hour, minute, second=0):
Andreas Klöckner
committed
# type: (int, int, int) -> None
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
self.hour = hour
self.minute = minute
self.second = second
@classmethod
def parse(cls, s):
match = AT_TIME_RE.match(s)
if match is not None:
hour = int(match.group(2))
minute = int(match.group(3))
if not (0 <= hour < 24):
raise InvalidDatespec(s)
if not (0 <= minute < 60):
raise InvalidDatespec(s)
return match.group(1), AtTimePostprocessor(hour, minute)
else:
return s, None
def apply(self, dtm):
from pytz import timezone
server_tz = timezone(settings.TIME_ZONE)
return dtm.astimezone(server_tz).replace(
hour=self.hour,
minute=self.minute,
second=self.second)
PLUS_DELTA_RE = re.compile(r"^(.*)\s*([+-])\s*([0-9]+)\s+"
"(weeks?|days?|hours?|minutes?)$")
Andreas Klöckner
committed
class PlusDeltaPostprocessor(DatespecPostprocessor):
def __init__(self, count, period):
Andreas Klöckner
committed
# type: (int, Text) -> None
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
self.count = count
self.period = period
@classmethod
def parse(cls, s):
match = PLUS_DELTA_RE.match(s)
if match is not None:
count = int(match.group(3))
if match.group(2) == "-":
count = -count
period = match.group(4)
return match.group(1), PlusDeltaPostprocessor(count, period)
else:
return s, None
def apply(self, dtm):
if self.period.startswith("week"):
d = datetime.timedelta(weeks=self.count)
elif self.period.startswith("day"):
d = datetime.timedelta(days=self.count)
elif self.period.startswith("hour"):
d = datetime.timedelta(hours=self.count)
elif self.period.startswith("minute"):
d = datetime.timedelta(minutes=self.count)
else:
raise InvalidDatespec(_("invalid period: %s" % self.period))
return dtm + d
DATESPEC_POSTPROCESSORS = [
AtTimePostprocessor,
PlusDeltaPostprocessor,
Andreas Klöckner
committed
] # type: List[Any]
Andreas Klöckner
committed
def parse_date_spec(
Andreas Klöckner
committed
datespec, # type: Union[Text, datetime.date, datetime.datetime]
vctx=None, # type: Optional[ValidationContext]
location=None, # type: Optional[Text]
):
# type: (...) -> datetime.datetime
Andreas Klöckner
committed
orig_datespec = datespec
def localize_if_needed(d):
Andreas Klöckner
committed
# type: (datetime.datetime) -> datetime.datetime
if d.tzinfo is None:
from relate.utils import localize_datetime
return localize_datetime(d)
return d
if isinstance(datespec, datetime.datetime):
return localize_if_needed(datespec)
if isinstance(datespec, datetime.date):
return localize_if_needed(
datetime.datetime.combine(datespec, datetime.time.min))
try:
from typing import Text
except ImportError:
Text = None # noqa
datespec_str = cast(Text, datespec).strip() # type: ignore
# {{{ parse postprocessors
Andreas Klöckner
committed
postprocs = [] # type: List[DatespecPostprocessor]
while True:
parsed_one = False
for pp_class in DATESPEC_POSTPROCESSORS:
Andreas Klöckner
committed
datespec_str, postproc = pp_class.parse(datespec_str)
if postproc is not None:
parsed_one = True
Andreas Klöckner
committed
postprocs.insert(0, cast(DatespecPostprocessor, postproc))
break
Andreas Klöckner
committed
datespec_str = datespec_str.strip()
if not parsed_one:
break
# }}}
def apply_postprocs(dtime):
Andreas Klöckner
committed
# type: (datetime.datetime) -> datetime.datetime
for postproc in postprocs:
dtime = postproc.apply(dtime)
return dtime
Andreas Klöckner
committed
match = DATE_RE.match(datespec_str)
Andreas Klöckner
committed
res_date = datetime.date(
int(match.group(1)),
int(match.group(2)),
int(match.group(3)))
result = localize_if_needed(
Andreas Klöckner
committed
datetime.datetime.combine(res_date, datetime.time.min))
Andreas Klöckner
committed
is_end = datespec_str.startswith(END_PREFIX)
Andreas Klöckner
committed
datespec_str = datespec_str[len(END_PREFIX):]
Andreas Klöckner
committed
match = TRAILING_NUMERAL_RE.match(datespec_str)
# event with numeral
Andreas Klöckner
committed
event_kind = match.group(1)
Andreas Klöckner
committed
ordinal = int(match.group(2)) # type: Optional[int]
Andreas Klöckner
committed
else:
# event without numeral
Andreas Klöckner
committed
event_kind = datespec_str
Andreas Klöckner
committed
if vctx is not None:
from course.validation import validate_identifier
validate_identifier(vctx, "%s: event kind" % location, event_kind)
Andreas Klöckner
committed
if course is None:
return now()
from course.models import Event
event_obj = Event.objects.get(
course=course,
kind=event_kind,
ordinal=ordinal)
except ObjectDoesNotExist:
Andreas Klöckner
committed
if vctx is not None:
vctx.add_warning(
location,
_("unrecognized date/time specification: '%s' "
"(interpreted as 'now')")
% orig_datespec)
return now()
if is_end:
if event_obj.end_time is not None:
result = event_obj.end_time
else:
result = event_obj.time
if vctx is not None:
vctx.add_warning(
location,
_("event '%s' has no end time, using start time instead")
% orig_datespec)
else:
result = event_obj.time
return apply_postprocs(result)
# }}}
# {{{ page chunks
Andreas Klöckner
committed
def compute_chunk_weight_and_shown(
course, # type: Course
chunk, # type: ChunkDesc
roles, # type: List[Text]
Andreas Klöckner
committed
now_datetime, # type: datetime.datetime
facilities, # type: FrozenSet[Text]
Andreas Klöckner
committed
):
# type: (...) -> Tuple[float, bool]
if not hasattr(chunk, "rules"):
return 0, True
if hasattr(rule, "if_has_role"):
Andreas Klöckner
committed
if all(role not in rule.if_has_role for role in roles):
if hasattr(rule, "if_after"):
start_date = parse_date_spec(course, rule.if_after)
if now_datetime < start_date:
continue
if hasattr(rule, "if_before"):
end_date = parse_date_spec(course, rule.if_before)
if end_date < now_datetime:
continue
if hasattr(rule, "if_in_facility"):
if rule.if_in_facility not in facilities:
# {{{ deprecated
Andreas Klöckner
committed
if all(role not in rule.roles for role in roles):
start_date = parse_date_spec(course, rule.start)
end_date = parse_date_spec(course, rule.end)
shown = True
if hasattr(rule, "shown"):
shown = rule.shown
return rule.weight, shown
Andreas Klöckner
committed
def get_processed_page_chunks(
course, # type: Course
repo, # type: Repo_ish
commit_sha, # type: bytes
page_desc, # type: StaticPageDesc
roles, # type: List[Text]
now_datetime, # type: datetime.datetime
facilities, # type: FrozenSet[Text]
Andreas Klöckner
committed
):
# type: (...) -> List[ChunkDesc]
for chunk in page_desc.chunks:
chunk.weight, chunk.shown = \
compute_chunk_weight_and_shown(
Andreas Klöckner
committed
course, chunk, roles, now_datetime,
facilities)
chunk.html_content = markup_to_html(course, repo, commit_sha, chunk.content)
if not hasattr(chunk, "title"):
chunk.title = extract_title_from_markup(chunk.content)
page_desc.chunks.sort(key=lambda chunk: chunk.weight, reverse=True)
return [chunk for chunk in page_desc.chunks
# }}}
# {{{ repo desc getting
def normalize_page_desc(page_desc):
Andreas Klöckner
committed
# type: (StaticPageDesc) -> StaticPageDesc
if hasattr(page_desc, "content"):
content = page_desc.content
from relate.utils import struct_to_dict, Struct
d = struct_to_dict(page_desc)
del d["content"]
d["chunks"] = [Struct({"id": "main", "content": content})]
Andreas Klöckner
committed
return cast(StaticPageDesc, Struct(d))
return page_desc
def get_staticpage_desc(repo, course, commit_sha, filename):
Andreas Klöckner
committed
# type: (Repo_ish, Course, bytes, Text) -> StaticPageDesc
page_desc = get_yaml_from_repo(repo, filename, commit_sha)
page_desc = normalize_page_desc(page_desc)
return page_desc
def get_course_desc(repo, course, commit_sha):
Andreas Klöckner
committed
# type: (Repo_ish, Course, bytes) -> CourseDesc
return cast(
CourseDesc,
get_staticpage_desc(repo, course, commit_sha, course.course_file))
def normalize_flow_desc(flow_desc):
Andreas Klöckner
committed
# type: (FlowDesc) -> FlowDesc
if hasattr(flow_desc, "pages"):
pages = flow_desc.pages
from relate.utils import struct_to_dict, Struct
d = struct_to_dict(flow_desc)
del d["pages"]
d["groups"] = [Struct({"id": "main", "pages": pages})]
Andreas Klöckner
committed
return cast(FlowDesc, Struct(d))
Andreas Klöckner
committed
if hasattr(flow_desc, "rules"):
rules = flow_desc.rules
if not hasattr(rules, "grade_identifier"):
# Legacy content with grade_identifier in grading rule,
# move first found grade_identifier up to rules.
Andreas Klöckner
committed
rules.grade_identifier = None
rules.grade_aggregation_strategy = None
Andreas Klöckner
committed
for grule in rules.grading:
if grule.grade_identifier is not None:
rules.grade_identifier = grule.grade_identifier
rules.grade_aggregation_strategy = \
grule.grade_aggregation_strategy
break
def get_flow_desc(repo, course, flow_id, commit_sha):
Andreas Klöckner
committed
# type: (Repo_ish, Course, Text, bytes) -> FlowDesc
flow_desc = get_yaml_from_repo(repo, "flows/%s.yml" % flow_id, commit_sha)
flow_desc = normalize_flow_desc(flow_desc)
flow_desc.description_html = markup_to_html(
course, repo, commit_sha, getattr(flow_desc, "description", None))
return flow_desc
def get_flow_page_desc(flow_id, flow_desc, group_id, page_id):
Andreas Klöckner
committed
# type: (Text, FlowDesc, Text, Text) -> FlowPageDesc
if grp.id == group_id:
for page in grp.pages:
if page.id == page_id:
return page
raise ObjectDoesNotExist(
_("page '%(group_id)s/%(page_id)s' in flow '%(flow_id)s'") % {
'group_id': group_id,
'page_id': page_id,
'flow_id': flow_id
})
# }}}
# {{{ flow page handling
class ClassNotFoundError(RuntimeError):
Andreas Klöckner
committed
# type: (Text) -> type
if len(components) < 2:
# need at least one module plus class name
raise ClassNotFoundError(name)
module_name = ".".join(components[:-1])
try:
mod = __import__(module_name)
except ImportError:
raise ClassNotFoundError(name)
for comp in components[1:]:
try:
mod = getattr(mod, comp)
except AttributeError:
raise ClassNotFoundError(name)
def get_flow_page_class(repo, typename, commit_sha):
Andreas Klöckner
committed
# type: (Repo_ish, Text, bytes) -> type
# look among default page types
import course.page
try:
return getattr(course.page, typename)
except AttributeError:
pass
# try a global dotted-name import
try:
return import_class(typename)
except ClassNotFoundError:
pass
if typename.startswith("repo:"):
stripped_typename = typename[5:]
components = stripped_typename.split(".")
raise ClassNotFoundError(
_("repo page class must conist of two "
"dotted components (invalid: '%s')")
% typename)
module_name = "code/"+module+".py"
module_code = get_repo_blob(repo, module_name, commit_sha,
allow_tree=False).data
Andreas Klöckner
committed
module_dict = {} # type: Dict
exec(compile(module_code, module_name, 'exec'), module_dict)
try:
return module_dict[classname]
except AttributeError:
raise ClassNotFoundError(typename)
else:
raise ClassNotFoundError(typename)
def instantiate_flow_page(location, repo, page_desc, commit_sha):
Andreas Klöckner
committed
# type: (Text, Repo_ish, FlowPageDesc, bytes) -> PageBase
class_ = get_flow_page_class(repo, page_desc.type, commit_sha)
return class_(None, location, page_desc)
class CourseCommitShaDoesNotExist(Exception):
pass
Andreas Klöckner
committed
def get_course_commit_sha(course, participation, raise_on_error=False):
# type: (Course, Optional[Participation], Optional[bool]) -> bytes
sha = course.active_git_commit_sha
Andreas Klöckner
committed
if participation is not None:
if participation.preview_git_commit_sha:
preview_sha = participation.preview_git_commit_sha
with get_course_repo(course) as repo:
if isinstance(repo, SubdirRepoWrapper):
repo = repo.repo
try:
repo[preview_sha.encode()]
except KeyError:
if raise_on_error:
raise CourseCommitShaDoesNotExist(
_("Preview revision '%s' does not exist--"
"showing active course content instead."
% participation.preview_git_commit_sha))
finally:
repo.close()
Andreas Klöckner
committed
if preview_sha is not None:
sha = preview_sha
Andreas Klöckner
committed
# type: (Repo_ish, bytes) -> List[Text]
flow_ids = []
try:
flows_tree = get_repo_blob(repo, "flows", commit_sha)
except ObjectDoesNotExist:
# That's OK--no flows yet.
pass
else:
for entry in flows_tree.items():
flow_ids.append(entry.path[:-4].decode("utf-8"))