# content.py

    def apply(self, dtm):
        # type: (datetime.datetime) -> datetime.datetime
        """Return *dtm* adjusted by this postprocessor.

        This implementation always raises :exc:`NotImplementedError`; the
        postprocessor classes defined later in this file override it.
        """
        raise NotImplementedError()


# Matches a datespec ending in "@ H:MM" or "@ HH:MM" (optional whitespace
# around the "@" and at the very end).
# Groups: 1 = everything before the "@", 2 = hour digits, 3 = minute digits.
# The pattern itself accepts hours up to 29 and minutes up to 99, so users of
# this regex must range-check the values themselves.
AT_TIME_RE = re.compile(r"^(.*)\s*@\s*([0-2]?[0-9])\:([0-9][0-9])\s*$")


class AtTimePostprocessor(DatespecPostprocessor):
    """Datespec postprocessor for a trailing ``@ HH:MM`` suffix.

    Applying it moves a datetime to the stored hour/minute/second, expressed
    in the server time zone (``settings.TIME_ZONE``).
    """

    def __init__(self, hour, minute, second=0):
        self.hour = hour
        self.minute = minute
        self.second = second

    @classmethod
    def parse(cls, s):
        """Try to split a trailing ``@ HH:MM`` off the datespec string *s*.

        :returns: a tuple ``(remainder, postprocessor)``. If *s* does not
            match :data:`AT_TIME_RE`, this is ``(s, None)``. Otherwise
            *remainder* is the text captured before the ``@`` and
            *postprocessor* is an instance of *cls*.
        :raises InvalidDatespec: if the hour is not in ``0..23`` or the
            minute is not in ``0..59``.
        """
        match = AT_TIME_RE.match(s)
        if match is None:
            return s, None

        hour = int(match.group(2))
        minute = int(match.group(3))

        # The regex alone admits values such as "29:99", so check ranges here.
        if not (0 <= hour < 24 and 0 <= minute < 60):
            raise InvalidDatespec(s)

        # Instantiate via cls (not the hard-coded class name) so that
        # subclasses calling parse() get instances of themselves.
        return match.group(1), cls(hour, minute)

    def apply(self, dtm):
        # type: (datetime.datetime) -> datetime.datetime
        """Return *dtm* converted to the server time zone with its hour,
        minute and second replaced by the stored values.

        All other fields (date, microsecond) are left as they are after the
        time zone conversion.
        """
        from pytz import timezone
        server_tz = timezone(settings.TIME_ZONE)

        return dtm.astimezone(server_tz).replace(
                    hour=self.hour,
                    minute=self.minute,
                    second=self.second)


# Matches a datespec ending in "+ N <unit>" or "- N <unit>", where <unit> is
# one of week(s), day(s), hour(s), minute(s).
# Groups: 1 = everything before the sign, 2 = the sign ("+" or "-"),
# 3 = the count digits, 4 = the unit word.
PLUS_DELTA_RE = re.compile(r"^(.*)\s*([+-])\s*([0-9]+)\s+"
    "(weeks?|days?|hours?|minutes?)$")


class PlusDeltaPostprocessor(DatespecPostprocessor):
    """Datespec postprocessor for a trailing ``+ N unit`` / ``- N unit``
    suffix, where *unit* is a week, day, hour or minute period.
    """

    def __init__(self, count, period):
        self.count = count
        self.period = period

    @classmethod
    def parse(cls, s):
        """Try to split a trailing signed offset off the datespec string *s*.

        :returns: a tuple ``(remainder, postprocessor)``. If *s* does not
            match :data:`PLUS_DELTA_RE`, this is ``(s, None)``. Otherwise
            *remainder* is the text captured before the sign and
            *postprocessor* is an instance of *cls* whose count is negative
            for a ``-`` sign.
        """
        match = PLUS_DELTA_RE.match(s)
        if match is None:
            return s, None

        count = int(match.group(3))
        if match.group(2) == "-":
            count = -count

        # Instantiate via cls so subclasses get instances of themselves.
        return match.group(1), cls(count, match.group(4))

    def apply(self, dtm):
        # type: (datetime.datetime) -> datetime.datetime
        """Return *dtm* shifted by ``count`` periods.

        :raises InvalidDatespec: if ``self.period`` does not start with
            ``week``, ``day``, ``hour`` or ``minute``.
        """
        for unit in ("week", "day", "hour", "minute"):
            if self.period.startswith(unit):
                # timedelta keyword names are the plural forms.
                return dtm + datetime.timedelta(**{unit + "s": self.count})

        # Interpolate *after* translating, so the message catalog is looked
        # up with the constant template rather than the formatted string.
        raise InvalidDatespec(_("invalid period: %s") % self.period)


