Merge pull request #36102 from openedx/feanil/remove_stubs

test: Drop some stub code.
This commit is contained in:
Feanil Patel
2025-01-15 10:33:25 -05:00
committed by GitHub
23 changed files with 2 additions and 2720 deletions

View File

@@ -1,60 +0,0 @@
"""
Stub implementation of catalog service for acceptance tests
"""
# pylint: disable=invalid-name
import re
import six.moves.urllib.parse
from .http import StubHttpRequestHandler, StubHttpService
class StubCatalogServiceHandler(StubHttpRequestHandler): # lint-amnesty, pylint: disable=missing-class-docstring
def do_GET(self): # lint-amnesty, pylint: disable=missing-function-docstring
pattern_handlers = {
r'/api/v1/programs/$': self.program_list,
r'/api/v1/programs/([0-9a-f-]+)/$': self.program_detail,
r'/api/v1/program_types/$': self.program_types,
r'/api/v1/pathways/$': self.pathways
}
if self.match_pattern(pattern_handlers):
return
self.send_response(404, content='404 Not Found')
def match_pattern(self, pattern_handlers):
"""
Find the correct handler method given the path info from the HTTP request.
"""
path = six.moves.urllib.parse.urlparse(self.path).path
for pattern, handler in pattern_handlers.items():
match = re.match(pattern, path)
if match:
handler(*match.groups())
return True
def program_list(self):
"""Stub the catalog's program list endpoint."""
programs = self.server.config.get('catalog.programs', [])
self.send_json_response(programs)
def program_detail(self, program_uuid):
"""Stub the catalog's program detail endpoint."""
program = self.server.config.get('catalog.programs.' + program_uuid)
self.send_json_response(program)
def program_types(self):
program_types = self.server.config.get('catalog.programs_types', [])
self.send_json_response(program_types)
def pathways(self):
pathways = self.server.config.get('catalog.pathways', [])
self.send_json_response(pathways)
class StubCatalogService(StubHttpService):
HANDLER_CLASS = StubCatalogServiceHandler

View File

@@ -1,145 +0,0 @@
"""
Stub implementation of cs_comments_service for acceptance tests
"""
import re
from collections import OrderedDict
import six.moves.urllib.parse
from .http import StubHttpRequestHandler, StubHttpService
class StubCommentsServiceHandler(StubHttpRequestHandler): # lint-amnesty, pylint: disable=missing-class-docstring
@property
def _params(self):
return six.moves.urllib.parse.parse_qs(six.moves.urllib.parse.urlparse(self.path).query)
def do_GET(self): # lint-amnesty, pylint: disable=missing-function-docstring
pattern_handlers = OrderedDict([
("/api/v1/users/(?P<user_id>\\d+)/active_threads$", self.do_user_profile),
("/api/v1/users/(?P<user_id>\\d+)$", self.do_user),
("/api/v1/search/threads$", self.do_search_threads),
("/api/v1/threads$", self.do_threads),
("/api/v1/threads/(?P<thread_id>\\w+)$", self.do_thread),
("/api/v1/comments/(?P<comment_id>\\w+)$", self.do_comment),
("/api/v1/(?P<commentable_id>\\w+)/threads$", self.do_commentable),
])
if self.match_pattern(pattern_handlers):
return
self.send_response(404, content="404 Not Found")
def match_pattern(self, pattern_handlers): # lint-amnesty, pylint: disable=missing-function-docstring
path = six.moves.urllib.parse.urlparse(self.path).path
for pattern in pattern_handlers:
match = re.match(pattern, path)
if match:
pattern_handlers[pattern](**match.groupdict())
return True
return None
def do_PUT(self):
if self.path.startswith('/set_config'):
return StubHttpRequestHandler.do_PUT(self)
pattern_handlers = {
"/api/v1/users/(?P<user_id>\\d+)$": self.do_put_user,
}
if self.match_pattern(pattern_handlers):
return
self.send_response(204, "")
def do_put_user(self, user_id): # lint-amnesty, pylint: disable=unused-argument
self.server.config['default_sort_key'] = self.post_dict.get("default_sort_key", "date")
self.send_json_response({'username': self.post_dict.get("username"), 'external_id': self.post_dict.get("external_id")}) # lint-amnesty, pylint: disable=line-too-long
def do_DELETE(self): # lint-amnesty, pylint: disable=missing-function-docstring
pattern_handlers = {
"/api/v1/comments/(?P<comment_id>\\w+)$": self.do_delete_comment
}
if self.match_pattern(pattern_handlers):
return
self.send_json_response({})
def do_user(self, user_id): # lint-amnesty, pylint: disable=missing-function-docstring
response = {
"id": user_id,
"default_sort_key": self.server.config.get("default_sort_key", "date"),
"upvoted_ids": [],
"downvoted_ids": [],
"subscribed_thread_ids": [],
}
if 'course_id' in self._params:
response.update({
"threads_count": 1,
"comments_count": 2
})
self.send_json_response(response)
def do_user_profile(self, user_id): # lint-amnesty, pylint: disable=missing-function-docstring, unused-argument
if 'active_threads' in self.server.config:
user_threads = self.server.config['active_threads'][:]
params = self._params
page = int(params.get("page", ["1"])[0])
per_page = int(params.get("per_page", ["20"])[0])
num_pages = max(len(user_threads) - 1, 1) / per_page + 1
user_threads = user_threads[(page - 1) * per_page:page * per_page]
self.send_json_response({
"collection": user_threads,
"page": page,
"num_pages": num_pages
})
else:
self.send_response(404, content="404 Not Found")
def do_thread(self, thread_id): # lint-amnesty, pylint: disable=missing-function-docstring
if thread_id in self.server.config.get('threads', {}):
thread = self.server.config['threads'][thread_id].copy()
params = six.moves.urllib.parse.parse_qs(six.moves.urllib.parse.urlparse(self.path).query)
if "recursive" in params and params["recursive"][0] == "True":
thread.setdefault('children', [])
resp_total = thread.setdefault('resp_total', len(thread['children'])) # lint-amnesty, pylint: disable=unused-variable
resp_skip = int(params.get("resp_skip", ["0"])[0])
resp_limit = int(params.get("resp_limit", ["10000"])[0])
thread['children'] = thread['children'][resp_skip:(resp_skip + resp_limit)]
self.send_json_response(thread)
else:
self.send_response(404, content="404 Not Found")
def do_threads(self):
threads = self.server.config.get('threads', {})
threads_data = list(threads.values())
self.send_json_response({"collection": threads_data, "page": 1, "num_pages": 1})
def do_search_threads(self):
self.send_json_response(self.server.config.get('search_result', {}))
def do_comment(self, comment_id):
# django_comment_client calls GET comment before doing a DELETE, so that's what this is here to support.
if comment_id in self.server.config.get('comments', {}):
comment = self.server.config['comments'][comment_id]
self.send_json_response(comment)
def do_delete_comment(self, comment_id):
"""Handle comment deletion. Returns a JSON representation of the
deleted comment."""
if comment_id in self.server.config.get('comments', {}):
comment = self.server.config['comments'][comment_id]
self.send_json_response(comment)
def do_commentable(self, commentable_id):
self.send_json_response({
"collection": [
thread
for thread in self.server.config.get('threads', {}).values()
if thread.get('commentable_id') == commentable_id
],
"page": 1,
"num_pages": 1,
})
class StubCommentsService(StubHttpService):
HANDLER_CLASS = StubCommentsServiceHandler

View File

@@ -1 +0,0 @@
<rubric><category><description>Writing Applications</description><score>0</score><option points='0'> The essay loses focus, has little information or supporting details, and the organization makes it difficult to follow.</option><option points='1'> The essay presents a mostly unified theme, includes sufficient information to convey the theme, and is generally organized well.</option></category><category><description> Language Conventions </description><score>1</score><option points='0'> The essay demonstrates a reasonable command of proper spelling and grammar. </option><option points='1'> The essay demonstrates superior command of proper spelling and grammar.</option></category></rubric>

View File

@@ -1 +0,0 @@
<rubric><category><description>Writing Applications</description><option points='0'> The essay loses focus, has little information or supporting details, and the organization makes it difficult to follow.</option><option points='1'> The essay presents a mostly unified theme, includes sufficient information to convey the theme, and is generally organized well.</option></category><category><description> Language Conventions </description><option points='0'> The essay demonstrates a reasonable command of proper spelling and grammar. </option><option points='1'> The essay demonstrates superior command of proper spelling and grammar.</option></category></rubric>

View File

@@ -1,64 +0,0 @@
"""
Stub implementation of ecommerce service for acceptance tests
"""
import re
import six.moves.urllib.parse
from .http import StubHttpRequestHandler, StubHttpService
class StubEcommerceServiceHandler(StubHttpRequestHandler): # pylint: disable=missing-class-docstring
# pylint: disable=missing-function-docstring
def do_GET(self):
pattern_handlers = {
'/api/v2/orders/$': self.get_orders_list,
}
if self.match_pattern(pattern_handlers):
return
self.send_response(404, content='404 Not Found')
def match_pattern(self, pattern_handlers):
"""
Find the correct handler method given the path info from the HTTP request.
"""
path = six.moves.urllib.parse.urlparse(self.path).path
for pattern in pattern_handlers:
match = re.match(pattern, path)
if match:
pattern_handlers[pattern](**match.groupdict())
return True
return None
def get_orders_list(self):
"""
Stubs the orders list endpoint.
"""
orders = {
'results': [
{
'status': 'Complete',
'number': 'Edx-123',
'total_excl_tax': '100.00',
'date_placed': '2016-04-21T23:14:23Z',
'lines': [
{
'title': 'Test Course',
'line_price_excl_tax': '100.00',
'product': {
'product_class': 'Seat'
}
}
],
}
]
}
orders = self.server.config.get('orders', orders)
self.send_json_response(orders)
class StubEcommerceService(StubHttpService):
HANDLER_CLASS = StubEcommerceServiceHandler

