Newer
Older
Andreas Klöckner
committed
class PlusDeltaPostprocessor(DatespecPostprocessor):
    """Datespec postprocessor that shifts a datetime by a signed offset.

    An instance stores a (possibly negative) integer *count* and a *period*
    name.  :meth:`apply` adds ``count`` weeks, days, hours or minutes to the
    datetime it is given, depending on the prefix of *period*.
    """

    # (period prefix, datetime.timedelta keyword) pairs, tested in this order.
    _PERIOD_UNITS = (
            ("week", "weeks"),
            ("day", "days"),
            ("hour", "hours"),
            ("minute", "minutes"),
            )

    def __init__(self, count, period):
        # type: (int, Text) -> None
        self.count = count
        self.period = period

    @classmethod
    def parse(cls, s):
        """Try to parse a delta specification out of *s* using PLUS_DELTA_RE.

        :returns: a tuple ``(remaining, postprocessor)``.  If the pattern does
            not match, *s* is returned unchanged together with *None*.
            Otherwise *remaining* is match group 1 and the postprocessor is
            built from group 3 (the count, negated when group 2 is ``"-"``)
            and group 4 (the period).
        """
        match = PLUS_DELTA_RE.match(s)
        if match is None:
            return s, None

        count = int(match.group(3))
        if match.group(2) == "-":
            count = -count

        return match.group(1), PlusDeltaPostprocessor(count, match.group(4))

    def apply(self, dtm):
        """Return *dtm* shifted by the stored count of periods.

        :raises InvalidDatespec: if the period starts with none of
            ``week``, ``day``, ``hour`` or ``minute``.
        """
        for prefix, unit in self._PERIOD_UNITS:
            if self.period.startswith(prefix):
                return dtm + datetime.timedelta(**{unit: self.count})

        # Interpolate *after* the translation lookup: formatting inside _()
        # would look up the already-formatted string, which never matches a
        # message catalog entry.
        raise InvalidDatespec(_("invalid period: %s") % self.period)
# Postprocessor classes whose ``parse`` classmethods are tried, in this order,
# by parse_date_spec when it processes a textual date specification.
DATESPEC_POSTPROCESSORS = [
        AtTimePostprocessor,
        PlusDeltaPostprocessor,
        ]  # type: List[Any]