DATESPEC_POSTPROCESSORS = [
        AtTimePostprocessor,
        PlusDeltaPostprocessor,
Andreas Klöckner's avatar
Andreas Klöckner committed
        course,  # type: Optional[Course]
        datespec,  # type: Union[Text, datetime.date, datetime.datetime]
        vctx=None,  # type: Optional[ValidationContext]
        location=None,  # type: Optional[Text]
        ):
    # type: (...)  -> datetime.datetime
Andreas Klöckner's avatar
Andreas Klöckner committed
    if datespec is None:
        return None

    def localize_if_needed(d):
        # type: (datetime.datetime) -> datetime.datetime
            from relate.utils import localize_datetime
            return localize_datetime(d)
            return d

    if isinstance(datespec, datetime.datetime):
        return localize_if_needed(datespec)
    if isinstance(datespec, datetime.date):
        return localize_if_needed(
                datetime.datetime.combine(datespec, datetime.time.min))
Dong Zhuang's avatar
Dong Zhuang committed
    try:
        from typing import Text
    except ImportError:
        Text = None  # noqa
    datespec_str = cast(Text, datespec).strip()  # type: ignore
    postprocs = []  # type: List[DatespecPostprocessor]
    while True:
        parsed_one = False
        for pp_class in DATESPEC_POSTPROCESSORS:
            datespec_str, postproc = pp_class.parse(datespec_str)
            if postproc is not None:
                parsed_one = True
                postprocs.insert(0, cast(DatespecPostprocessor, postproc))
        # type: (datetime.datetime) -> datetime.datetime
        for postproc in postprocs:
            dtime = postproc.apply(dtime)

        return dtime

Andreas Klöckner's avatar
Andreas Klöckner committed
    if match:
                int(match.group(1)),
                int(match.group(2)),
                int(match.group(3)))
        result = localize_if_needed(
                datetime.datetime.combine(res_date, datetime.time.min))
        return apply_postprocs(result)
    is_end = datespec_str.startswith(END_PREFIX)
        datespec_str = datespec_str[len(END_PREFIX):]
    match = TRAILING_NUMERAL_RE.match(datespec_str)
Andreas Klöckner's avatar
Andreas Klöckner committed
    if match:
        event_kind = match.group(1)
        ordinal = int(match.group(2))  # type: Optional[int]
    else:
        # event without numeral
Andreas Klöckner's avatar
Andreas Klöckner committed
        ordinal = None

    if vctx is not None:
        from course.validation import validate_identifier
        validate_identifier(vctx, "%s: event kind" % location, event_kind)

    if course is None:
        return now()

    from course.models import Event
        event_obj = Event.objects.get(
            course=course,
            kind=event_kind,
            ordinal=ordinal)
    except ObjectDoesNotExist:
        if vctx is not None:
            vctx.add_warning(
                    location,
                    _("unrecognized date/time specification: '%s' "
                    "(interpreted as 'now')")
                    % orig_datespec)
        return now()
    if is_end:
        if event_obj.end_time is not None:
            result = event_obj.end_time
        else:
            result = event_obj.time
            if vctx is not None:
                vctx.add_warning(
                        location,
                        _("event '%s' has no end time, using start time instead")
                        % orig_datespec)

    else:
        result = event_obj.time

    return apply_postprocs(result)


