Files
edx-platform/xmodule/capa/xqueue_interface.py
2026-02-19 11:02:00 +05:00

186 lines
6.5 KiB
Python

"""
LMS Interface to external queueing system (xqueue)
"""
import hashlib
import json
import logging
from typing import TYPE_CHECKING, Dict, Optional
import requests
from requests.auth import HTTPBasicAuth
from xmodule.capa.xqueue_submission import XQueueInterfaceSubmission
if TYPE_CHECKING:
from xmodule.capa_block import ProblemBlock
log = logging.getLogger(__name__)
DATEFORMAT = "%Y%m%d%H%M%S"
XQUEUE_METRIC_NAME = "edxapp.xqueue"
# Wait time for response from Xqueue.
XQUEUE_TIMEOUT = 35 # seconds
CONNECT_TIMEOUT = 3.05 # seconds
READ_TIMEOUT = 10 # seconds
def make_hashkey(seed):
"""
Generate a string key by hashing
"""
h = hashlib.md5()
h.update(str(seed).encode("latin-1"))
return h.hexdigest()
def make_xheader(lms_callback_url, lms_key, queue_name):
"""
Generate header for delivery and reply of queue request.
Xqueue header is a JSON-serialized dict:
{ 'lms_callback_url': url to which xqueue will return the request (string),
'lms_key': secret key used by LMS to protect its state (string),
'queue_name': designate a specific queue within xqueue server, e.g. 'MITx-6.00x' (string)
}
"""
return json.dumps({"lms_callback_url": lms_callback_url, "lms_key": lms_key, "queue_name": queue_name})
def parse_xreply(xreply):
"""
Parse the reply from xqueue. Messages are JSON-serialized dict:
{ 'return_code': 0 (success), 1 (fail)
'content': Message from xqueue (string)
}
"""
try:
xreply = json.loads(xreply)
except ValueError as err:
log.error(err)
return (1, "unexpected reply from server")
return_code = xreply["return_code"]
content = xreply["content"]
return (return_code, content)
class XQueueInterface: # pylint: disable=too-few-public-methods
"""Initializes the XQueue interface."""
def __init__(
self,
url: str,
django_auth: Dict[str, str],
requests_auth: Optional[HTTPBasicAuth] = None,
block: "ProblemBlock" = None,
use_submission_service: bool = False,
):
"""
Initializes the XQueue interface.
Args:
url (str): The URL of the XQueue service.
django_auth (Dict[str, str]): Authentication credentials for Django.
requests_auth (Optional[HTTPBasicAuth], optional): Authentication for HTTP requests. Defaults to None.
block ('ProblemBlock', optional): Added as a parameter only to extract the course_id
to check the course waffle flag `send_to_submission_course.enable`.
This can be removed after the legacy xqueue is deprecated. Defaults to None.
use_submission_service (bool): If True, use the edx-submissions service instead of XQueue.
"""
self.url = url
self.auth = django_auth
self.session = requests.Session()
self.session.auth = requests_auth
self.block = block
self.submission = XQueueInterfaceSubmission(self.block)
self.use_submission_service = use_submission_service
def send_to_queue(self, header, body, files_to_upload=None):
"""
Submit a request to xqueue.
header: JSON-serialized dict in the format described in 'xqueue_interface.make_xheader'
body: Serialized data for the receipient behind the queueing service. The operation of
xqueue is agnostic to the contents of 'body'
files_to_upload: List of file objects to be uploaded to xqueue along with queue request
Returns (error_code, msg) where error_code != 0 indicates an error
"""
# log the send to xqueue
header_info = json.loads(header)
queue_name = header_info.get("queue_name", "") # pylint: disable=unused-variable
# Attempt to send to queue
(error, msg) = self._send_to_queue(header, body, files_to_upload)
# Log in, then try again
if error and (msg == "login_required"):
(error, content) = self._login()
if error != 0:
# when the login fails
log.debug("Failed to login to queue: %s", content)
return (error, content)
if files_to_upload is not None:
# Need to rewind file pointers
for f in files_to_upload:
f.seek(0)
(error, msg) = self._send_to_queue(header, body, files_to_upload)
return error, msg
def _login(self):
payload = {"username": self.auth["username"], "password": self.auth["password"]}
return self._http_post(self.url + "/xqueue/login/", payload)
def _send_to_queue(self, header, body, files_to_upload):
"""Send the problem submission to XQueue, handling legacy fallback and edX submission logic."""
payload = {"xqueue_header": header, "xqueue_body": body}
files = {}
if files_to_upload is not None:
for f in files_to_upload:
files.update({f.name: f})
if self.block is None:
# XQueueInterface: if self.block is None, falling back to legacy xqueue submission.
log.error(
"Unexpected None block: falling back to legacy xqueue submission. "
"This may indicate a problem with the xqueue transition."
)
return self._http_post(self.url + "/xqueue/submit/", payload, files=files)
header_info = json.loads(header)
queue_key = header_info["lms_key"] # pylint: disable=unused-variable
if self.use_submission_service:
submission = self.submission.send_to_submission( # pylint: disable=unused-variable
header, body, queue_key, files
)
return None, ""
return self._http_post(self.url + "/xqueue/submit/", payload, files=files)
def _http_post(self, url, data, files=None):
"""Send an HTTP POST request and handle connection errors, timeouts, and unexpected status codes."""
try:
response = self.session.post(url, data=data, files=files, timeout=(CONNECT_TIMEOUT, READ_TIMEOUT))
except requests.exceptions.ConnectionError as err:
log.error(err)
return 1, "cannot connect to server"
except requests.exceptions.ReadTimeout as err:
log.error(err)
return 1, "failed to read from the server"
if response.status_code not in [200]:
return 1, f"unexpected HTTP status code [{response.status_code}]"
return parse_xreply(response.text)