Newer
Older
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
super(TwoCourseTestMixin, self).setUp()
# reload objects created during setUpTestData in case they were modified in
# tests. Ref: https://goo.gl/AuzJRC#django.test.TestCase.setUpTestData
self.course1.refresh_from_db()
self.course1_instructor_participation.refresh_from_db()
self.course1_student_participation.refresh_from_db()
self.course1_ta_participation.refresh_from_db()
self.course2.refresh_from_db()
self.course2_instructor_participation.refresh_from_db()
self.course2_student_participation.refresh_from_db()
self.course2_ta_participation.refresh_from_db()
class SingleCoursePageTestMixin(SingleCourseTestMixin):
# This serves as cache
_default_session_id = None
@property
def flow_id(self):
raise NotImplementedError
@classmethod
def update_default_flow_session_id(cls, course_identifier):
cls._default_session_id = cls.default_flow_params["flow_session_id"]
def get_default_flow_session_id(self, course_identifier):
if self._default_session_id is not None:
return self._default_session_id
self._default_session_id = self.get_latest_session_id(course_identifier)
return self._default_session_id
class TwoCoursePageTestMixin(TwoCourseTestMixin):
_course1_default_session_id = None
_course2_default_session_id = None
@property
def flow_id(self):
raise NotImplementedError
def get_default_flow_session_id(self, course_identifier):
if course_identifier == self.course1.identifier:
if self._course1_default_session_id is not None:
return self._course1_default_session_id
self._course1_default_session_id = (
self.get_last_session_id(course_identifier))
return self._course1_default_session_id
if course_identifier == self.course2.identifier:
if self._course2_default_session_id is not None:
return self._course2_default_session_id
self._course2_default_session_id = (
self.get_last_session_id(course_identifier))
return self._course2_default_session_id
@classmethod
def update_default_flow_session_id(cls, course_identifier):
new_session_id = cls.default_flow_params["flow_session_id"]
if course_identifier == cls.course1.identifier:
cls._course1_default_session_id = new_session_id
elif course_identifier == cls.course2.identifier:
cls._course2_default_session_id = new_session_id
class FallBackStorageMessageTestMixin(object):
# In case other message storage are used, the following is the default
# storage used by django and RELATE. Tests which concerns the message
# should not include this mixin.
storage = 'django.contrib.messages.storage.fallback.FallbackStorage'
def setUp(self): # noqa
self.settings_override = override_settings(MESSAGE_STORAGE=self.storage)
self.settings_override.enable()
def tearDown(self): # noqa
self.settings_override.disable()
def get_listed_storage_from_response(self, response):
return list(self.get_response_context_value_by_name(response, 'messages'))
def clear_message_response_storage(self, response):
# this should only be used for debug, because we are using private method
# which might change
try:
storage = self.get_response_context_value_by_name(response, 'messages')
except AssertionError:
# message doesn't exist in response context
return
if hasattr(storage, '_loaded_data'):
storage._loaded_data = []
elif hasattr(storage, '_loaded_message'):
storage._loaded_messages = []
if hasattr(storage, '_queued_messages'):
storage._queued_messages = []
self.assertEqual(len(storage), 0)
def assertResponseMessagesCount(self, response, expected_count): # noqa
storage = self.get_listed_storage_from_response(response)
self.assertEqual(len(storage), expected_count)
def assertResponseMessagesEqual(self, response, expected_messages): # noqa
storage = self.get_listed_storage_from_response(response)
if not isinstance(expected_messages, list):
expected_messages = [expected_messages]
self.assertEqual(len([m for m in storage]), len(expected_messages))
self.assertEqual([m.message for m in storage], expected_messages)
def assertResponseMessagesEqualRegex(self, response, expected_message_regexs): # noqa
storage = self.get_listed_storage_from_response(response)
if not isinstance(expected_message_regexs, list):
expected_message_regexs = [expected_message_regexs]
self.assertEqual(len([m for m in storage]), len(expected_message_regexs))
messages = [m.message for m in storage]
for idx, m in enumerate(messages):
six.assertRegex(self, m, expected_message_regexs[idx])
def assertResponseMessagesContains(self, response, expected_messages): # noqa
storage = self.get_listed_storage_from_response(response)
if isinstance(expected_messages, str):
expected_messages = [expected_messages]
messages = [m.message for m in storage]
for em in expected_messages:
self.assertIn(em, messages)
def assertResponseMessageLevelsEqual(self, response, expected_levels): # noqa
storage = self.get_listed_storage_from_response(response)
self.assertEqual([m.level for m in storage], expected_levels)
def debug_print_response_messages(self, response):
"""
For debugging :class:`django.contrib.messages` objects in post response
:param response: response
"""
try:
storage = self.get_listed_storage_from_response(response)
print("\n-----------message start (%i total)-------------"
% len(storage))
for m in storage:
print(m.message)
print("-----------message end-------------\n")
except KeyError:
print("\n-------no message----------")
Andreas Klöckner
committed
class SubprocessRunpyContainerMixin(object):
"""
This mixin is used to fake a runpy container, only needed when
the TestCase include test(s) for code questions
"""
@classmethod
def setUpClass(cls): # noqa
if six.PY2:
from unittest import SkipTest
raise SkipTest("In process fake container is configured for "
"PY3 only, since currently runpy docker only "
"provide PY3 envrionment")
Andreas Klöckner
committed
super(SubprocessRunpyContainerMixin, cls).setUpClass()
"course.page.code.SPAWN_CONTAINERS_FOR_RUNPY", False)
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
cls.faked_container_patch.start()
python_executable = os.getenv("PY_EXE")
if not python_executable:
import sys
python_executable = sys.executable
import subprocess
args = [python_executable,
os.path.abspath(
os.path.join(
os.path.dirname(__file__), os.pardir,
"docker-image-run-py", "runpy")),
]
cls.faked_container_process = subprocess.Popen(
args,
stdout=subprocess.DEVNULL,
# because runpy prints to stderr
stderr=subprocess.DEVNULL
)
cls.faked_container_patch.start()
@classmethod
def tearDownClass(cls): # noqa
Andreas Klöckner
committed
super(SubprocessRunpyContainerMixin, cls).tearDownClass()
cls.faked_container_patch.stop()
cls.faked_container_process.kill()
def improperly_configured_cache_patch():
# can be used as context manager or decorator
if six.PY3:
built_in_import_path = "builtins.__import__"
import builtins # noqa
else:
built_in_import_path = "__builtin__.__import__"
import __builtin__ as builtins # noqa
built_in_import = builtins.__import__
def my_disable_cache_import(name, globals=None, locals=None, fromlist=(),
level=0):
if name == "django.core.cache":
raise ImproperlyConfigured()
return built_in_import(name, globals, locals, fromlist, level)
return mock.patch(built_in_import_path, side_effect=my_disable_cache_import)
# vim: fdm=marker