def compute_chunk_weight_and_shown(
        course,  # type:  Course
        chunk,  # type: ChunkDesc
        roles,  # type: List[Text]
        now_datetime,  # type: datetime.datetime
        facilities,  # type: FrozenSet[Text]
        ):
    # type: (...) -> Tuple[float, bool]
    """Return ``(weight, shown)`` for *chunk* from its first applicable rule.

    A chunk without a ``rules`` attribute gets ``(0, True)``. Otherwise the
    rules are tried in order; a rule is skipped when any condition it carries
    fails:

    * ``if_has_role`` / ``roles``: none of *roles* is listed,
    * ``if_after`` / ``start``: *now_datetime* is before the parsed date,
    * ``if_before`` / ``end``: *now_datetime* is after the parsed date,
    * ``if_in_facility``: the named facility is not in *facilities*.

    The first rule that is not skipped supplies ``rule.weight`` and
    ``rule.shown`` (defaulting to ``True``). If every rule is skipped, the
    result is ``(0, True)``.
    """
    # NOTE(review): the `roles` parameter, the end of the signature and the
    # bodies of two conditions were missing from the reviewed text and were
    # restored from how the body uses them -- confirm against callers.
    if not hasattr(chunk, "rules"):
        return 0, True

    for rule in chunk.rules:
        if hasattr(rule, "if_has_role"):
            if all(role not in rule.if_has_role for role in roles):
                continue

        if hasattr(rule, "if_after"):
            start_date = parse_date_spec(course, rule.if_after)
            if now_datetime < start_date:
                continue

        if hasattr(rule, "if_before"):
            end_date = parse_date_spec(course, rule.if_before)
            if end_date < now_datetime:
                continue

        if hasattr(rule, "if_in_facility"):
            if rule.if_in_facility not in facilities:
                continue

        # Alternate spellings of the role/date conditions above.
        if hasattr(rule, "roles"):
            if all(role not in rule.roles for role in roles):
                continue

        if hasattr(rule, "start"):
            start_date = parse_date_spec(course, rule.start)
            if now_datetime < start_date:
                continue

        if hasattr(rule, "end"):
            end_date = parse_date_spec(course, rule.end)
            if end_date < now_datetime:
                continue

        shown = True
        if hasattr(rule, "shown"):
            shown = rule.shown

        return rule.weight, shown

    # No rule applied: fall back to the same default as a chunk without
    # rules, so callers can always unpack a 2-tuple.
    return 0, True
def get_processed_page_chunks(
        course,  # type: Course
        repo,  # type: Repo_ish
        commit_sha,  # type: bytes
        page_desc,  # type: StaticPageDesc
        roles,  # type: List[Text]
        now_datetime,  # type: datetime.datetime
        facilities,  # type: FrozenSet[Text]
        ):
    # type: (...) -> List[ChunkDesc]
    """Annotate, sort and filter the chunks of *page_desc*.

    Each chunk in ``page_desc.chunks`` gets ``weight`` and ``shown`` from
    :func:`compute_chunk_weight_and_shown`, ``html_content`` rendered from its
    ``content``, and -- if it has none -- a ``title`` extracted from its
    markup. ``page_desc.chunks`` is then sorted in place by descending weight.

    :returns: the chunks whose ``shown`` is true, in sorted order.
    """
    for chunk in page_desc.chunks:
        # Keyword arguments keep this call independent of parameter order.
        chunk.weight, chunk.shown = \
                compute_chunk_weight_and_shown(
                        course=course,
                        chunk=chunk,
                        roles=roles,
                        now_datetime=now_datetime,
                        facilities=facilities)
        chunk.html_content = markup_to_html(course, repo, commit_sha, chunk.content)
        if not hasattr(chunk, "title"):
            chunk.title = extract_title_from_markup(chunk.content)

    page_desc.chunks.sort(key=lambda chunk: chunk.weight, reverse=True)

    return [chunk for chunk in page_desc.chunks
            if chunk.shown]
def normalize_page_desc(page_desc):
    # type: (StaticPageDesc) -> StaticPageDesc
    """Return *page_desc* in chunk-based form.

    A page description carrying a top-level ``content`` attribute is
    converted into a new struct in which ``content`` is replaced by a
    ``chunks`` list holding a single chunk with id ``"main"`` and that
    content. Descriptions without ``content`` are returned unchanged.
    """
    if not hasattr(page_desc, "content"):
        return page_desc

    from relate.utils import struct_to_dict, Struct

    content = page_desc.content
    d = struct_to_dict(page_desc)
    del d["content"]
    d["chunks"] = [Struct({"id": "main", "content": content})]

    # Hand back the converted description; previously the dict built above
    # was dropped and the un-normalized input was returned.
    return Struct(d)