Andreas Klöckner
committed
# Resolve a date specification to a datetime.
#
# What the visible code does:
#   * datetime.datetime / datetime.date inputs are returned directly (dates are
#     combined with datetime.time.min), localized when they carry no tzinfo.
#   * Any other input is cast to text and stripped; postprocessor
#     specifications are parsed off repeatedly via DATESPEC_POSTPROCESSORS.
#   * The remaining text is matched against DATE_RE to build a calendar date,
#     and is also interpreted as an event kind (optionally starting with
#     END_PREFIX and ending in a numeral) that is looked up as a course Event.
#   * An Event that does not exist adds a warning to vctx (if given) and
#     yields now().
#   * The event-based result is passed through the parsed postprocessors.
#
# NOTE(review): this listing has lost its indentation, contains
# "Andreas Klöckner" / "committed" lines that look like version-control
# annotation residue rather than code, and appears to be missing statements
# (see the notes below).  Confirm against the repository before relying on it.
def parse_date_spec(
Andreas Klöckner
committed
# NOTE(review): the body reads `course` and callers in this file pass it as
# the first positional argument, but no such parameter is visible here.
datespec, # type: Union[Text, datetime.date, datetime.datetime]
vctx=None, # type: Optional[ValidationContext]
location=None, # type: Optional[Text]
):
# type: (...) -> datetime.datetime
Andreas Klöckner
committed
# Keep the unmodified input for use in warning messages.
orig_datespec = datespec
# Helper: attach a timezone (via relate.utils.localize_datetime) only to
# datetimes whose tzinfo is None; others are returned unchanged.
def localize_if_needed(d):
Andreas Klöckner
committed
# type: (datetime.datetime) -> datetime.datetime
if d.tzinfo is None:
from relate.utils import localize_datetime
return localize_datetime(d)
return d
# datetime.datetime is a subclass of datetime.date, so it is tested first.
if isinstance(datespec, datetime.datetime):
return localize_if_needed(datespec)
if isinstance(datespec, datetime.date):
return localize_if_needed(
datetime.datetime.combine(datespec, datetime.time.min))
Andreas Klöckner
committed
datespec_str = cast(Text, datespec).strip()
# {{{ parse postprocessors
Andreas Klöckner
committed
# Each pass offers the string to every postprocessor class; the first one
# that parses something is recorded (inserted at the front of the list) and
# the scan restarts.  The loop ends after a pass in which nothing parsed.
postprocs = [] # type: List[DatespecPostprocessor]
while True:
parsed_one = False
for pp_class in DATESPEC_POSTPROCESSORS:
Andreas Klöckner
committed
datespec_str, postproc = pp_class.parse(datespec_str)
if postproc is not None:
parsed_one = True
Andreas Klöckner
committed
postprocs.insert(0, cast(DatespecPostprocessor, postproc))
break
Andreas Klöckner
committed
datespec_str = datespec_str.strip()
if not parsed_one:
break
# }}}
# Helper: run every collected postprocessor over dtime, in list order.
def apply_postprocs(dtime):
Andreas Klöckner
committed
# type: (datetime.datetime) -> datetime.datetime
for postproc in postprocs:
dtime = postproc.apply(dtime)
return dtime
Andreas Klöckner
committed
match = DATE_RE.match(datespec_str)
Andreas Klöckner
committed
# NOTE(review): `match` is used below without a visible None check, and this
# `result` is not visibly returned -- presumably an `if match:` guard and a
# return belong here; confirm.
res_date = datetime.date(
int(match.group(1)),
int(match.group(2)),
int(match.group(3)))
result = localize_if_needed(
Andreas Klöckner
committed
datetime.datetime.combine(res_date, datetime.time.min))
Andreas Klöckner
committed
is_end = datespec_str.startswith(END_PREFIX)
Andreas Klöckner
committed
# NOTE(review): as shown, the prefix length is sliced off unconditionally;
# presumably this is guarded by `is_end` -- confirm.
datespec_str = datespec_str[len(END_PREFIX):]
Andreas Klöckner
committed
match = TRAILING_NUMERAL_RE.match(datespec_str)
# NOTE(review): the `else:` below has no visible matching `if`.
# event with numeral
Andreas Klöckner
committed
event_kind = match.group(1)
Andreas Klöckner
committed
ordinal = int(match.group(2)) # type: Optional[int]
Andreas Klöckner
committed
else:
# event without numeral
Andreas Klöckner
committed
# NOTE(review): `ordinal` is not visibly assigned on this path although it is
# used in the Event lookup below.
event_kind = datespec_str
Andreas Klöckner
committed
# When validating, check that the event kind is a valid identifier.
if vctx is not None:
from course.validation import validate_identifier
validate_identifier(vctx, "%s: event kind" % location, event_kind)
Andreas Klöckner
committed
# Without a course there is nothing to look events up in.
if course is None:
return now()
from course.models import Event
# NOTE(review): the `except` below has no visible `try:`; presumably it wraps
# this lookup.
event_obj = Event.objects.get(
course=course,
kind=event_kind,
ordinal=ordinal)
except ObjectDoesNotExist:
Andreas Klöckner
committed
if vctx is not None:
vctx.add_warning(
location,
_("unrecognized date/time specification: '%s' "
"(interpreted as 'now')")
% orig_datespec)
return now()
# For end-prefixed specs use the event's end time, falling back to its start
# time (with a validation warning) when no end time is set.
if is_end:
if event_obj.end_time is not None:
result = event_obj.end_time
else:
result = event_obj.time
if vctx is not None:
vctx.add_warning(
location,
_("event '%s' has no end time, using start time instead")
% orig_datespec)
else:
result = event_obj.time
return apply_postprocs(result)
# }}}
# {{{ page chunks
Andreas Klöckner
committed
# Determine the (weight, shown) pair for a page chunk from its rules.
#
# A chunk without a `rules` attribute gets weight 0 and is shown.  Otherwise
# rule conditions (role, if_after / if_before dates, facility, plus the
# deprecated roles / start / end forms) are checked, and the weight and
# optional `shown` flag of a rule are returned.
#
# NOTE(review): this listing has lost its indentation, contains
# "Andreas Klöckner" / "committed" lines that look like version-control
# annotation residue, and is missing statements: `rule` is never bound,
# `continue` appears without a visible loop, several conditions have no
# visible body, and no return is visible for the case where no rule applies.
# Presumably the checks sit inside a loop over chunk.rules -- confirm.
def compute_chunk_weight_and_shown(
course, # type: Course
chunk, # type: ChunkDesc
roles, # type: List[Text]
Andreas Klöckner
committed
now_datetime, # type: datetime.datetime
facilities, # type: frozenset[Text]
):
# type: (...) -> Tuple[float, bool]
if not hasattr(chunk, "rules"):
return 0, True
# True when none of the caller's roles is listed in rule.if_has_role.
if hasattr(rule, "if_has_role"):
Andreas Klöckner
committed
if all(role not in rule.if_has_role for role in roles):
# Skip when the current time is still before the rule's if_after date.
if hasattr(rule, "if_after"):
start_date = parse_date_spec(course, rule.if_after)
if now_datetime < start_date:
continue
# Skip when the rule's if_before date has already passed.
if hasattr(rule, "if_before"):
end_date = parse_date_spec(course, rule.if_before)
if end_date < now_datetime:
continue
if hasattr(rule, "if_in_facility"):
if rule.if_in_facility not in facilities:
# {{{ deprecated
Andreas Klöckner
committed
if all(role not in rule.roles for role in roles):
start_date = parse_date_spec(course, rule.start)
end_date = parse_date_spec(course, rule.end)
# `shown` defaults to True unless the rule specifies it.
shown = True
if hasattr(rule, "shown"):
shown = rule.shown
return rule.weight, shown
Andreas Klöckner
committed
# Annotate every chunk of a static page and return a list of its chunks.
#
# For each chunk this sets `weight` and `shown` (from
# compute_chunk_weight_and_shown), `html_content` (from markup_to_html) and,
# when absent, `title` (from extract_title_from_markup).  page_desc.chunks is
# then sorted in place by descending weight.
#
# NOTE(review): this listing has lost its indentation and contains
# "Andreas Klöckner" / "committed" lines that look like version-control
# annotation residue.  The final list comprehension is not closed in the
# visible text, so any filter clause it may have (e.g. on chunk.shown) cannot
# be confirmed from here.
def get_processed_page_chunks(
course, # type: Course
repo, # type: Repo_ish
commit_sha, # type: bytes
page_desc, # type: StaticPageDesc
roles, # type: List[Text]
now_datetime, # type: datetime.datetime
facilities, # type: frozenset[Text]
):
# type: (...) -> List[ChunkDesc]
for chunk in page_desc.chunks:
chunk.weight, chunk.shown = \
compute_chunk_weight_and_shown(
Andreas Klöckner
committed
course, chunk, roles, now_datetime,
facilities)
chunk.html_content = markup_to_html(course, repo, commit_sha, chunk.content)
if not hasattr(chunk, "title"):
chunk.title = extract_title_from_markup(chunk.content)
# Highest weight first.
page_desc.chunks.sort(key=lambda chunk: chunk.weight, reverse=True)
return [chunk for chunk in page_desc.chunks
# }}}
# {{{ repo desc getting
def normalize_page_desc(page_desc):
    # type: (StaticPageDesc) -> StaticPageDesc
    """Return *page_desc* in chunk-based form.

    A description with a top-level ``content`` attribute is converted into a
    new struct in which ``content`` is removed and replaced by a ``chunks``
    list holding a single chunk with id ``"main"`` and that content.
    Descriptions without ``content`` are returned unchanged (same object).
    """
    if not hasattr(page_desc, "content"):
        return page_desc

    from relate.utils import struct_to_dict, Struct

    main_content = page_desc.content
    desc_dict = struct_to_dict(page_desc)
    del desc_dict["content"]
    desc_dict["chunks"] = [Struct({"id": "main", "content": main_content})]

    return cast(StaticPageDesc, Struct(desc_dict))
def get_staticpage_desc(repo, course, commit_sha, filename):
    # type: (Repo_ish, Course, bytes, Text) -> StaticPageDesc
    """Load the YAML file *filename* at *commit_sha* from *repo* and return
    it as a normalized static page description.

    *course* is accepted but not used by this function.
    """
    raw_desc = get_yaml_from_repo(repo, filename, commit_sha)
    return normalize_page_desc(raw_desc)
def get_course_desc(repo, course, commit_sha):
    # type: (Repo_ish, Course, bytes) -> CourseDesc
    """Return the course description: the static page description read from
    ``course.course_file`` at *commit_sha*, typed as a ``CourseDesc``.
    """
    course_page_desc = get_staticpage_desc(
            repo, course, commit_sha, course.course_file)
    return cast(CourseDesc, course_page_desc)
def normalize_flow_desc(flow_desc):
    # type: (FlowDesc) -> FlowDesc
    """Bring a flow description into its current canonical shape.

    * A description with a top-level ``pages`` attribute is converted into a
      new struct in which ``pages`` is replaced by a ``groups`` list holding
      a single group with id ``"main"``; that new struct is returned.
    * Otherwise, if the description has ``rules`` without a
      ``grade_identifier``, the first non-*None* ``grade_identifier`` found in
      ``rules.grading`` (and the ``grade_aggregation_strategy`` of the same
      rule) is copied up to ``rules``; both default to *None*.  This mutates
      ``flow_desc.rules`` in place.

    :returns: the normalized flow description.  Descriptions without
        ``pages`` are returned as the same object.
    """
    if hasattr(flow_desc, "pages"):
        pages = flow_desc.pages
        from relate.utils import struct_to_dict, Struct
        d = struct_to_dict(flow_desc)
        del d["pages"]
        d["groups"] = [Struct({"id": "main", "pages": pages})]
        return cast(FlowDesc, Struct(d))

    if hasattr(flow_desc, "rules"):
        rules = flow_desc.rules
        if not hasattr(rules, "grade_identifier"):
            # Legacy content with grade_identifier in grading rule,
            # move first found grade_identifier up to rules.
            rules.grade_identifier = None
            rules.grade_aggregation_strategy = None

            for grule in rules.grading:
                if grule.grade_identifier is not None:
                    rules.grade_identifier = grule.grade_identifier
                    rules.grade_aggregation_strategy = \
                            grule.grade_aggregation_strategy
                    break

    # Callers use the return value (and the signature promises a FlowDesc),
    # so this path must hand the description back rather than fall through
    # and return None.
    return flow_desc
def get_flow_desc(repo, course, flow_id, commit_sha):
    # type: (Repo_ish, Course, Text, bytes) -> FlowDesc
    """Load ``flows/<flow_id>.yml`` at *commit_sha* from *repo*, normalize it
    and attach ``description_html``.

    ``description_html`` is the result of ``markup_to_html`` applied to the
    flow's ``description`` attribute, or to *None* when the flow has none.
    """
    flow_path = "flows/%s.yml" % flow_id
    flow_desc = normalize_flow_desc(
            get_yaml_from_repo(repo, flow_path, commit_sha))

    description = getattr(flow_desc, "description", None)
    flow_desc.description_html = markup_to_html(
            course, repo, commit_sha, description)

    return flow_desc
def get_flow_page_desc(flow_id, flow_desc, group_id, page_id):
    # type: (Text, FlowDesc, Text, Text) -> FlowPageDesc
    """Return the page description with id *page_id* from the group with id
    *group_id* of *flow_desc*.

    :arg flow_id: only used in the error message.
    :raises ObjectDoesNotExist: if no such group/page combination exists.
    """
    # Iterate over the flow's groups so that `grp` is actually bound; each
    # group exposes `id` and `pages`.
    for grp in flow_desc.groups:
        if grp.id != group_id:
            continue

        for page in grp.pages:
            if page.id == page_id:
                return page

    raise ObjectDoesNotExist(
            _("page '%(group_id)s/%(page_id)s' in flow '%(flow_id)s'") % {
                'group_id': group_id,
                'page_id': page_id,
                'flow_id': flow_id
                })
# }}}
# {{{ flow page handling
# Exception raised in this file when a class cannot be located by name.
#
# NOTE(review): this listing has lost its indentation and contains
# "Andreas Klöckner" / "committed" lines that look like version-control
# annotation residue.  No class body is visible, and the statements below read
# like the body of a separate function taking `name` (type comment
# `(Text) -> type`) whose `def` line, the binding of `components`, and the
# final return are not visible.  Presumably this is the `import_class` helper
# that get_flow_page_class calls -- confirm against the repository.
class ClassNotFoundError(RuntimeError):
Andreas Klöckner
committed
# type: (Text) -> type
if len(components) < 2:
# need at least one module plus class name
raise ClassNotFoundError(name)
# Everything but the last component is the module path.
module_name = ".".join(components[:-1])
try:
mod = __import__(module_name)
except ImportError:
raise ClassNotFoundError(name)
# __import__ with a dotted name returns the top-level package, so walk the
# remaining components (submodules, then the class) via attribute access.
for comp in components[1:]:
try:
mod = getattr(mod, comp)
except AttributeError:
raise ClassNotFoundError(name)
def get_flow_page_class(repo, typename, commit_sha):
    # type: (Repo_ish, Text, bytes) -> type
    """Find the flow page class named *typename*.

    Lookup order:

    1. an attribute of :mod:`course.page`,
    2. a globally importable dotted name (via ``import_class``),
    3. for names of the form ``repo:<module>.<classname>``, the name
       *classname* defined by executing ``code/<module>.py`` as found in
       *repo* at *commit_sha*.

    :raises ClassNotFoundError: if none of the above yields a class, or if a
        ``repo:`` name does not consist of exactly two dotted components.
    """
    # look among default page types
    import course.page
    try:
        return getattr(course.page, typename)
    except AttributeError:
        pass

    # try a global dotted-name import
    try:
        return import_class(typename)
    except ClassNotFoundError:
        pass

    if not typename.startswith("repo:"):
        raise ClassNotFoundError(typename)

    stripped_typename = typename[5:]

    # Only reject names that do not have the "<module>.<classname>" shape;
    # valid names must fall through to the repo lookup below.
    components = stripped_typename.split(".")
    if len(components) != 2:
        raise ClassNotFoundError(
                _("repo page class must conist of two "
                "dotted components (invalid: '%s')")
                % typename)

    module, classname = components

    module_name = "code/"+module+".py"
    module_code = get_repo_blob(repo, module_name, commit_sha,
            allow_tree=False).data

    module_dict = {}  # type: Dict

    # NOTE(review): this executes code taken from the repository; it is only
    # as trustworthy as whoever can commit to that repository.
    exec(compile(module_code, module_name, 'exec'), module_dict)

    try:
        return module_dict[classname]
    except KeyError:
        # A missing key in a dict raises KeyError (not AttributeError).
        raise ClassNotFoundError(typename)
def instantiate_flow_page(location, repo, page_desc, commit_sha):
    # type: (Text, Repo_ish, FlowPageDesc, bytes) -> PageBase
    """Create the page object described by *page_desc*.

    The class is looked up from ``page_desc.type`` via get_flow_page_class
    and called as ``class_(None, location, page_desc)``.
    """
    page_class = get_flow_page_class(repo, page_desc.type, commit_sha)
    page = page_class(None, location, page_desc)
    return page
def get_course_commit_sha(course, participation):
    # type: (Course, Optional[Participation]) -> bytes
    """Return the commit SHA (as bytes) whose content should be shown.

    This is the course's ``active_git_commit_sha`` unless *participation* is
    given and has a non-empty ``preview_git_commit_sha`` that is present in
    the course repository, in which case the preview SHA is used.
    """
    # logic duplicated in course.utils.CoursePageContext

    sha = course.active_git_commit_sha

    preview_sha = None
    if participation is not None:
        preview_sha = participation.preview_git_commit_sha

    if preview_sha:
        repo = get_course_repo(course)
        if isinstance(repo, SubdirRepoWrapper):
            # look the commit up in the wrapped repository itself
            repo = repo.repo

        try:
            repo[preview_sha.encode()]
        except KeyError:
            # Preview commit is not in the repository: keep the active one.
            pass
        else:
            sha = preview_sha

    # The SHAs read above are text; hand back bytes as the signature
    # promises instead of falling off the end and returning None.
    return sha.encode()
Andreas Klöckner
committed
# Collect the ids of the flows found under "flows" in the repository at
# commit_sha and return them sorted.
#
# NOTE(review): the `def` line of this function is not visible in this
# listing (only its type comment and body are); the parameters are presumably
# `repo` and `commit_sha`, matching the type comment -- confirm.
# type: (Repo_ish, bytes) -> List[Text]
flow_ids = []
try:
flows_tree = get_repo_blob(repo, "flows", commit_sha)
except ObjectDoesNotExist:
# That's OK--no flows yet.
pass
else:
for entry in flows_tree.items():
# Drop the last four characters of the entry path (presumably the ".yml"
# extension -- no check on the extension is visible here).
flow_ids.append(entry.path[:-4])
return sorted(flow_ids)