Files
plugins/openedx-tenant-api/test_e2e/api_client.py
DamarKusumo 2b7027e37d Add openedx-tenant-api plugin
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-10 08:20:57 +07:00

289 lines
9.0 KiB
Python

"""API client for tenant operations."""
import logging
import requests
from typing import Dict, Any, Optional
logger = logging.getLogger(__name__)
class TenantAPIClient:
"""Client for interacting with the Tenant API."""
def __init__(self, base_url: str, auth: Dict[str, str]):
"""
Initialize the API client.
Args:
base_url: Base URL for the LMS (e.g., http://local.openedx.io:8000)
auth: Dictionary with 'username' and 'password' for basic auth
"""
self.base_url = base_url.rstrip("/")
self.auth = (auth["username"], auth["password"])
self.session = requests.Session()
self.session.auth = self.auth
self.session.headers.update({
"Content-Type": "application/json",
"Accept": "application/json",
})
def health_check(self) -> Dict[str, Any]:
"""
Check if the tenant API is healthy.
Returns:
Dict with health status information
Raises:
requests.RequestException: If the request fails
"""
url = f"{self.base_url}/api/tenant/v1/health"
logger.info(f"Checking health at {url}")
response = self.session.get(url)
response.raise_for_status()
data = response.json()
logger.info(f"Health check response: {data}")
return data
def create_tenant(
self,
tenant_name: str,
platform_name: Optional[str] = None,
theme_name: str = "indigo",
org_filter: Optional[list] = None,
) -> Dict[str, Any]:
"""
Create a new tenant.
Args:
tenant_name: Name of the tenant (lowercase alphanumeric and hyphens)
platform_name: Display name for the platform
theme_name: Theme name (default: indigo)
org_filter: List of course organizations for this tenant
Returns:
Dict with tenant creation response including tenant_id, lms_url, etc.
Raises:
requests.RequestException: If the request fails
"""
url = f"{self.base_url}/api/tenant/v1/create"
logger.info(f"Creating tenant '{tenant_name}' at {url}")
payload = {
"tenant_name": tenant_name,
"theme_name": theme_name,
}
if platform_name:
payload["platform_name"] = platform_name
if org_filter:
payload["org_filter"] = org_filter
response = self.session.post(url, json=payload)
response.raise_for_status()
data = response.json()
logger.info(f"Tenant created: {data.get('tenant_name')} (ID: {data.get('tenant_id')})")
return data
def create_admin(
self,
tenant_name: str,
username: str,
email: str,
password: str,
org_name: str,
) -> Dict[str, Any]:
"""
Create a new admin user for a tenant.
Args:
tenant_name: Name of the tenant
username: Username for the admin
email: Email address for the admin
password: Password for the admin
org_name: Organization name for role assignment
Returns:
Dict with admin creation response including user details
Raises:
requests.RequestException: If the request fails
"""
url = f"{self.base_url}/api/tenant/v1/admin/create"
logger.info(f"Creating admin '{username}' for tenant '{tenant_name}' at {url}")
payload = {
"tenant_name": tenant_name,
"username": username,
"email": email,
"password": password,
"org_name": org_name,
}
response = self.session.post(url, json=payload)
response.raise_for_status()
data = response.json()
logger.info(f"Admin created: {data.get('user', {}).get('username')} with roles {data.get('user', {}).get('roles')}")
return data
def list_tenants(self) -> Dict[str, Any]:
"""
List all tenants.
Returns:
Dict with list of tenants
Raises:
requests.RequestException: If the request fails
"""
url = f"{self.base_url}/api/tenant/v1/list"
logger.info(f"Listing tenants at {url}")
response = self.session.get(url)
response.raise_for_status()
data = response.json()
logger.info(f"Found {data.get('count', 0)} tenants")
return data
def delete_tenant(self, tenant_name: str) -> Dict[str, Any]:
"""
Delete a tenant.
Args:
tenant_name: Name of the tenant to delete
Returns:
Dict with deletion response
Raises:
requests.RequestException: If the request fails
"""
url = f"{self.base_url}/api/tenant/v1/delete/{tenant_name}"
logger.info(f"Deleting tenant '{tenant_name}' at {url}")
response = self.session.delete(url)
response.raise_for_status()
data = response.json()
logger.info(f"Tenant deleted: {tenant_name}")
return data
def delete_user(self, username: str) -> Dict[str, Any]:
"""
Delete a user by username via the LMS Django shell.
Args:
username: Username to delete
Returns:
Dict with deletion status
Raises:
requests.RequestException: If the request fails
"""
url = f"{self.base_url}/api/tenant/v1/delete/{username}"
logger.info(f"Deleting user '{username}' at {url}")
response = self.session.delete(url)
response.raise_for_status()
data = response.json()
logger.info(f"User deleted: {username}")
return data
def delete_admin(self, username: str) -> Dict[str, Any]:
"""
Delete an admin user directly via LMS Django shell (bypasses FK constraints).
Uses direct SQL with SET FOREIGN_KEY_CHECKS=0.
Args:
username: Username to delete
Returns:
Dict with deletion status
Raises:
requests.RequestException: If the request fails
"""
import subprocess
from pathlib import Path
base_url = self.base_url.rstrip("/")
# Resolve tutor executable
import os
project_root = Path(__file__).resolve().parent.parent.parent
venv_tutor = project_root / ".venv" / "Scripts" / "tutor.exe"
if venv_tutor.exists():
tutor_cmd = [str(venv_tutor), "dev"]
else:
tutor_cmd = ["tutor", "dev"]
escaped_user = username.replace("'", "''")
cmd = tutor_cmd + [
"exec", "-T", "lms",
"python", "manage.py", "lms", "shell", "-c",
(
"from django.db import connection; "
"cursor = connection.cursor(); "
"cursor.execute('SET FOREIGN_KEY_CHECKS = 0'); "
"cursor.execute('DELETE FROM auth_user WHERE username = %s', ['" + escaped_user + "']); "
"cursor.execute('SET FOREIGN_KEY_CHECKS = 1'); "
"print('DELETED_" + escaped_user + "')"
),
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
output = result.stdout + result.stderr
marker = f"DELETED_{username}"
if result.returncode == 0 and marker in output:
logger.info(f"Admin deleted via Django shell: {username}")
return {"status": "success", "deleted": username}
else:
logger.warning(f"Could not delete admin '{username}': {result.stderr.strip()[:200]}")
return {"status": "warning", "message": f"Admin '{username}' may not exist", "stderr": result.stderr[:200]}
def get_user(self, username: str) -> Optional[Dict[str, Any]]:
"""
Check if a user exists via LMS Django shell.
Args:
username: Username to check
Returns:
Dict with user info if exists, None if not found
"""
import subprocess
from pathlib import Path
project_root = Path(__file__).resolve().parent.parent.parent
venv_tutor = project_root / ".venv" / "Scripts" / "tutor.exe"
if venv_tutor.exists():
tutor_cmd = [str(venv_tutor), "dev"]
else:
tutor_cmd = ["tutor", "dev"]
escaped_user = username.replace("'", "''")
cmd = tutor_cmd + [
"exec", "-T", "lms",
"python", "manage.py", "lms", "shell", "-c",
(
"from django.contrib.auth import get_user_model; "
"User = get_user_model(); "
"u = User.objects.filter(username='" + escaped_user + "').first(); "
"if u: print('EXISTS_' + u.username); "
"else: print('NOT_FOUND_" + escaped_user + "')"
),
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
output = result.stdout + result.stderr
if f"EXISTS_{username}" in output:
return {"username": username, "exists": True}
return None