def get_staticpage_desc(repo, course, commit_sha, filename):
    # type: (Repo_ish, Course, bytes, Text) -> StaticPageDesc
    """Load the YAML file *filename* from *repo* at *commit_sha* and return
    it after passing it through :func:`normalize_page_desc`.

    *course* is accepted but not used by this function.
    """
    raw_desc = get_yaml_from_repo(repo, filename, commit_sha)
    return normalize_page_desc(raw_desc)


def get_course_desc(repo, course, commit_sha):
    # type: (Repo_ish, Course, bytes) -> CourseDesc
    """Return the description loaded from ``course.course_file`` at
    *commit_sha*, obtained via :func:`get_staticpage_desc`.

    The ``cast`` only informs the type checker; the object is returned as is.
    """
    course_desc = get_staticpage_desc(
            repo, course, commit_sha, course.course_file)
    return cast(CourseDesc, course_desc)
def normalize_flow_desc(flow_desc):
    # type: (FlowDesc) -> FlowDesc
    """Return *flow_desc* with legacy layouts converted.

    Two conversions are applied:

    * A top-level ``pages`` attribute is replaced by a ``groups`` list
      holding one group with id ``"main"`` and those pages. This produces a
      new struct.
    * If ``rules`` lacks ``grade_identifier``, then ``grade_identifier`` and
      ``grade_aggregation_strategy`` are set on ``rules`` in place. They are
      taken from the first grading rule whose ``grade_identifier`` is not
      ``None``, or set to ``None`` if there is no such rule.
    """
    if hasattr(flow_desc, "pages"):
        pages = flow_desc.pages
        from relate.utils import struct_to_dict, Struct
        d = struct_to_dict(flow_desc)
        del d["pages"]
        d["groups"] = [Struct({"id": "main", "pages": pages})]
        # Continue with (and eventually return) the converted description;
        # previously the dict built above was discarded.
        flow_desc = Struct(d)

    if hasattr(flow_desc, "rules"):
        rules = flow_desc.rules
        if not hasattr(rules, "grade_identifier"):
            # Legacy content with grade_identifier in grading rule,
            # move first found grade_identifier up to rules.

            rules.grade_identifier = None
            rules.grade_aggregation_strategy = None

            for grule in rules.grading:
                if grule.grade_identifier is not None:
                    rules.grade_identifier = grule.grade_identifier
                    rules.grade_aggregation_strategy = \
                            grule.grade_aggregation_strategy
                    break

    # Callers use the return value, so it must not be None.
    return flow_desc
def get_flow_desc(repo, course, flow_id, commit_sha):
    # type: (Repo_ish, Course, Text, bytes) -> FlowDesc
    """Load ``flows/<flow_id>.yml`` from *repo* at *commit_sha*, pass it
    through :func:`normalize_flow_desc` and attach ``description_html``.

    ``description_html`` is rendered from the flow's ``description``
    attribute, or from ``None`` if there is no such attribute.
    """
    yaml_path = "flows/%s.yml" % flow_id
    flow_desc = normalize_flow_desc(
            get_yaml_from_repo(repo, yaml_path, commit_sha))

    description = getattr(flow_desc, "description", None)
    flow_desc.description_html = markup_to_html(
            course, repo, commit_sha, description)

    return flow_desc
def get_flow_page_desc(flow_id, flow_desc, group_id, page_id):
    # type: (Text, FlowDesc, Text, Text) -> FlowPageDesc
    """Return the first page with id *page_id* found in a group of
    *flow_desc* whose id is *group_id*.

    :raises ObjectDoesNotExist: if no such page exists. *flow_id* is used
        only for this error message.
    """
    matching_pages = (
            page
            for grp in flow_desc.groups
            if grp.id == group_id
            for page in grp.pages
            if page.id == page_id)

    # The generator is lazy, so the search stops at the first hit.
    for found in matching_pages:
        return found

    message_args = {
        'group_id': group_id,
        'page_id': page_id,
        'flow_id': flow_id
        }
    raise ObjectDoesNotExist(
            _("page '%(group_id)s/%(page_id)s' in flow '%(flow_id)s'")
            % message_args)
class ClassNotFoundError(RuntimeError):
    """Raised when a class name cannot be resolved to an object."""
    pass