View File

@@ -1,395 +0,0 @@
"""
Stub implementation of EdxNotes for acceptance tests
"""
import json
import re
from copy import deepcopy
from datetime import datetime
from math import ceil
from uuid import uuid4
from urllib.parse import urlencode
from .http import StubHttpRequestHandler, StubHttpService
class StubEdxNotesServiceHandler(StubHttpRequestHandler):
"""
Handler for EdxNotes requests.
"""
URL_HANDLERS = {
"GET": {
"/api/v1/annotations$": "_collection",
"/api/v1/annotations/(?P<note_id>[0-9A-Fa-f]+)$": "_read",
"/api/v1/search$": "_search",
},
"POST": {
"/api/v1/annotations$": "_create",
"/create_notes": "_create_notes",
},
"PUT": {
"/api/v1/annotations/(?P<note_id>[0-9A-Fa-f]+)$": "_update",
"/cleanup$": "_cleanup",
},
"DELETE": {
"/api/v1/annotations/(?P<note_id>[0-9A-Fa-f]+)$": "_delete",
},
}
def _match_pattern(self, pattern_handlers):
"""
Finds handler by the provided handler patterns and delegate response to
the matched handler.
"""
for pattern in pattern_handlers:
match = re.match(pattern, self.path_only)
if match:
handler = getattr(self, pattern_handlers[pattern], None)
if handler:
handler(**match.groupdict())
return True
return None
def _send_handler_response(self, method):
"""
Delegate response to handler methods.
If no handler defined, send a 404 response.
"""
# Choose the list of handlers based on the HTTP method
if method in self.URL_HANDLERS:
handlers_list = self.URL_HANDLERS[method]
else:
self.log_error(f"Unrecognized method '{method}'")
return
# Check the path (without querystring params) against our list of handlers
if self._match_pattern(handlers_list):
return
# If we don't have a handler for this URL and/or HTTP method,
# respond with a 404.
else:
self.send_response(404, content="404 Not Found")
def do_GET(self):
"""
Handle GET methods to the EdxNotes API stub.
"""
self._send_handler_response("GET")
def do_POST(self):
"""
Handle POST methods to the EdxNotes API stub.
"""
self._send_handler_response("POST")
def do_PUT(self):
"""
Handle PUT methods to the EdxNotes API stub.
"""
if self.path.startswith("/set_config"):
return StubHttpRequestHandler.do_PUT(self)
self._send_handler_response("PUT")
def do_DELETE(self):
"""
Handle DELETE methods to the EdxNotes API stub.
"""
self._send_handler_response("DELETE")
def do_OPTIONS(self):
"""
Handle OPTIONS methods to the EdxNotes API stub.
"""
self.send_response(200, headers={
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, POST, PUT, DELETE, OPTIONS",
"Access-Control-Allow-Headers": "Content-Length, Content-Type, X-Annotator-Auth-Token, X-Requested-With, X-Annotator-Auth-Token, X-Requested-With, X-CSRFToken", # lint-amnesty, pylint: disable=line-too-long
})
def respond(self, status_code=200, content=None):
"""
Send a response back to the client with the HTTP `status_code` (int),
the given content serialized as JSON (str), and the headers set appropriately.
"""
headers = {
"Access-Control-Allow-Origin": "*",
}
if status_code < 400 and content:
headers["Content-Type"] = "application/json"
content = json.dumps(content).encode('utf-8')
else:
headers["Content-Type"] = "text/html"
self.send_response(status_code, content, headers)
def _create(self):
"""
Create a note, assign id, annotator_schema_version, created and updated dates.
"""
note = json.loads(self.request_content.decode('utf-8'))
note.update({
"id": uuid4().hex,
"annotator_schema_version": "v1.0",
"created": datetime.utcnow().isoformat(),
"updated": datetime.utcnow().isoformat(),
})
self.server.add_notes(note)
self.respond(content=note)
def _create_notes(self):
"""
The same as self._create, but it works a list of notes.
"""
try:
notes = json.loads(self.request_content.decode('utf-8'))
except ValueError:
self.respond(400, "Bad Request")
return
if not isinstance(notes, list):
self.respond(400, "Bad Request")
return
for note in notes:
note.update({
"id": uuid4().hex,
"annotator_schema_version": "v1.0",
"created": note["created"] if note.get("created") else datetime.utcnow().isoformat(),
"updated": note["updated"] if note.get("updated") else datetime.utcnow().isoformat(),
})
self.server.add_notes(note)
self.respond(content=notes)
def _read(self, note_id):
"""
Return the note by note id.
"""
notes = self.server.get_all_notes()
result = self.server.filter_by_id(notes, note_id)
if result:
self.respond(content=result[0])
else:
self.respond(404, "404 Not Found")
def _update(self, note_id):
"""
Update the note by note id.
"""
note = self.server.update_note(note_id, json.loads(self.request_content.decode('utf-8')))
if note:
self.respond(content=note)
else:
self.respond(404, "404 Not Found")
def _delete(self, note_id):
"""
Delete the note by note id.
"""
if self.server.delete_note(note_id):
self.respond(204, "No Content")
else:
self.respond(404, "404 Not Found")
@staticmethod
def _get_next_prev_url(url_path, query_params, page_num, page_size):
"""
makes url with the query params including pagination params
for pagination next and previous urls
"""
query_params = deepcopy(query_params)
query_params.update({
"page": page_num,
"page_size": page_size
})
return url_path + "?" + urlencode(query_params)
def _get_paginated_response(self, notes, page_num, page_size):
"""
Returns a paginated response of notes.
"""
start = (page_num - 1) * page_size
end = start + page_size
total_notes = len(notes)
url_path = "http://{server_address}:{port}{path}".format(
server_address=self.client_address[0],
port=self.server.port,
path=self.path_only
)
next_url = None if end >= total_notes else self._get_next_prev_url(
url_path, self.get_params, page_num + 1, page_size
)
prev_url = None if page_num == 1 else self._get_next_prev_url(
url_path, self.get_params, page_num - 1, page_size)
# Get notes from range
notes = deepcopy(notes[start:end])
paginated_response = {
'total': total_notes,
'num_pages': int(ceil(float(total_notes) / page_size)),
'current_page': page_num,
'rows': notes,
'next': next_url,
'start': start,
'previous': prev_url
}
return paginated_response
def _search(self):
"""
Search for a notes by user id, course_id and usage_id.
"""
search_with_usage_id = False
user = self.get_params.get("user", None)
usage_ids = self.get_params.get("usage_id", [])
course_id = self.get_params.get("course_id", None)
text = self.get_params.get("text", None)
page = int(self.get_params.get("page", 1))
page_size = int(self.get_params.get("page_size", 2))
if user is None:
self.respond(400, "Bad Request")
return
notes = self.server.get_all_notes()
if course_id is not None:
notes = self.server.filter_by_course_id(notes, course_id)
if len(usage_ids) > 0:
search_with_usage_id = True
notes = self.server.filter_by_usage_id(notes, usage_ids)
if text:
notes = self.server.search(notes, text)
if not search_with_usage_id:
notes = self._get_paginated_response(notes, page, page_size)
self.respond(content=notes)
def _collection(self):
"""
Return all notes for the user.
"""
user = self.get_params.get("user", None)
page = int(self.get_params.get("page", 1))
page_size = int(self.get_params.get("page_size", 2))
notes = self.server.get_all_notes()
if user is None:
self.send_response(400, content="Bad Request")
return
notes = self._get_paginated_response(notes, page, page_size)
self.respond(content=notes)
def _cleanup(self):
"""
Helper method that removes all notes to the stub EdxNotes service.
"""
self.server.cleanup()
self.respond()
class StubEdxNotesService(StubHttpService):
"""
Stub EdxNotes service.
"""
HANDLER_CLASS = StubEdxNotesServiceHandler
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.notes = []
def get_all_notes(self):
"""
Returns a list of all notes without pagination
"""
notes = deepcopy(self.notes)
notes.reverse()
return notes
def add_notes(self, notes):
"""
Adds `notes(list)` to the stub EdxNotes service.
"""
if not isinstance(notes, list):
notes = [notes]
for note in notes:
self.notes.append(note)
def update_note(self, note_id, note_info):
"""
Updates the note with `note_id(str)` by the `note_info(dict)` to the
stub EdxNotes service.
"""
note = self.filter_by_id(self.notes, note_id)
if note:
note[0].update(note_info)
return note
else:
return None
def delete_note(self, note_id):
"""
Removes the note with `note_id(str)` to the stub EdxNotes service.
"""
note = self.filter_by_id(self.notes, note_id)
if note:
index = self.notes.index(note[0])
self.notes.pop(index)
return True
else:
return False
def cleanup(self):
"""
Removes all notes to the stub EdxNotes service.
"""
self.notes = []
def filter_by_id(self, data, note_id):
"""
Filters provided `data(list)` by the `note_id(str)`.
"""
return self.filter_by(data, "id", note_id)
def filter_by_user(self, data, user):
"""
Filters provided `data(list)` by the `user(str)`.
"""
return self.filter_by(data, "user", user)
def filter_by_usage_id(self, data, usage_ids):
"""
Filters provided `data(list)` by the `usage_id(str)`.
"""
if not isinstance(usage_ids, list):
usage_ids = [usage_ids]
return self.filter_by_list(data, "usage_id", usage_ids)
def filter_by_course_id(self, data, course_id):
"""
Filters provided `data(list)` by the `course_id(str)`.
"""
return self.filter_by(data, "course_id", course_id)
def filter_by(self, data, field_name, value):
"""
Filters provided `data(list)` by the `field_name(str)` with `value`.
"""
return [note for note in data if note.get(field_name) == value]
def filter_by_list(self, data, field_name, values):
"""
Filters provided `data(list)` by the `field_name(str)` in values.
"""
return [note for note in data if note.get(field_name) in values]
def search(self, data, query):
"""
Search the `query(str)` text in the provided `data(list)`.
"""
return [note for note in data if str(query).strip() in note.get("text", "").split()]