def import_class(name):
    # type: (Text) -> type
    """Resolve the dotted *name* (``package.module.Class``) and return the
    object it refers to.

    Everything but the last component is imported as a module; the remaining
    components are then looked up as attributes, starting from the top-level
    package.

    :raises ClassNotFoundError: if *name* has fewer than two components, if
        the module cannot be imported, or if an attribute lookup fails.
    """
    components = name.split('.')

    if len(components) < 2:
        # need at least one module plus class name
        raise ClassNotFoundError(name)

    module_name = ".".join(components[:-1])
    try:
        mod = __import__(module_name)
    except (ImportError, ValueError):
        # ValueError: empty module name, e.g. for ".Foo".
        raise ClassNotFoundError(name)

    # __import__("a.b") returns the top-level package "a", so walk down
    # through every component after the first.
    for comp in components[1:]:
        try:
            mod = getattr(mod, comp)
        except AttributeError:
            raise ClassNotFoundError(name)

    # Hand the resolved object to the caller (this return was missing).
    return mod
def get_flow_page_class(repo, typename, commit_sha):
    # look among default page types
    import course.page
    try:
        return getattr(course.page, typename)
    except AttributeError:
        pass
    # try a global dotted-name import
    try:
        return import_class(typename)
    except ClassNotFoundError:
        pass
    if typename.startswith("repo:"):
        stripped_typename = typename[5:]
        components = stripped_typename.split(".")
        if len(components) != 2:
            raise ClassNotFoundError(
                    _("repo page class must conist of two "
                    "dotted components (invalid: '%s')")
        module, classname = components
Andreas Klöckner's avatar
Andreas Klöckner committed
        module_code = get_repo_blob(repo, module_name, commit_sha,
                allow_tree=False).data
        exec(compile(module_code, module_name, 'exec'), module_dict)
        try:
            return module_dict[classname]
        except AttributeError:
            raise ClassNotFoundError(typename)
    else:
        raise ClassNotFoundError(typename)
def instantiate_flow_page(location, repo, page_desc, commit_sha):
    # type: (Text, Repo_ish, FlowPageDesc, bytes) -> PageBase
    """Look up the page class named by ``page_desc.type`` via
    :func:`get_flow_page_class` and return an instance of it.

    The class is called as ``class_(None, location, page_desc)``.
    """
    page_class = get_flow_page_class(repo, page_desc.type, commit_sha)
    page = page_class(None, location, page_desc)
    return page
def get_course_commit_sha(course, participation):
    # type: (Course, Optional[Participation]) -> bytes
    """Return the commit SHA to use for *course* as seen by *participation*.

    This is ``course.active_git_commit_sha``, unless *participation* has a
    ``preview_git_commit_sha`` that is present in the course repository, in
    which case that preview SHA is used. The result is encoded to bytes.
    """

    # logic duplicated in course.utils.CoursePageContext

    sha = course.active_git_commit_sha

    if participation is not None and participation.preview_git_commit_sha:
        preview_sha = participation.preview_git_commit_sha

        with get_course_repo(course) as repo:
            if isinstance(repo, SubdirRepoWrapper):
                repo = repo.repo

            # Only honor the preview SHA if the repository has that object.
            try:
                repo[preview_sha.encode()]
            except KeyError:
                preview_sha = None

        if preview_sha is not None:
            sha = preview_sha

    # The function is declared to return bytes but previously fell off the
    # end and returned None.
    # NOTE(review): assumes active_git_commit_sha is text like the preview
    # SHA (which is .encode()d above) -- confirm against the model.
    return sha.encode()
def list_flow_ids(repo, commit_sha):
    # type: (Repo_ish, bytes) -> List[Text]
    """Return the sorted ids of all flows in *repo* at *commit_sha*.

    A flow id is the name of a ``.yml`` entry in the ``flows`` tree with the
    suffix removed, decoded to text. If there is no ``flows`` tree, the
    result is an empty list.
    """
    try:
        flows_tree = get_repo_blob(repo, "flows", commit_sha)
    except ObjectDoesNotExist:
        # That's OK--no flows yet.
        return []

    suffix = b".yml"

    # Entry paths are bytes; decode so that the ids are text as declared and
    # can be formatted into "flows/%s.yml" paths.
    return sorted(
            entry.path[:-len(suffix)].decode("utf-8")
            for entry in flows_tree.items()
            if entry.path.endswith(suffix))

# vim: foldmethod=marker