View File

@@ -1,281 +0,0 @@
"""
Stub implementation of an HTTP service.
"""
import json
import threading
from functools import wraps
from logging import getLogger
import six
from lazy import lazy
from six.moves.BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from six.moves.socketserver import ThreadingMixIn
LOGGER = getLogger(__name__)
def require_params(method, *required_keys):
"""
Decorator to ensure that the method has all the required parameters.
Example:
@require_params('GET', 'id', 'state')
def handle_request(self):
# ....
would send a 400 response if no GET parameters were specified
for 'id' or 'state' (or if those parameters had empty values).
The wrapped function should be a method of a `StubHttpRequestHandler`
subclass.
Currently, "GET" and "POST" are the only supported methods.
"""
def decorator(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
# Read either GET querystring params or POST dict params
if method == "GET":
params = self.get_params
elif method == "POST":
params = self.post_dict
else:
raise ValueError(f"Unsupported method '{method}'")
# Check for required values
missing = []
for key in required_keys:
if params.get(key) is None:
missing.append(key)
if len(missing) > 0:
msg = "Missing required key(s) {keys}".format(keys=",".join(missing))
self.send_response(400, content=msg, headers={'Content-type': 'text/plain'})
# If nothing is missing, execute the function as usual
else:
return func(self, *args, **kwargs)
return wrapper
return decorator
class StubHttpRequestHandler(BaseHTTPRequestHandler):
"""
Handler for the stub HTTP service.
"""
protocol = "HTTP/1.0"
def log_message(self, format_str, *args):
"""
Redirect messages to keep the test console clean.
"""
LOGGER.debug(self._format_msg(format_str, *args))
def log_error(self, format_str, *args):
"""
Helper to log a server error.
"""
LOGGER.error(self._format_msg(format_str, *args))
@lazy
def request_content(self):
"""
Retrieve the content of the request.
"""
try:
length = int(self.headers.get('content-length'))
except (TypeError, ValueError):
return ""
else:
return self.rfile.read(length)
@lazy
def post_dict(self):
"""
Retrieve the request POST parameters from the client as a dictionary.
If no POST parameters can be interpreted, return an empty dict.
"""
if isinstance(self.request_content, bytes):
contents = self.request_content.decode('utf-8')
else:
contents = self.request_content
# The POST dict will contain a list of values for each key.
# None of our parameters are lists, however, so we map [val] --> val
# If the list contains multiple entries, we pick the first one
try:
post_dict = six.moves.urllib.parse.parse_qs(contents, keep_blank_values=True)
return {
key: list_val[0]
for key, list_val in post_dict.items()
}
except: # lint-amnesty, pylint: disable=bare-except
return {}
@lazy
def get_params(self):
"""
Return the GET parameters (querystring in the URL).
"""
query = six.moves.urllib.parse.urlparse(self.path).query
# By default, `parse_qs` returns a list of values for each param
# For convenience, we replace lists of 1 element with just the element
return {
key: value[0] if len(value) == 1 else value
for key, value in six.moves.urllib.parse.parse_qs(query).items()
}
@lazy
def path_only(self):
"""
Return the URL path without GET parameters.
Removes the trailing slash if there is one.
"""
path = six.moves.urllib.parse.urlparse(self.path).path
if path.endswith('/'):
return path[:-1]
else:
return path
def do_PUT(self):
"""
Allow callers to configure the stub server using the /set_config URL.
The request should have POST data, such that:
Each POST parameter is the configuration key.
Each POST value is a JSON-encoded string value for the configuration.
"""
if self.path in ("/set_config", "/set_config/"):
if len(self.post_dict) > 0:
for key, value in self.post_dict.items():
self.log_message(f"Set config '{key}' to '{value}'")
try:
value = json.loads(value)
except ValueError:
self.log_message(f"Could not parse JSON: {value}")
self.send_response(400)
else:
self.server.config[key] = value
self.send_response(200)
# No parameters sent to configure, so return success by default
else:
self.send_response(200)
else:
self.send_response(404)
def send_response(self, status_code, content=None, headers=None):
"""
Send a response back to the client with the HTTP `status_code` (int),
`content` (str) and `headers` (dict).
"""
self.log_message(
f"Sent HTTP response: {status_code} with content '{content}' and headers {headers}"
)
if headers is None:
headers = {
'Access-Control-Allow-Origin': "*",
}
BaseHTTPRequestHandler.send_response(self, status_code)
for (key, value) in headers.items():
self.send_header(key, value)
if len(headers) > 0:
self.end_headers()
if content is not None:
if isinstance(content, str):
content = content.encode('utf-8')
self.wfile.write(content)
def send_json_response(self, content):
"""
Send a response with status code 200, the given content serialized as
JSON, and the Content-Type header set appropriately
"""
self.send_response(200, json.dumps(content), {"Content-Type": "application/json"})
def _format_msg(self, format_str, *args):
"""
Format message for logging.
`format_str` is a string with old-style Python format escaping;
`args` is an array of values to fill into the string.
"""
if not args:
format_str = six.moves.urllib.parse.unquote(format_str)
return "{} - - [{}] {}\n".format(
self.client_address[0],
self.log_date_time_string(),
format_str % args
)
def do_HEAD(self):
"""
Respond to an HTTP HEAD request
"""
self.send_response(200)
class StubHttpService(ThreadingMixIn, HTTPServer):
"""
Stub HTTP service implementation.
"""
# Subclasses override this to provide the handler class to use.
# Should be a subclass of `StubHttpRequestHandler`
HANDLER_CLASS = StubHttpRequestHandler
def __init__(self, port_num=0):
"""
Configure the server to listen on localhost.
Default is to choose an arbitrary open port.
"""
address = ('0.0.0.0', port_num)
HTTPServer.__init__(self, address, self.HANDLER_CLASS)
# Create a dict to store configuration values set by the client
self.config = {}
# Start the server in a separate thread
server_thread = threading.Thread(target=self.serve_forever)
server_thread.daemon = True
server_thread.start()
# Log the port we're using to help identify port conflict errors
LOGGER.debug(f'Starting service on port {self.port}')
def shutdown(self):
"""
Stop the server and free up the port
"""
# First call superclass shutdown()
HTTPServer.shutdown(self)
# We also need to manually close the socket
self.socket.close()
@property
def port(self):
"""
Return the port that the service is listening on.
"""
_, port = self.server_address
return port

View File

@@ -1,317 +0,0 @@
"""
Stub implementation of LTI Provider.
What is supported:
------------------
1.) This LTI Provider can service only one Tool Consumer at the same time. It is
not possible to have this LTI multiple times on a single page in LMS.
"""
import base64
import hashlib
import logging
import textwrap
from unittest import mock
from uuid import uuid4
import oauthlib.oauth1
import requests
import six
from oauthlib.oauth1.rfc5849 import parameters, signature
from openedx.core.djangolib.markup import HTML
from .http import StubHttpRequestHandler, StubHttpService
log = logging.getLogger(__name__)
class StubLtiHandler(StubHttpRequestHandler):
"""
A handler for LTI POST and GET requests.
"""
DEFAULT_CLIENT_KEY = 'test_client_key'
DEFAULT_CLIENT_SECRET = 'test_client_secret'
DEFAULT_LTI_ENDPOINT = 'correct_lti_endpoint'
DEFAULT_LTI_ADDRESS = 'http://{host}:{port}/'
def do_GET(self):
"""
Handle a GET request from the client and sends response back.
Used for checking LTI Provider started correctly.
"""
self.send_response(200, 'This is LTI Provider.', {'Content-type': 'text/plain'})
def do_POST(self):
"""
Handle a POST request from the client and sends response back.
"""
if 'grade' in self.path and self._send_graded_result().status_code == 200:
status_message = HTML('LTI consumer (edX) responded with XML content:<br>{grade_data}').format(
grade_data=self.server.grade_data['TC answer']
)
content = self._create_content(status_message)
self.send_response(200, content)
elif 'lti2_outcome' in self.path and self._send_lti2_outcome().status_code == 200:
status_message = HTML('LTI consumer (edX) responded with HTTP {}<br>').format(
self.server.grade_data['status_code'])
content = self._create_content(status_message)
self.send_response(200, content)
elif 'lti2_delete' in self.path and self._send_lti2_delete().status_code == 200:
status_message = HTML('LTI consumer (edX) responded with HTTP {}<br>').format(
self.server.grade_data['status_code'])
content = self._create_content(status_message)
self.send_response(200, content)
# Respond to request with correct lti endpoint
elif self._is_correct_lti_request():
params = {k: v for k, v in self.post_dict.items() if k != 'oauth_signature'}
if self._check_oauth_signature(params, self.post_dict.get('oauth_signature', "")):
status_message = "This is LTI tool. Success."
# Set data for grades what need to be stored as server data
if 'lis_outcome_service_url' in self.post_dict:
self.server.grade_data = {
'callback_url': self.post_dict.get('lis_outcome_service_url').replace('https', 'http'),
'sourcedId': self.post_dict.get('lis_result_sourcedid')
}
host = self.server.server_address[0]
submit_url = f'//{host}:{self.server.server_address[1]}'
content = self._create_content(status_message, submit_url)
self.send_response(200, content)
else:
content = self._create_content("Wrong LTI signature")
self.send_response(200, content)
else:
content = self._create_content("Invalid request URL")
self.send_response(500, content)
def _send_graded_result(self):
"""
Send grade request.
"""
values = {
'textString': 0.5,
'sourcedId': self.server.grade_data['sourcedId'],
'imsx_messageIdentifier': uuid4().hex,
}
payload = textwrap.dedent("""
<?xml version = "1.0" encoding = "UTF-8"?>
<imsx_POXEnvelopeRequest xmlns="http://www.imsglobal.org/services/ltiv1p1/xsd/imsoms_v1p0">
<imsx_POXHeader>
<imsx_POXRequestHeaderInfo>
<imsx_version>V1.0</imsx_version>
<imsx_messageIdentifier>{imsx_messageIdentifier}</imsx_messageIdentifier> /
</imsx_POXRequestHeaderInfo>
</imsx_POXHeader>
<imsx_POXBody>
<replaceResultRequest>
<resultRecord>
<sourcedGUID>
<sourcedId>{sourcedId}</sourcedId>
</sourcedGUID>
<result>
<resultScore>
<language>en-us</language>
<textString>{textString}</textString>
</resultScore>
</result>
</resultRecord>
</replaceResultRequest>
</imsx_POXBody>
</imsx_POXEnvelopeRequest>
""")
data = payload.format(**values)
url = self.server.grade_data['callback_url']
headers = {
'Content-Type': 'application/xml',
'X-Requested-With': 'XMLHttpRequest',
'Authorization': self._oauth_sign(url, data)
}
# Send request ignoring verifirecation of SSL certificate
response = requests.post(url, data=data, headers=headers, verify=False)
self.server.grade_data['TC answer'] = response.content
return response
def _send_lti2_outcome(self):
"""
Send a grade back to consumer
"""
payload = textwrap.dedent("""
{{
"@context" : "http://purl.imsglobal.org/ctx/lis/v2/Result",
"@type" : "Result",
"resultScore" : {score},
"comment" : "This is awesome."
}}
""")
data = payload.format(score=0.8)
return self._send_lti2(data)
def _send_lti2_delete(self):
"""
Send a delete back to consumer
"""
payload = textwrap.dedent("""
{
"@context" : "http://purl.imsglobal.org/ctx/lis/v2/Result",
"@type" : "Result"
}
""")
return self._send_lti2(payload)
def _send_lti2(self, payload):
"""
Send lti2 json result service request.
"""
### We compute the LTI V2.0 service endpoint from the callback_url (which is set by the launch call)
url = self.server.grade_data['callback_url']
url_parts = url.split('/')
url_parts[-1] = "lti_2_0_result_rest_handler"
anon_id = self.server.grade_data['sourcedId'].split(":")[-1]
url_parts.extend(["user", anon_id])
new_url = '/'.join(url_parts)
content_type = 'application/vnd.ims.lis.v2.result+json'
headers = {
'Content-Type': content_type,
'Authorization': self._oauth_sign(new_url, payload,
method='PUT',
content_type=content_type)
}
# Send request ignoring verifirecation of SSL certificate
response = requests.put(new_url, data=payload, headers=headers, verify=False)
self.server.grade_data['status_code'] = response.status_code
self.server.grade_data['TC answer'] = response.content
return response
def _create_content(self, response_text, submit_url=None):
"""
Return content (str) either for launch, send grade or get result from TC.
"""
if submit_url:
submit_form = textwrap.dedent(HTML("""
<form action="{submit_url}/grade" method="post">
<input type="submit" name="submit-button" value="Submit" id="submit-button">
</form>
<form action="{submit_url}/lti2_outcome" method="post">
<input type="submit" name="submit-lti2-button" value="Submit" id="submit-lti2-button">
</form>
<form action="{submit_url}/lti2_delete" method="post">
<input type="submit" name="submit-lti2-delete-button" value="Submit" id="submit-lti-delete-button">
</form>
""")).format(submit_url=submit_url)
else:
submit_form = ''
# Show roles only for LTI launch.
if self.post_dict.get('roles'):
role = HTML('<h5>Role: {}</h5>').format(self.post_dict['roles'])
else:
role = ''
response_str = textwrap.dedent(HTML("""
<html>
<head>
<title>TEST TITLE</title>
</head>
<body>
<div>
<h2>IFrame loaded</h2>
<h3>Server response is:</h3>
<h3 class="result">{response}</h3>
{role}
</div>
{submit_form}
</body>
</html>
""")).format(response=response_text, role=role, submit_form=submit_form)
# Currently LTI block doublequotes the lis_result_sourcedid parameter.
# Unquote response two times.
return six.moves.urllib.parse.unquote(six.moves.urllib.parse.unquote(response_str))
def _is_correct_lti_request(self):
"""
Return a boolean indicating whether the URL path is a valid LTI end-point.
"""
lti_endpoint = self.server.config.get('lti_endpoint', self.DEFAULT_LTI_ENDPOINT)
return lti_endpoint in self.path
def _oauth_sign(self, url, body, content_type='application/x-www-form-urlencoded', method='POST'):
"""
Signs request and returns signed Authorization header.
"""
client_key = self.server.config.get('client_key', self.DEFAULT_CLIENT_KEY)
client_secret = self.server.config.get('client_secret', self.DEFAULT_CLIENT_SECRET)
client = oauthlib.oauth1.Client(
client_key=str(client_key),
client_secret=str(client_secret)
)
headers = {
# This is needed for body encoding:
'Content-Type': content_type,
}
# Calculate and encode body hash. See http://oauth.googlecode.com/svn/spec/ext/body_hash/1.0/oauth-bodyhash.html
sha1 = hashlib.sha1()
sha1.update(body.encode('utf-8'))
oauth_body_hash = base64.b64encode(sha1.digest()).decode('utf-8')
mock_request = mock.Mock(
uri=str(six.moves.urllib.parse.unquote(url)),
headers=headers,
body="",
decoded_body="",
http_method=str(method),
)
params = client.get_oauth_params(mock_request)
mock_request.oauth_params = params
mock_request.oauth_params.append(('oauth_body_hash', oauth_body_hash))
sig = client.get_oauth_signature(mock_request)
mock_request.oauth_params.append(('oauth_signature', sig))
new_headers = parameters.prepare_headers(mock_request.oauth_params, headers, realm=None)
return new_headers['Authorization']
def _check_oauth_signature(self, params, client_signature):
"""
Checks oauth signature from client.
`params` are params from post request except signature,
`client_signature` is signature from request.
Builds mocked request and verifies hmac-sha1 signing::
1. builds string to sign from `params`, `url` and `http_method`.
2. signs it with `client_secret` which comes from server settings.
3. obtains signature after sign and then compares it with request.signature
(request signature comes form client in request)
Returns `True` if signatures are correct, otherwise `False`.
"""
client_secret = str(self.server.config.get('client_secret', self.DEFAULT_CLIENT_SECRET))
host = '127.0.0.1'
port = self.server.server_address[1]
lti_base = self.DEFAULT_LTI_ADDRESS.format(host=host, port=port)
lti_endpoint = self.server.config.get('lti_endpoint', self.DEFAULT_LTI_ENDPOINT)
url = lti_base + lti_endpoint
request = mock.Mock()
request.params = [(str(k), str(v)) for k, v in params.items()]
request.uri = str(url)
request.http_method = 'POST'
request.signature = str(client_signature)
return signature.verify_hmac_sha1(request, client_secret)
class StubLtiService(StubHttpService):
"""
A stub LTI provider server that responds
to POST and GET requests to localhost.
"""
HANDLER_CLASS = StubLtiHandler

View File

@@ -1,109 +0,0 @@
"""
Command-line utility to start a stub service.
"""
import logging
import sys
import time
from .catalog import StubCatalogService
from .comments import StubCommentsService
from .ecommerce import StubEcommerceService
from .edxnotes import StubEdxNotesService
from .lti import StubLtiService
from .video_source import VideoSourceHttpService
from .xqueue import StubXQueueService
from .youtube import StubYouTubeService
USAGE = "USAGE: python -m stubs.start SERVICE_NAME PORT_NUM [CONFIG_KEY=CONFIG_VAL, ...]"
SERVICES = {
'xqueue': StubXQueueService,
'youtube': StubYouTubeService,
'comments': StubCommentsService,
'lti': StubLtiService,
'video': VideoSourceHttpService,
'edxnotes': StubEdxNotesService,
'ecommerce': StubEcommerceService,
'catalog': StubCatalogService,
}
# Log to stdout, including debug messages
logging.basicConfig(level=logging.DEBUG, format="%(levelname)s %(message)s")
def get_args():
"""
Parse arguments, returning tuple of `(service_name, port_num, config_dict)`.
Exits with a message if arguments are invalid.
"""
if len(sys.argv) < 3:
print(USAGE)
sys.exit(1)
service_name = sys.argv[1]
port_num = sys.argv[2]
config_dict = _parse_config_args(sys.argv[3:])
if service_name not in SERVICES:
print("Unrecognized service '{}'. Valid choices are: {}".format(
service_name, ", ".join(list(SERVICES.keys()))))
sys.exit(1)
try:
port_num = int(port_num)
if port_num < 0:
raise ValueError
except ValueError:
print(f"Port '{port_num}' must be a positive integer")
sys.exit(1)
return service_name, port_num, config_dict
def _parse_config_args(args):
"""
Parse stub configuration arguments, which are strings of the form "KEY=VAL".
`args` is a list of arguments from the command line.
Any argument that does not match the "KEY=VAL" format will be logged and skipped.
Returns a dictionary with the configuration keys and values.
"""
config_dict = {}
for config_str in args:
try:
components = config_str.split('=')
if len(components) >= 2:
config_dict[components[0]] = "=".join(components[1:])
except: # lint-amnesty, pylint: disable=bare-except
print(f"Warning: could not interpret config value '{config_str}'")
return config_dict
def main():
"""
Start a server; shut down on keyboard interrupt signal.
"""
service_name, port_num, config_dict = get_args()
print(f"Starting stub service '{service_name}' on port {port_num}...")
server = SERVICES[service_name](port_num=port_num)
server.config.update(config_dict)
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Stopping stub service...")
finally:
server.shutdown()
if __name__ == "__main__":
main()

View File

@@ -1,372 +0,0 @@
"""
Unit tests for stub EdxNotes implementation.
"""
import json
import unittest
from uuid import uuid4
import ddt
import requests
import six
from ..edxnotes import StubEdxNotesService
@ddt.ddt
class StubEdxNotesServiceTest(unittest.TestCase):
"""
Test cases for the stub EdxNotes service.
"""
def setUp(self):
"""
Start the stub server.
"""
super().setUp()
self.server = StubEdxNotesService()
dummy_notes = self._get_dummy_notes(count=5)
self.server.add_notes(dummy_notes)
self.addCleanup(self.server.shutdown)
def _get_dummy_notes(self, count=1):
"""
Returns a list of dummy notes.
"""
return [self._get_dummy_note(i) for i in range(count)]
def _get_dummy_note(self, uid=0):
"""
Returns a single dummy note.
"""
nid = uuid4().hex
return {
"id": nid,
"created": "2014-10-31T10:05:00.000000",
"updated": "2014-10-31T10:50:00.101010",
"user": "dummy-user-id",
"usage_id": "dummy-usage-id-" + str(uid),
"course_id": "dummy-course-id",
"text": "dummy note text " + nid,
"quote": "dummy note quote",
"ranges": [
{
"start": "/p[1]",
"end": "/p[1]",
"startOffset": 0,
"endOffset": 10,
}
],
}
def test_note_create(self):
dummy_note = {
"user": "dummy-user-id",
"usage_id": "dummy-usage-id",
"course_id": "dummy-course-id",
"text": "dummy note text",
"quote": "dummy note quote",
"ranges": [
{
"start": "/p[1]",
"end": "/p[1]",
"startOffset": 0,
"endOffset": 10,
}
],
}
response = requests.post(self._get_url("api/v1/annotations"), data=json.dumps(dummy_note))
assert response.ok
response_content = response.json()
assert 'id' in response_content
assert 'created' in response_content
assert 'updated' in response_content
assert 'annotator_schema_version' in response_content
self.assertDictContainsSubset(dummy_note, response_content)
def test_note_read(self):
notes = self._get_notes()
for note in notes:
response = requests.get(self._get_url("api/v1/annotations/" + note["id"]))
assert response.ok
self.assertDictEqual(note, response.json())
response = requests.get(self._get_url("api/v1/annotations/does_not_exist"))
assert response.status_code == 404
def test_note_update(self):
notes = self._get_notes()
for note in notes:
response = requests.get(self._get_url("api/v1/annotations/" + note["id"]))
assert response.ok
self.assertDictEqual(note, response.json())
response = requests.get(self._get_url("api/v1/annotations/does_not_exist"))
assert response.status_code == 404
def test_search(self):
# Without user
response = requests.get(self._get_url("api/v1/search"))
assert response.status_code == 400
# get response with default page and page size
response = requests.get(self._get_url("api/v1/search"), params={
"user": "dummy-user-id",
"course_id": "dummy-course-id",
})
assert response.ok
self._verify_pagination_info(
response=response.json(),
total_notes=5,
num_pages=3,
notes_per_page=2,
start=0,
current_page=1,
next_page=2,
previous_page=None
)
# search notes with text that don't exist
response = requests.get(self._get_url("api/v1/search"), params={
"user": "dummy-user-id",
"course_id": "dummy-course-id",
"text": "world war 2"
})
assert response.ok
self._verify_pagination_info(
response=response.json(),
total_notes=0,
num_pages=0,
notes_per_page=0,
start=0,
current_page=1,
next_page=None,
previous_page=None
)
@ddt.data(
'?usage_id=dummy-usage-id-0',
'?usage_id=dummy-usage-id-0&usage_id=dummy-usage-id-1&dummy-usage-id-2&dummy-usage-id-3&dummy-usage-id-4'
)
def test_search_usage_ids(self, usage_ids):
"""
Test search with usage ids.
"""
url = self._get_url('api/v1/search') + usage_ids
response = requests.get(url, params={
'user': 'dummy-user-id',
'course_id': 'dummy-course-id'
})
assert response.ok
response = response.json()
parsed = six.moves.urllib.parse.urlparse(url)
query_params = six.moves.urllib.parse.parse_qs(parsed.query)
query_params['usage_id'].reverse()
assert len(response) == len(query_params['usage_id'])
for index, usage_id in enumerate(query_params['usage_id']):
assert response[index]['usage_id'] == usage_id
def test_delete(self):
notes = self._get_notes()
response = requests.delete(self._get_url("api/v1/annotations/does_not_exist"))
assert response.status_code == 404
for note in notes:
response = requests.delete(self._get_url("api/v1/annotations/" + note["id"]))
assert response.status_code == 204
remaining_notes = self.server.get_all_notes()
assert note['id'] not in [note['id'] for note in remaining_notes]
assert len(remaining_notes) == 0
def test_update(self):
note = self._get_notes()[0]
response = requests.put(self._get_url("api/v1/annotations/" + note["id"]), data=json.dumps({
"text": "new test text"
}))
assert response.status_code == 200
updated_note = self._get_notes()[0]
assert 'new test text' == updated_note['text']
assert note['id'] == updated_note['id']
self.assertCountEqual(note, updated_note)
response = requests.get(self._get_url("api/v1/annotations/does_not_exist"))
assert response.status_code == 404
# pylint: disable=too-many-arguments
def _verify_pagination_info(
self,
response,
total_notes,
num_pages,
notes_per_page,
current_page,
previous_page,
next_page,
start
):
"""
Verify the pagination information.
Argument:
response: response from api
total_notes: total notes in the response
num_pages: total number of pages in response
notes_per_page: number of notes in the response
current_page: current page number
previous_page: previous page number
next_page: next page number
start: start of the current page
"""
def get_page_value(url):
"""
Return page value extracted from url.
"""
if url is None:
return None
parsed = six.moves.urllib.parse.urlparse(url)
query_params = six.moves.urllib.parse.parse_qs(parsed.query)
page = query_params["page"][0]
return page if page is None else int(page)
assert response['total'] == total_notes
assert response['num_pages'] == num_pages
assert len(response['rows']) == notes_per_page
assert response['current_page'] == current_page
assert get_page_value(response['previous']) == previous_page
assert get_page_value(response['next']) == next_page
assert response['start'] == start
def test_notes_collection(self):
"""
Test paginated response of notes api
"""
# Without user
response = requests.get(self._get_url("api/v1/annotations"))
assert response.status_code == 400
# Without any pagination parameters
response = requests.get(self._get_url("api/v1/annotations"), params={"user": "dummy-user-id"})
assert response.ok
self._verify_pagination_info(
response=response.json(),
total_notes=5,
num_pages=3,
notes_per_page=2,
start=0,
current_page=1,
next_page=2,
previous_page=None
)
# With pagination parameters
response = requests.get(self._get_url("api/v1/annotations"), params={
"user": "dummy-user-id",
"page": 2,
"page_size": 3
})
assert response.ok
self._verify_pagination_info(
response=response.json(),
total_notes=5,
num_pages=2,
notes_per_page=2,
start=3,
current_page=2,
next_page=None,
previous_page=1
)
def test_notes_collection_next_previous_with_one_page(self):
"""
Test next and previous urls of paginated response of notes api
when number of pages are 1
"""
response = requests.get(self._get_url("api/v1/annotations"), params={
"user": "dummy-user-id",
"page_size": 10
})
assert response.ok
self._verify_pagination_info(
response=response.json(),
total_notes=5,
num_pages=1,
notes_per_page=5,
start=0,
current_page=1,
next_page=None,
previous_page=None
)
def test_notes_collection_when_no_notes(self):
"""
Test paginated response of notes api when there's no note present
"""
# Delete all notes
self.test_cleanup()
# Get default page
response = requests.get(self._get_url("api/v1/annotations"), params={"user": "dummy-user-id"})
assert response.ok
self._verify_pagination_info(
response=response.json(),
total_notes=0,
num_pages=0,
notes_per_page=0,
start=0,
current_page=1,
next_page=None,
previous_page=None
)
def test_cleanup(self):
response = requests.put(self._get_url("cleanup"))
assert response.ok
assert len(self.server.get_all_notes()) == 0
def test_create_notes(self):
dummy_notes = self._get_dummy_notes(count=2)
response = requests.post(self._get_url("create_notes"), data=json.dumps(dummy_notes))
assert response.ok
assert len(self._get_notes()) == 7
response = requests.post(self._get_url("create_notes"))
assert response.status_code == 400
def test_headers(self):
note = self._get_notes()[0]
response = requests.get(self._get_url("api/v1/annotations/" + note["id"]))
assert response.ok
assert response.headers.get('access-control-allow-origin') == '*'
response = requests.options(self._get_url("api/v1/annotations/"))
assert response.ok
assert response.headers.get('access-control-allow-origin') == '*'
assert response.headers.get('access-control-allow-methods') == 'GET, POST, PUT, DELETE, OPTIONS'
assert 'X-CSRFToken' in response.headers.get('access-control-allow-headers')
def _get_notes(self):
"""
Return a list of notes from the stub EdxNotes service.
"""
notes = self.server.get_all_notes()
assert len(notes) > 0, 'Notes are empty.'
return notes
def _get_url(self, path):
"""
Construt a URL to the stub EdxNotes service.
"""
return "http://127.0.0.1:{port}/{path}/".format(
port=self.server.port, path=path
)

View File

@@ -1,124 +0,0 @@
"""
Unit tests for stub HTTP server base class.
"""
import json
import unittest
import requests
from common.djangoapps.terrain.stubs.http import StubHttpRequestHandler, StubHttpService, require_params
class StubHttpServiceTest(unittest.TestCase): # lint-amnesty, pylint: disable=missing-class-docstring
def setUp(self):
super().setUp()
self.server = StubHttpService()
self.addCleanup(self.server.shutdown)
self.url = f"http://127.0.0.1:{self.server.port}/set_config"
def test_configure(self):
"""
All HTTP stub servers have an end-point that allows
clients to configure how the server responds.
"""
params = {
'test_str': 'This is only a test',
'test_empty': '',
'test_int': 12345,
'test_float': 123.45,
'test_dict': {
'test_key': 'test_val',
},
'test_empty_dict': {},
'test_unicode': '\u2603 the snowman',
'test_none': None,
'test_boolean': False
}
for key, val in params.items():
# JSON-encode each parameter
post_params = {key: json.dumps(val)}
response = requests.put(self.url, data=post_params)
assert response.status_code == 200
# Check that the expected values were set in the configuration
for key, val in params.items():
assert self.server.config.get(key) == val
def test_bad_json(self):
response = requests.put(self.url, data="{,}")
assert response.status_code == 400
def test_no_post_data(self):
response = requests.put(self.url, data={})
assert response.status_code == 200
def test_unicode_non_json(self):
# Send unicode without json-encoding it
response = requests.put(self.url, data={'test_unicode': '\u2603 the snowman'})
assert response.status_code == 400
def test_unknown_path(self):
response = requests.put(
f"http://127.0.0.1:{self.server.port}/invalid_url",
data="{}"
)
assert response.status_code == 404
class RequireRequestHandler(StubHttpRequestHandler): # lint-amnesty, pylint: disable=missing-class-docstring
@require_params('GET', 'test_param')
def do_GET(self):
self.send_response(200)
@require_params('POST', 'test_param')
def do_POST(self):
self.send_response(200)
class RequireHttpService(StubHttpService):
HANDLER_CLASS = RequireRequestHandler
class RequireParamTest(unittest.TestCase):
"""
Test the decorator for requiring parameters.
"""
def setUp(self):
super().setUp()
self.server = RequireHttpService()
self.addCleanup(self.server.shutdown)
self.url = f"http://127.0.0.1:{self.server.port}"
def test_require_get_param(self):
# Expect success when we provide the required param
response = requests.get(self.url, params={"test_param": 2})
assert response.status_code == 200
# Expect failure when we do not proivde the param
response = requests.get(self.url)
assert response.status_code == 400
# Expect failure when we provide an empty param
response = requests.get(self.url + "?test_param=")
assert response.status_code == 400
def test_require_post_param(self):
# Expect success when we provide the required param
response = requests.post(self.url, data={"test_param": 2})
assert response.status_code == 200
# Expect failure when we do not proivde the param
response = requests.post(self.url)
assert response.status_code == 400
# Expect failure when we provide an empty param
response = requests.post(self.url, data={"test_param": None})
assert response.status_code == 400

View File

@@ -1,98 +0,0 @@
"""
Unit tests for stub LTI implementation.
"""
import unittest
from unittest.mock import Mock, patch
import requests
from urllib.request import urlopen # pylint: disable=wrong-import-order
from common.djangoapps.terrain.stubs.lti import StubLtiService
class StubLtiServiceTest(unittest.TestCase):
"""
A stub of the LTI provider that listens on a local
port and responds with pre-defined grade messages.
Used for lettuce BDD tests in lms/courseware/features/lti.feature
"""
def setUp(self):
super().setUp()
self.server = StubLtiService()
self.uri = f'http://127.0.0.1:{self.server.port}/'
self.launch_uri = self.uri + 'correct_lti_endpoint'
self.addCleanup(self.server.shutdown)
self.payload = {
'user_id': 'default_user_id',
'roles': 'Student',
'oauth_nonce': '',
'oauth_timestamp': '',
'oauth_consumer_key': 'test_client_key',
'lti_version': 'LTI-1p0',
'oauth_signature_method': 'HMAC-SHA1',
'oauth_version': '1.0',
'oauth_signature': '',
'lti_message_type': 'basic-lti-launch-request',
'oauth_callback': 'about:blank',
'launch_presentation_return_url': '',
'lis_outcome_service_url': 'http://localhost:8001/test_callback',
'lis_result_sourcedid': '',
'resource_link_id': '',
}
def test_invalid_request_url(self):
"""
Tests that LTI server processes request with right program path but with wrong header.
"""
self.launch_uri = self.uri + 'wrong_lti_endpoint'
response = requests.post(self.launch_uri, data=self.payload)
assert b'Invalid request URL' in response.content
def test_wrong_signature(self):
"""
Tests that LTI server processes request with right program
path and responses with incorrect signature.
"""
response = requests.post(self.launch_uri, data=self.payload)
assert b'Wrong LTI signature' in response.content
@patch('common.djangoapps.terrain.stubs.lti.signature.verify_hmac_sha1', return_value=True)
def test_success_response_launch_lti(self, check_oauth): # lint-amnesty, pylint: disable=unused-argument
"""
Success lti launch.
"""
response = requests.post(self.launch_uri, data=self.payload)
assert b'This is LTI tool. Success.' in response.content
@patch('common.djangoapps.terrain.stubs.lti.signature.verify_hmac_sha1', return_value=True)
def test_send_graded_result(self, verify_hmac): # pylint: disable=unused-argument
response = requests.post(self.launch_uri, data=self.payload)
assert b'This is LTI tool. Success.' in response.content
grade_uri = self.uri + 'grade'
with patch('common.djangoapps.terrain.stubs.lti.requests.post') as mocked_post:
mocked_post.return_value = Mock(content='Test response', status_code=200)
response = urlopen(grade_uri, data=b'') # lint-amnesty, pylint: disable=consider-using-with
assert b'Test response' in response.read()
@patch('common.djangoapps.terrain.stubs.lti.signature.verify_hmac_sha1', return_value=True)
def test_lti20_outcomes_put(self, verify_hmac): # pylint: disable=unused-argument
response = requests.post(self.launch_uri, data=self.payload)
assert b'This is LTI tool. Success.' in response.content
grade_uri = self.uri + 'lti2_outcome'
with patch('common.djangoapps.terrain.stubs.lti.requests.put') as mocked_put:
mocked_put.return_value = Mock(status_code=200)
response = urlopen(grade_uri, data=b'') # lint-amnesty, pylint: disable=consider-using-with
assert b'LTI consumer (edX) responded with HTTP 200' in response.read()
@patch('common.djangoapps.terrain.stubs.lti.signature.verify_hmac_sha1', return_value=True)
def test_lti20_outcomes_put_like_delete(self, verify_hmac): # pylint: disable=unused-argument
response = requests.post(self.launch_uri, data=self.payload)
assert b'This is LTI tool. Success.' in response.content
grade_uri = self.uri + 'lti2_delete'
with patch('common.djangoapps.terrain.stubs.lti.requests.put') as mocked_put:
mocked_put.return_value = Mock(status_code=200)
response = urlopen(grade_uri, data=b'') # lint-amnesty, pylint: disable=consider-using-with
assert b'LTI consumer (edX) responded with HTTP 200' in response.read()

View File

@@ -1,48 +0,0 @@
"""
Unit tests for Video stub server implementation.
"""
import unittest
import requests
from django.conf import settings
from common.djangoapps.terrain.stubs.video_source import VideoSourceHttpService
HLS_MANIFEST_TEXT = """
#EXTM3U
#EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=264787,RESOLUTION=1280x720
history_264kbit/history_264kbit.m3u8
#EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=328415,RESOLUTION=1920x1080
history_328kbit/history_328kbit.m3u8
#EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=70750,RESOLUTION=640x360
history_70kbit/history_70kbit.m3u8
#EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=148269,RESOLUTION=960x540
history_148kbit/history_148kbit.m3u8
#EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=41276,RESOLUTION=640x360
history_41kbit/history_41kbit.m3u8
"""
class StubVideoServiceTest(unittest.TestCase):
"""
Test cases for the video stub service.
"""
def setUp(self):
"""
Start the stub server.
"""
super().setUp()
self.server = VideoSourceHttpService()
self.server.config['root_dir'] = f'{settings.TEST_ROOT}/data/video'
self.addCleanup(self.server.shutdown)
def test_get_hls_manifest(self):
"""
Verify that correct hls manifest is received.
"""
response = requests.get(f"http://127.0.0.1:{self.server.port}/hls/history.m3u8")
assert response.ok
assert response.text == HLS_MANIFEST_TEXT.lstrip()
assert response.headers['Access-Control-Allow-Origin'] == '*'

View File

@@ -1,173 +0,0 @@
"""
Unit tests for stub XQueue implementation.
"""
import ast
import json
import unittest
from unittest import mock
import requests
from ..xqueue import StubXQueueService
class FakeTimer:
"""
Fake timer implementation that executes immediately.
"""
def __init__(self, delay, func): # lint-amnesty, pylint: disable=unused-argument
self.func = func
def start(self):
self.func()
class StubXQueueServiceTest(unittest.TestCase): # lint-amnesty, pylint: disable=missing-class-docstring
def setUp(self):
super().setUp()
self.server = StubXQueueService()
self.url = f"http://127.0.0.1:{self.server.port}/xqueue/submit"
self.addCleanup(self.server.shutdown)
# Patch the timer async calls
patcher = mock.patch('common.djangoapps.terrain.stubs.xqueue.post')
self.post = patcher.start()
self.addCleanup(patcher.stop)
# Patch POST requests
patcher = mock.patch('common.djangoapps.terrain.stubs.xqueue.Timer')
timer = patcher.start()
timer.side_effect = FakeTimer
self.addCleanup(patcher.stop)
def test_grade_request(self):
# Post a submission to the stub XQueue
callback_url = 'http://127.0.0.1:8000/test_callback'
expected_header = self._post_submission(
callback_url, 'test_queuekey', 'test_queue',
json.dumps({
'student_info': 'test',
'grader_payload': 'test',
'student_response': 'test'
})
)
# Check the response we receive
# (Should be the default grading response)
expected_body = json.dumps({'correct': True, 'score': 1, 'msg': '<div></div>'})
self._check_grade_response(callback_url, expected_header, expected_body)
def test_configure_default_response(self):
# Configure the default response for submissions to any queue
response_content = {'test_response': 'test_content'}
self.server.config['default'] = response_content
# Post a submission to the stub XQueue
callback_url = 'http://127.0.0.1:8000/test_callback'
expected_header = self._post_submission(
callback_url, 'test_queuekey', 'test_queue',
json.dumps({
'student_info': 'test',
'grader_payload': 'test',
'student_response': 'test'
})
)
# Check the response we receive
# (Should be the default grading response)
self._check_grade_response(callback_url, expected_header, json.dumps(response_content))
def test_configure_specific_response(self):
# Configure the XQueue stub response to any submission to the test queue
response_content = {'test_response': 'test_content'}
self.server.config['This is only a test.'] = response_content
# Post a submission to the XQueue stub
callback_url = 'http://127.0.0.1:8000/test_callback'
expected_header = self._post_submission(
callback_url, 'test_queuekey', 'test_queue',
json.dumps({'submission': 'This is only a test.'})
)
# Check that we receive the response we configured
self._check_grade_response(callback_url, expected_header, json.dumps(response_content))
def test_multiple_response_matches(self):
# Configure the XQueue stub with two responses that
# match the same submission
self.server.config['test_1'] = {'response': True}
self.server.config['test_2'] = {'response': False}
with mock.patch('common.djangoapps.terrain.stubs.http.LOGGER') as logger:
# Post a submission to the XQueue stub
callback_url = 'http://127.0.0.1:8000/test_callback'
self._post_submission(
callback_url, 'test_queuekey', 'test_queue',
json.dumps({'submission': 'test_1 and test_2'})
)
# Expect that we do NOT receive a response
# and that an error message is logged
assert not self.post.called
assert logger.error.called
def _post_submission(self, callback_url, lms_key, queue_name, xqueue_body): # lint-amnesty, pylint: disable=unused-argument
"""
Post a submission to the stub XQueue implementation.
`callback_url` is the URL at which we expect to receive a grade response
`lms_key` is the authentication key sent in the header
`queue_name` is the name of the queue in which to send put the submission
`xqueue_body` is the content of the submission
Returns the header (a string) we send with the submission, which can
be used to validate the response we receive from the stub.
"""
# Post a submission to the XQueue stub
grade_request = {
'xqueue_header': json.dumps({
'lms_callback_url': callback_url,
'lms_key': 'test_queuekey',
'queue_name': 'test_queue'
}),
'xqueue_body': xqueue_body
}
resp = requests.post(self.url, data=grade_request)
# Expect that the response is success
assert resp.status_code == 200
# Return back the header, so we can authenticate the response we receive
return grade_request['xqueue_header']
def _check_grade_response(self, callback_url, expected_header, expected_body):
"""
Verify that the stub sent a POST request back to us
with the expected data.
`callback_url` is the URL we expect the stub to POST to
`expected_header` is the header (a string) we expect to receive with the grade.
`expected_body` is the content (a string) we expect to receive with the grade.
Raises an `AssertionError` if the check fails.
"""
# Check the response posted back to us
# This is the default response
expected_callback_dict = {
'xqueue_header': expected_header,
'xqueue_body': expected_body,
}
# Check that the POST request was made with the correct params
assert self.post.call_args[1]['data']['xqueue_body'] == expected_callback_dict['xqueue_body']
assert ast.literal_eval(self.post.call_args[1]['data']['xqueue_header']) ==\
ast.literal_eval(expected_callback_dict['xqueue_header'])
assert self.post.call_args[0][0] == callback_url

View File

@@ -1,71 +0,0 @@
"""
Unit test for stub YouTube implementation.
"""
import unittest
import requests
from ..youtube import StubYouTubeService
class StubYouTubeServiceTest(unittest.TestCase): # lint-amnesty, pylint: disable=missing-class-docstring
def setUp(self):
super().setUp()
self.server = StubYouTubeService()
self.url = f"http://127.0.0.1:{self.server.port}/"
self.server.config['time_to_response'] = 0.0
self.addCleanup(self.server.shutdown)
def test_unused_url(self):
response = requests.get(self.url + 'unused_url')
assert b'Unused url' == response.content
@unittest.skip('Failing intermittently due to inconsistent responses from YT. See TE-871')
def test_video_url(self):
response = requests.get(
self.url + 'test_youtube/OEoXaMPEzfM?v=2&alt=jsonc&callback=callback_func'
)
# YouTube metadata for video `OEoXaMPEzfM` states that duration is 116.
assert b'callback_func({"data": {"duration": 116, "message": "I\'m youtube.", "id": "OEoXaMPEzfM"}})' ==\
response.content
def test_transcript_url_equal(self):
response = requests.get(
self.url + 'test_transcripts_youtube/t__eq_exist'
)
assert ''.join(['<?xml version="1.0" encoding="utf-8" ?>',
'<transcript><text start="1.0" dur="1.0">',
'Equal transcripts</text></transcript>']).encode('utf-8') == response.content
def test_transcript_url_not_equal(self):
response = requests.get(
self.url + 'test_transcripts_youtube/t_neq_exist',
)
assert ''.join(['<?xml version="1.0" encoding="utf-8" ?>',
'<transcript><text start="1.1" dur="5.5">',
'Transcripts sample, different that on server',
'</text></transcript>']).encode('utf-8') == response.content
def test_transcript_not_found(self):
response = requests.get(self.url + 'test_transcripts_youtube/some_id')
assert 404 == response.status_code
def test_reset_configuration(self):
reset_config_url = self.url + 'del_config'
# add some configuration data
self.server.config['test_reset'] = 'This is a reset config test'
# reset server configuration
response = requests.delete(reset_config_url)
assert response.status_code == 200
# ensure that server config dict is empty after successful reset
assert not self.server.config

View File

@@ -1,60 +0,0 @@
"""
Serve HTML5 video sources for acceptance tests
"""
import os
from contextlib import contextmanager
from logging import getLogger
from six.moves.SimpleHTTPServer import SimpleHTTPRequestHandler
from .http import StubHttpService
LOGGER = getLogger(__name__)
class VideoSourceRequestHandler(SimpleHTTPRequestHandler):
"""
Request handler for serving video sources locally.
"""
def translate_path(self, path):
"""
Remove any extra parameters from the path.
For example /gizmo.mp4?1397160769634
becomes /gizmo.mp4
"""
root_dir = self.server.config.get('root_dir')
path = f'{root_dir}{path}'
return path.split('?')[0]
def end_headers(self):
"""
This is required by hls.js to play hls videos.
"""
self.send_header('Access-Control-Allow-Origin', '*')
SimpleHTTPRequestHandler.end_headers(self)
class VideoSourceHttpService(StubHttpService):
"""
Simple HTTP server for serving HTML5 Video sources locally for tests
"""
HANDLER_CLASS = VideoSourceRequestHandler
def __init__(self, port_num=0):
@contextmanager
def _remember_cwd():
"""
Files are automatically served from the current directory
so we need to change it, start the server, then set it back.
"""
curdir = os.getcwd()
try:
yield
finally:
os.chdir(curdir)
with _remember_cwd():
StubHttpService.__init__(self, port_num=port_num)

View File

@@ -1,226 +0,0 @@
"""
Stub implementation of XQueue for acceptance tests.
Configuration values:
"default" (dict): Default response to be sent to LMS as a grade for a submission
"<submission>" (dict): Grade response to return for submissions containing the text <submission>
"register_submission_url" (str): URL to send grader payloads when we receive a submission
If no grade response is configured, a default response will be returned.
"""
import copy
import json
from threading import Timer
from requests import post
from openedx.core.djangolib.markup import HTML
from .http import StubHttpRequestHandler, StubHttpService, require_params
class StubXQueueHandler(StubHttpRequestHandler):
"""
A handler for XQueue POST requests.
"""
DEFAULT_RESPONSE_DELAY = 2
DEFAULT_GRADE_RESPONSE = {'correct': True, 'score': 1, 'msg': ''}
@require_params('POST', 'xqueue_body', 'xqueue_header')
def do_POST(self):
"""
Handle a POST request from the client
Sends back an immediate success/failure response.
It then POSTS back to the client with grading results.
"""
msg = f"XQueue received POST request {self.post_dict} to path {self.path}"
self.log_message(msg)
# Respond only to grading requests
if self._is_grade_request():
# If configured, send the grader payload to other services.
# TODO TNL-3906
# self._register_submission(self.post_dict['xqueue_body'])
try:
xqueue_header = json.loads(self.post_dict['xqueue_header'])
callback_url = xqueue_header['lms_callback_url']
except KeyError:
# If the message doesn't have a header or body,
# then it's malformed. Respond with failure
error_msg = "XQueue received invalid grade request"
self._send_immediate_response(False, message=error_msg)
except ValueError:
# If we could not decode the body or header,
# respond with failure
error_msg = "XQueue could not decode grade request"
self._send_immediate_response(False, message=error_msg)
else:
# Send an immediate response of success
# The grade request is formed correctly
self._send_immediate_response(True)
# Wait a bit before POSTing back to the callback url with the
# grade result configured by the server
# Otherwise, the problem will not realize it's
# queued and it will keep waiting for a response indefinitely
delayed_grade_func = lambda: self._send_grade_response(
callback_url, xqueue_header, self.post_dict['xqueue_body']
)
delay = self.server.config.get('response_delay', self.DEFAULT_RESPONSE_DELAY)
Timer(delay, delayed_grade_func).start()
# If we get a request that's not to the grading submission
# URL, return an error
else:
self._send_immediate_response(False, message="Invalid request URL")
def _send_immediate_response(self, success, message=""):
"""
Send an immediate success/failure message
back to the client
"""
# Send the response indicating success/failure
response_str = json.dumps(
{'return_code': 0 if success else 1, 'content': message}
)
if self._is_grade_request():
self.send_response(
200, content=response_str, headers={'Content-type': 'text/plain'}
)
self.log_message(f"XQueue: sent response {response_str}")
else:
self.send_response(500)
def _send_grade_response(self, postback_url, xqueue_header, xqueue_body_json):
"""
POST the grade response back to the client
using the response provided by the server configuration.
Uses the server configuration to determine what response to send:
1) Specific response for submissions containing matching text in `xqueue_body`
2) Default submission configured by client
3) Default submission
`postback_url` is the URL the client told us to post back to
`xqueue_header` (dict) is the full header the client sent us, which we will send back
to the client so it can authenticate us.
`xqueue_body_json` (json-encoded string) is the body of the submission the client sent us.
"""
# First check if we have a configured response that matches the submission body
grade_response = None
# This matches the pattern against the JSON-encoded xqueue_body
# This is very simplistic, but sufficient to associate a student response
# with a grading response.
# There is a danger here that a submission will match multiple response patterns.
# Rather than fail silently (which could cause unpredictable behavior in tests)
# we abort and log a debugging message.
for pattern, response in self.server.queue_responses:
if pattern in xqueue_body_json:
if grade_response is None:
grade_response = response
# Multiple matches, so abort and log an error
else:
self.log_error(
f"Multiple response patterns matched '{xqueue_body_json}'",
)
return
# Fall back to the default grade response configured for this queue,
# then to the default response.
if grade_response is None:
grade_response = self.server.config.get(
'default', copy.deepcopy(self.DEFAULT_GRADE_RESPONSE)
)
# Wrap the message in <div> tags to ensure that it is valid XML
if isinstance(grade_response, dict) and 'msg' in grade_response:
grade_response['msg'] = HTML("<div>{0}</div>").format(grade_response['msg'])
data = {
'xqueue_header': json.dumps(xqueue_header),
'xqueue_body': json.dumps(grade_response)
}
post(postback_url, data=data)
self.log_message(f"XQueue: sent grading response {data} to {postback_url}")
def _register_submission(self, xqueue_body_json):
"""
If configured, send the submission's grader payload to another service.
"""
url = self.server.config.get('register_submission_url')
# If not configured, do not need to send anything
if url is not None:
try:
xqueue_body = json.loads(xqueue_body_json)
except ValueError:
self.log_error(
f"Could not decode XQueue body as JSON: '{xqueue_body_json}'")
else:
# Retrieve the grader payload, which should be a JSON-encoded dict.
# We pass the payload directly to the service we are notifying, without
# inspecting the contents.
grader_payload = xqueue_body.get('grader_payload')
if grader_payload is not None:
response = post(url, data={'grader_payload': grader_payload})
if not response.ok:
self.log_error(
"Could register submission at URL '{}'. Status was {}".format(
url, response.status_code))
else:
self.log_message(
f"XQueue body is missing 'grader_payload' key: '{xqueue_body}'"
)
def _is_grade_request(self):
"""
Return a boolean indicating whether the requested URL indicates a submission.
"""
return 'xqueue/submit' in self.path
class StubXQueueService(StubHttpService):
"""
A stub XQueue grading server that responds to POST requests to localhost.
"""
HANDLER_CLASS = StubXQueueHandler
NON_QUEUE_CONFIG_KEYS = ['default', 'register_submission_url']
@property
def queue_responses(self):
"""
Returns a list of (pattern, response) tuples, where `pattern` is a pattern
to match in the XQueue body, and `response` is a dictionary to return
as the response from the grader.
Every configuration key is a queue name,
except for 'default' and 'register_submission_url' which have special meaning
"""
return list({
key: value
for key, value in self.config.items()
if key not in self.NON_QUEUE_CONFIG_KEYS
}.items())

View File

@@ -1,172 +0,0 @@
"""
Stub implementation of YouTube for acceptance tests.
To start this stub server on its own from Vagrant:
1.) Locally, modify your Vagrantfile so that it contains:
config.vm.network :forwarded_port, guest: 8031, host: 8031
2.) From within Vagrant dev environment do:
cd common/djangoapps/terrain
python -m stubs.start youtube 8031
3.) Locally, try accessing http://localhost:8031/ and see that
you get "Unused url" message inside the browser.
"""
import json
import time
from collections import OrderedDict
import requests
from six.moves.urllib.parse import urlparse
from .http import StubHttpRequestHandler, StubHttpService
class StubYouTubeHandler(StubHttpRequestHandler):
"""
A handler for Youtube GET requests.
"""
# Default number of seconds to delay the response to simulate network latency.
DEFAULT_DELAY_SEC = 0.5
def do_DELETE(self): # pylint: disable=invalid-name
"""
Allow callers to delete all the server configurations using the /del_config URL.
"""
if self.path in ("/del_config", "/del_config/"):
self.server.config = {}
self.log_message("Reset Server Configuration.")
self.send_response(200)
else:
self.send_response(404)
def do_GET(self):
"""
Handle a GET request from the client and sends response back.
"""
self.log_message(
f"Youtube provider received GET request to path {self.path}"
)
if 'get_config' in self.path:
self.send_json_response(self.server.config)
elif 'test_transcripts_youtube' in self.path:
if 't__eq_exist' in self.path:
status_message = "".join([
'<?xml version="1.0" encoding="utf-8" ?>',
'<transcript><text start="1.0" dur="1.0">',
'Equal transcripts</text></transcript>'
]).encode('utf-8')
self.send_response(
200, content=status_message, headers={'Content-type': 'application/xml'}
)
elif 't_neq_exist' in self.path:
status_message = "".join([
'<?xml version="1.0" encoding="utf-8" ?>',
'<transcript><text start="1.1" dur="5.5">',
'Transcripts sample, different that on server',
'</text></transcript>'
]).encode('utf-8')
self.send_response(
200, content=status_message, headers={'Content-type': 'application/xml'}
)
else:
self.send_response(404)
elif 'test_youtube' in self.path:
params = urlparse(self.path)
youtube_id = params.path.split('/').pop()
if self.server.config.get('youtube_api_private_video'):
self._send_private_video_response(youtube_id, "I'm youtube private video.") # lint-amnesty, pylint: disable=too-many-function-args
else:
self._send_video_response(youtube_id, "I'm youtube.")
elif 'get_youtube_api' in self.path:
# Delay the response to simulate network latency
time.sleep(self.server.config.get('time_to_response', self.DEFAULT_DELAY_SEC))
if self.server.config.get('youtube_api_blocked'):
self.send_response(404, content=b'', headers={'Content-type': 'text/plain'})
else:
# Get the response to send from YouTube.
# We need to do this every time because Google sometimes sends different responses
# as part of their own experiments, which has caused our tests to become "flaky"
self.log_message("Getting iframe api from youtube.com")
iframe_api_response = requests.get('https://www.youtube.com/iframe_api').content.strip(b"\n")
self.send_response(200, content=iframe_api_response, headers={'Content-type': 'text/html'})
else:
self.send_response(
404, content=b"Unused url", headers={'Content-type': 'text/plain'}
)
def _send_video_response(self, youtube_id, message):
"""
Send message back to the client for video player requests.
Requires sending back callback id.
"""
# Delay the response to simulate network latency
time.sleep(self.server.config.get('time_to_response', self.DEFAULT_DELAY_SEC))
# Construct the response content
callback = self.get_params['callback']
data = OrderedDict({
'items': list(
OrderedDict({
'contentDetails': OrderedDict({
'id': youtube_id,
'duration': 'PT2M20S',
})
})
)
})
response = f"{callback}({json.dumps(data)})".encode('utf-8')
self.send_response(200, content=response, headers={'Content-type': 'text/html'})
self.log_message(f"Youtube: sent response {message}")
def _send_private_video_response(self, message):
"""
Send private video error message back to the client for video player requests.
"""
# Construct the response content
callback = self.get_params['callback']
data = OrderedDict({
"error": OrderedDict({
"code": 403,
"errors": [
{
"code": "ServiceForbiddenException",
"domain": "GData",
"internalReason": "Private video"
}
],
"message": message,
})
})
response = f"{callback}({json.dumps(data)})".encode('utf-8')
self.send_response(200, content=response, headers={'Content-type': 'text/html'})
self.log_message(f"Youtube: sent response {message}")
class StubYouTubeService(StubHttpService):
"""
A stub Youtube provider server that responds to GET requests to localhost.
"""
HANDLER_CLASS = StubYouTubeHandler

View File

@@ -26,7 +26,6 @@ ignore_dirs:
# Directories that only contain tests.
- common/test
- test_root
- '*/terrain'
- '*/spec'
- '*/tests'
- '*/djangoapps/*/features'

View File

@@ -35,9 +35,9 @@ mkdir -p "$OUTPUT_DIR"
OUTPUT_FILE="${OUTPUT_DIR}/vulture-report.txt"
echo '' > "$OUTPUT_FILE"
# exclude test code from analysis, as it isn't explicitly called by other
# code. Additionally, application code that is only called by tests
# code. Additionally, application code that is only called by tests
# should be considered dead
EXCLUSIONS='/test,/acceptance,cms/envs,lms/envs,/terrain,migrations/,signals.py'
EXCLUSIONS='/test,/acceptance,cms/envs,lms/envs,migrations/,signals.py'
MIN_CONFIDENCE=90
# paths to the code on which to run the analysis
CODE_PATHS=('cms' 'common' 'lms' 'openedx')