Switch testing from ecs to ec2 (#20846)
This commit is contained in:
committed by
GitHub
parent
c5b86eb5c7
commit
139cdb8ba2
@@ -49,4 +49,4 @@ output = reports/coverage.xml
|
||||
jenkins_source =
|
||||
/home/jenkins/workspace/$JOB_NAME
|
||||
/home/jenkins/workspace/$SUBSET_JOB
|
||||
/edx/app/edxapp/edx-platform
|
||||
/home/jenkins/edx-platform
|
||||
|
||||
@@ -127,6 +127,12 @@ MOCK_PEER_GRADING = True
|
||||
|
||||
COMMENTS_SERVICE_URL = 'http://localhost:4567'
|
||||
|
||||
DJFS = {
|
||||
'type': 'osfs',
|
||||
'directory_root': '{}/django-pyfs/static/django-pyfs'.format(DATA_DIR),
|
||||
'url_root': '/static/django-pyfs',
|
||||
}
|
||||
|
||||
############################ STATIC FILES #############################
|
||||
|
||||
# TODO (cpennington): We need to figure out how envs/test.py can inject things
|
||||
|
||||
@@ -57,9 +57,9 @@ class TestPaverPytestCmd(unittest.TestCase):
|
||||
|
||||
env_var_cmd = "{} DISABLE_COURSEENROLLMENT_HISTORY=1".format(django_env_var_cmd)
|
||||
|
||||
xdist_string = u'--tx {}*ssh="ubuntu@{} -o StrictHostKeyChecking=no"' \
|
||||
'//python="source /edx/app/edxapp/edxapp_env; {}; python"' \
|
||||
'//chdir="/edx/app/edxapp/edx-platform"' \
|
||||
xdist_string = u'--tx {}*ssh="jenkins@{} -o StrictHostKeyChecking=no"' \
|
||||
'//python="source edx-venv/bin/activate; {}; python"' \
|
||||
'//chdir="edx-platform"' \
|
||||
.format(processes, ip, env_var_cmd)
|
||||
expected_statement.append(xdist_string)
|
||||
for rsync_dir in Env.rsync_dirs():
|
||||
|
||||
@@ -346,8 +346,8 @@ class TestPaverRunQuality(PaverTestCase):
|
||||
def test_no_diff_quality_failures(self):
|
||||
# Assert nothing is raised
|
||||
pavelib.quality.run_quality("")
|
||||
# And assert that sh was called 7 times:
|
||||
# 5 for pylint on each of the system directories
|
||||
# And assert that sh was called 8 times:
|
||||
# 6 for pylint on each of the system directories
|
||||
# 1 for diff_quality for pylint
|
||||
# 1 for diff_quality for eslint
|
||||
self.assertEqual(self._mock_paver_sh.call_count, 7)
|
||||
self.assertEqual(self._mock_paver_sh.call_count, 8)
|
||||
|
||||
@@ -20,7 +20,7 @@ from openedx.core.djangolib.markup import HTML
|
||||
from .utils.envs import Env
|
||||
from .utils.timer import timed
|
||||
|
||||
ALL_SYSTEMS = 'lms,cms,common,openedx,pavelib'
|
||||
ALL_SYSTEMS = 'lms,cms,common,openedx,pavelib,scripts'
|
||||
JUNIT_XML_TEMPLATE = u"""<?xml version="1.0" encoding="UTF-8"?>
|
||||
<testsuite name="{name}" tests="1" errors="0" failures="{failure_count}" skip="0">
|
||||
<testcase classname="pavelib.quality" name="{name}" time="{seconds}">{failure_element}</testcase>
|
||||
|
||||
@@ -170,9 +170,9 @@ class SystemTestSuite(PytestSuite):
|
||||
env_var_cmd = u'export DJANGO_SETTINGS_MODULE={} DISABLE_COURSEENROLLMENT_HISTORY={}'\
|
||||
.format('{}.envs.{}'.format(self.root, self.settings),
|
||||
self.disable_courseenrollment_history)
|
||||
xdist_string = u'--tx {}*ssh="ubuntu@{} -o StrictHostKeyChecking=no"' \
|
||||
'//python="source /edx/app/edxapp/edxapp_env; {}; python"' \
|
||||
'//chdir="/edx/app/edxapp/edx-platform"' \
|
||||
xdist_string = u'--tx {}*ssh="jenkins@{} -o StrictHostKeyChecking=no"' \
|
||||
'//python="source edx-venv/bin/activate; {}; python"' \
|
||||
'//chdir="edx-platform"' \
|
||||
.format(xdist_remote_processes, ip, env_var_cmd)
|
||||
cmd.append(xdist_string)
|
||||
for rsync_dir in Env.rsync_dirs():
|
||||
@@ -295,9 +295,9 @@ class LibTestSuite(PytestSuite):
|
||||
env_var_cmd = u'{} DISABLE_COURSEENROLLMENT_HISTORY={}' \
|
||||
.format(django_env_var_cmd, self.disable_courseenrollment_history)
|
||||
|
||||
xdist_string = u'--tx {}*ssh="ubuntu@{} -o StrictHostKeyChecking=no"' \
|
||||
'//python="source /edx/app/edxapp/edxapp_env; {}; python"' \
|
||||
'//chdir="/edx/app/edxapp/edx-platform"' \
|
||||
xdist_string = u'--tx {}*ssh="jenkins@{} -o StrictHostKeyChecking=no"' \
|
||||
'//python="source edx-venv/bin/activate; {}; python"' \
|
||||
'//chdir="edx-platform"' \
|
||||
.format(xdist_remote_processes, ip, env_var_cmd)
|
||||
cmd.append(xdist_string)
|
||||
for rsync_dir in Env.rsync_dirs():
|
||||
|
||||
@@ -1,12 +1,7 @@
|
||||
def runPythonTests() {
|
||||
// Determine git refspec, branch, and clone type
|
||||
if (env.ghprbActualCommit) {
|
||||
git_branch = "${ghprbActualCommit}"
|
||||
git_refspec = "+refs/pull/${ghprbPullId}/*:refs/remotes/origin/pr/${ghprbPullId}/*"
|
||||
} else {
|
||||
git_branch = "${BRANCH_NAME}"
|
||||
git_refspec = "+refs/heads/${BRANCH_NAME}:refs/remotes/origin/${BRANCH_NAME}"
|
||||
}
|
||||
git_branch = xdist_git_branch()
|
||||
git_refspec = xdist_git_refspec()
|
||||
sshagent(credentials: ['jenkins-worker', 'jenkins-worker-pem'], ignoreMissing: true) {
|
||||
checkout changelog: false, poll: false, scm: [$class: 'GitSCM', branches: [[name: git_branch]],
|
||||
doGenerateSubmoduleConfigurations: false, extensions: [[$class: 'CloneOption', honorRefspec: true,
|
||||
@@ -32,6 +27,14 @@ def xdist_git_branch() {
|
||||
}
|
||||
}
|
||||
|
||||
def xdist_git_refspec() {
|
||||
if (env.ghprbActualCommit) {
|
||||
return "+refs/pull/${ghprbPullId}/*:refs/remotes/origin/pr/${ghprbPullId}/*"
|
||||
} else {
|
||||
return "+refs/heads/${BRANCH_NAME}:refs/remotes/origin/${BRANCH_NAME}"
|
||||
}
|
||||
}
|
||||
|
||||
pipeline {
|
||||
agent { label "jenkins-worker" }
|
||||
options {
|
||||
@@ -39,10 +42,14 @@ pipeline {
|
||||
timeout(60)
|
||||
}
|
||||
environment {
|
||||
XDIST_CONTAINER_SUBNET = credentials('XDIST_CONTAINER_SUBNET')
|
||||
XDIST_CONTAINER_SECURITY_GROUP = credentials('XDIST_CONTAINER_SECURITY_GROUP')
|
||||
XDIST_CONTAINER_TASK_NAME = "jenkins-worker-task"
|
||||
XDIST_GIT_BRANCH = xdist_git_branch()
|
||||
XDIST_GIT_REFSPEC = xdist_git_refspec()
|
||||
XDIST_INSTANCE_TYPE = "c5d.large"
|
||||
XDIST_WORKER_AMI = credentials('XDIST_WORKER_AMI')
|
||||
XDIST_WORKER_IAM_PROFILE_ARN = credentials('XDIST_WORKER_IAM_PROFILE_ARN')
|
||||
XDIST_WORKER_KEY_NAME = "jenkins-worker"
|
||||
XDIST_WORKER_SUBNET = credentials('XDIST_WORKER_SUBNET')
|
||||
XDIST_WORKER_SECURITY_GROUP = credentials('XDIST_WORKER_SECURITY_GROUP')
|
||||
}
|
||||
stages {
|
||||
stage('Mark build as pending on Github') {
|
||||
@@ -74,7 +81,7 @@ pipeline {
|
||||
agent { label "jenkins-worker" }
|
||||
environment {
|
||||
TEST_SUITE = "lms-unit"
|
||||
XDIST_NUM_TASKS = 10
|
||||
XDIST_NUM_WORKERS = 10
|
||||
XDIST_REMOTE_NUM_PROCESSES = 1
|
||||
}
|
||||
steps {
|
||||
@@ -94,7 +101,7 @@ pipeline {
|
||||
agent { label "jenkins-worker" }
|
||||
environment {
|
||||
TEST_SUITE = "cms-unit"
|
||||
XDIST_NUM_TASKS = 3
|
||||
XDIST_NUM_WORKERS = 3
|
||||
XDIST_REMOTE_NUM_PROCESSES = 1
|
||||
}
|
||||
steps {
|
||||
@@ -114,7 +121,7 @@ pipeline {
|
||||
agent { label "jenkins-worker" }
|
||||
environment {
|
||||
TEST_SUITE = "commonlib-unit"
|
||||
XDIST_NUM_TASKS = 3
|
||||
XDIST_NUM_WORKERS = 3
|
||||
XDIST_REMOTE_NUM_PROCESSES = 1
|
||||
}
|
||||
steps {
|
||||
|
||||
@@ -36,9 +36,9 @@ if [[ -n "$TOXENV" ]]; then
|
||||
export NO_PREREQ_INSTALL="True"
|
||||
fi
|
||||
|
||||
if [[ -n "$XDIST_NUM_TASKS" ]]; then
|
||||
if [[ -n "$XDIST_NUM_WORKERS" ]]; then
|
||||
bash scripts/xdist/prepare_xdist_nodes.sh
|
||||
PAVER_ARGS="-v --xdist_ip_addresses="$(<pytest_task_ips.txt)""
|
||||
PAVER_ARGS="-v --xdist_ip_addresses="$(<pytest_worker_ips.txt)""
|
||||
export SHARD="all"
|
||||
if [[ -n "$XDIST_REMOTE_NUM_PROCESSES" ]]; then
|
||||
PARALLEL="--processes=$XDIST_REMOTE_NUM_PROCESSES"
|
||||
|
||||
@@ -1,18 +1,14 @@
|
||||
#!/bin/bash
|
||||
set -e
|
||||
|
||||
echo "Spinning up xdist containers with pytest_container_manager.py"
|
||||
python scripts/xdist/pytest_container_manager.py -a up -n ${XDIST_NUM_TASKS} \
|
||||
-t ${XDIST_CONTAINER_TASK_NAME} \
|
||||
-s ${XDIST_CONTAINER_SUBNET} \
|
||||
-sg ${XDIST_CONTAINER_SECURITY_GROUP}
|
||||
|
||||
# Need to map remote branch to local branch when fetching a branch other than master
|
||||
if [ "$XDIST_GIT_BRANCH" == "master" ]; then
|
||||
XDIST_GIT_FETCH_STRING="$XDIST_GIT_BRANCH"
|
||||
else
|
||||
XDIST_GIT_FETCH_STRING="$XDIST_GIT_BRANCH:$XDIST_GIT_BRANCH"
|
||||
fi
|
||||
echo "Spinning up xdist workers with pytest_worker_manager.py"
|
||||
python scripts/xdist/pytest_worker_manager.py -a up -n ${XDIST_NUM_WORKERS} \
|
||||
-ami ${XDIST_WORKER_AMI} \
|
||||
-type ${XDIST_INSTANCE_TYPE} \
|
||||
-s ${XDIST_WORKER_SUBNET} \
|
||||
-sg ${XDIST_WORKER_SECURITY_GROUP} \
|
||||
-key ${XDIST_WORKER_KEY_NAME} \
|
||||
-iam ${XDIST_WORKER_IAM_PROFILE_ARN}
|
||||
|
||||
# Install the correct version of Django depending on which tox environment (if any) is in use
|
||||
if [[ -z ${TOX_ENV+x} ]] || [[ ${TOX_ENV} == 'null' ]]; then
|
||||
@@ -21,16 +17,16 @@ else
|
||||
DJANGO_REQUIREMENT=$(pip freeze | grep "^[Dd]jango==")
|
||||
fi
|
||||
|
||||
ip_list=$(<pytest_task_ips.txt)
|
||||
ip_list=$(<pytest_worker_ips.txt)
|
||||
for ip in $(echo $ip_list | sed "s/,/ /g")
|
||||
do
|
||||
container_reqs_cmd="ssh -o StrictHostKeyChecking=no ubuntu@$ip 'cd /edx/app/edxapp;
|
||||
git clone --branch master --depth 1 --no-tags -q https://github.com/edx/edx-platform.git; cd edx-platform;
|
||||
git fetch --depth=1 --no-tags -q origin ${XDIST_GIT_FETCH_STRING}; git checkout -q ${XDIST_GIT_BRANCH};
|
||||
source /edx/app/edxapp/edxapp_env;
|
||||
worker_reqs_cmd="ssh -o StrictHostKeyChecking=no jenkins@$ip
|
||||
'git clone --branch master --depth 1 -q https://github.com/edx/edx-platform.git; cd edx-platform;
|
||||
git fetch -fq origin ${XDIST_GIT_REFSPEC}; git checkout -q ${XDIST_GIT_BRANCH};
|
||||
source ../edx-venv/bin/activate;
|
||||
pip install -q ${DJANGO_REQUIREMENT} -r requirements/edx/testing.txt; mkdir reports' & "
|
||||
|
||||
cmd=$cmd$container_reqs_cmd
|
||||
cmd=$cmd$worker_reqs_cmd
|
||||
done
|
||||
cmd=$cmd"wait"
|
||||
|
||||
|
||||
@@ -1,203 +0,0 @@
|
||||
import argparse
|
||||
import logging
|
||||
import time
|
||||
|
||||
import boto3
|
||||
from botocore.config import Config
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PytestContainerManager():
|
||||
"""
|
||||
Responsible for spinning up and terminating ECS tasks to be used with pytest-xdist
|
||||
"""
|
||||
TASK_RUN_TIMEOUT_MINUTES = 10
|
||||
MAX_RUN_TASK_RETRIES = 7
|
||||
|
||||
def __init__(self, region, cluster):
|
||||
config = Config(
|
||||
retries={
|
||||
'max_attempts': self.MAX_RUN_TASK_RETRIES
|
||||
}
|
||||
)
|
||||
self.ecs = boto3.client('ecs', region, config=config)
|
||||
self.cluster_name = cluster
|
||||
|
||||
def spin_up_tasks(self, number_of_tasks, task_name, subnets, security_groups, public_ip, launch_type):
|
||||
"""
|
||||
Spins up tasks and generates two .txt files, containing the IP/ arns
|
||||
of the new tasks.
|
||||
"""
|
||||
revision = self.ecs.describe_task_definition(taskDefinition=task_name)['taskDefinition']['revision']
|
||||
task_definition = "{}:{}".format(task_name, revision)
|
||||
|
||||
logging.info("Spinning up {} tasks based on task definition: {}".format(number_of_tasks, task_definition))
|
||||
|
||||
remainder = number_of_tasks % 10
|
||||
quotient = number_of_tasks / 10
|
||||
|
||||
task_num_list = [10 for i in range(0, quotient)]
|
||||
if remainder:
|
||||
task_num_list.append(remainder)
|
||||
|
||||
# Spin up tasks. boto3's run_task only allows 10 tasks to be launched at a time
|
||||
task_arns = []
|
||||
for num in task_num_list:
|
||||
for retry in range(1, self.MAX_RUN_TASK_RETRIES + 1):
|
||||
try:
|
||||
response = self.ecs.run_task(
|
||||
count=num,
|
||||
cluster=self.cluster_name,
|
||||
launchType=launch_type,
|
||||
networkConfiguration={
|
||||
'awsvpcConfiguration': {
|
||||
'subnets': subnets,
|
||||
'securityGroups': security_groups,
|
||||
'assignPublicIp': public_ip
|
||||
}
|
||||
},
|
||||
taskDefinition=task_definition
|
||||
)
|
||||
except ClientError as err:
|
||||
# Handle AWS throttling with an exponential backoff
|
||||
if retry == self.MAX_RUN_TASK_RETRIES:
|
||||
raise StandardError(
|
||||
"MAX_RUN_TASK_RETRIES ({}) reached while spinning up tasks due to AWS throttling.".format(self.MAX_RUN_TASK_RETRIES)
|
||||
)
|
||||
logger.info("Hit error: {}. Retrying".format(err))
|
||||
countdown = 2 ** retry
|
||||
logger.info("Sleeping for {} seconds".format(countdown))
|
||||
time.sleep(countdown)
|
||||
else:
|
||||
break
|
||||
|
||||
for task_response in response['tasks']:
|
||||
task_arns.append(task_response['taskArn'])
|
||||
|
||||
failure_array = response['failures']
|
||||
if failure_array:
|
||||
raise StandardError(
|
||||
"There was at least one failure when spinning up tasks: {}".format(failure_array)
|
||||
)
|
||||
|
||||
# Wait for tasks to finish spinning up
|
||||
not_running = task_arns[:]
|
||||
ip_addresses = []
|
||||
all_running = False
|
||||
for attempt in range(0, self.TASK_RUN_TIMEOUT_MINUTES * 2):
|
||||
time.sleep(30)
|
||||
list_tasks_response = self.ecs.describe_tasks(cluster=self.cluster_name, tasks=not_running)['tasks']
|
||||
del not_running[:]
|
||||
for task_response in list_tasks_response:
|
||||
if task_response['lastStatus'] == 'RUNNING':
|
||||
for container in task_response['containers']:
|
||||
container_ip_address = container["networkInterfaces"][0]["privateIpv4Address"]
|
||||
if container_ip_address not in ip_addresses:
|
||||
ip_addresses.append(container_ip_address)
|
||||
else:
|
||||
not_running.append(task_response['taskArn'])
|
||||
|
||||
if not_running:
|
||||
logger.info("Still waiting on {} tasks to spin up".format(len(not_running)))
|
||||
else:
|
||||
logger.info("Finished spinning up tasks")
|
||||
all_running = True
|
||||
break
|
||||
|
||||
if not all_running:
|
||||
raise StandardError(
|
||||
"Timed out waiting to spin up all tasks."
|
||||
)
|
||||
|
||||
logger.info("Successfully booted up {} tasks.".format(number_of_tasks))
|
||||
|
||||
# Generate .txt files containing IP addresses and task arns
|
||||
ip_list_string = ",".join(ip_addresses)
|
||||
logger.info("Task IP list: {}".format(ip_list_string))
|
||||
ip_list_file = open("pytest_task_ips.txt", "w")
|
||||
ip_list_file.write(ip_list_string)
|
||||
ip_list_file.close()
|
||||
|
||||
task_arn_list_string = ",".join(task_arns)
|
||||
logger.info("Task arn list: {}".format(task_arn_list_string))
|
||||
task_arn_file = open("pytest_task_arns.txt", "w")
|
||||
task_arn_file.write(task_arn_list_string)
|
||||
task_arn_file.close()
|
||||
|
||||
def terminate_tasks(self, task_arns, reason):
|
||||
"""
|
||||
Terminates tasks based on a list of task_arns.
|
||||
"""
|
||||
for task_arn in task_arns.split(','):
|
||||
response = self.ecs.stop_task(
|
||||
cluster=self.cluster_name,
|
||||
task=task_arn,
|
||||
reason=reason
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
description="PytestContainerManager, manages ECS tasks in an AWS cluster."
|
||||
)
|
||||
|
||||
parser.add_argument('--action', '-a', choices=['up', 'down'], default=None,
|
||||
help="Action for PytestContainerManager to perform. "
|
||||
"Either up for spinning up AWS ECS tasks or down for stopping them")
|
||||
|
||||
parser.add_argument('--cluster', '-c', default="jenkins-worker-containers",
|
||||
help="AWS Cluster name where the tasks run. Defaults to"
|
||||
"the testeng cluster: jenkins-worker-containers")
|
||||
|
||||
parser.add_argument('--region', '-g', default='us-east-1',
|
||||
help="AWS region where ECS infrastructure lives. Defaults to us-east-1")
|
||||
|
||||
# Spinning up tasks
|
||||
parser.add_argument('--launch_type', default='FARGATE', choices=['EC2', 'FARGATE'],
|
||||
help="ECS launch type for tasks. Defaults to FARGATE")
|
||||
|
||||
parser.add_argument('--num_tasks', '-n', type=int, default=None,
|
||||
help="Number of ECS tasks to spin up")
|
||||
|
||||
parser.add_argument('--public_ip', choices=['ENABLED', 'DISABLED'],
|
||||
default='DISABLED', help="Whether the tasks should have a public IP")
|
||||
|
||||
parser.add_argument('--subnets', '-s', nargs='+', default=None,
|
||||
help="List of subnets for the tasks to exist in")
|
||||
|
||||
parser.add_argument('--security_groups', '-sg', nargs='+', default=None,
|
||||
help="List of security groups to apply to the tasks")
|
||||
|
||||
parser.add_argument('--task_name', '-t', default=None,
|
||||
help="Name of the task definition")
|
||||
|
||||
# Terminating tasks
|
||||
parser.add_argument('--reason', '-r', default="Finished executing tests",
|
||||
help="Reason for terminating tasks")
|
||||
|
||||
parser.add_argument('--task_arns', '-arns', default=None,
|
||||
help="Task arns to terminate")
|
||||
|
||||
args = parser.parse_args()
|
||||
containerManager = PytestContainerManager(args.region, args.cluster)
|
||||
|
||||
if args.action == 'up':
|
||||
containerManager.spin_up_tasks(
|
||||
args.num_tasks,
|
||||
args.task_name,
|
||||
args.subnets,
|
||||
args.security_groups,
|
||||
args.public_ip,
|
||||
args.launch_type
|
||||
)
|
||||
elif args.action == 'down':
|
||||
containerManager.terminate_tasks(
|
||||
args.task_arns,
|
||||
args.reason
|
||||
)
|
||||
else:
|
||||
logger.info("No action specified for PytestContainerManager")
|
||||
212
scripts/xdist/pytest_worker_manager.py
Normal file
212
scripts/xdist/pytest_worker_manager.py
Normal file
@@ -0,0 +1,212 @@
|
||||
"""
|
||||
Manages the creation and termination of EC2 workers, to be used with pytest-xdist
|
||||
as part of the CI process on Jenkins.
|
||||
"""
|
||||
import argparse
|
||||
import logging
|
||||
import time
|
||||
|
||||
import boto3
|
||||
from botocore.config import Config
|
||||
from botocore.exceptions import ClientError
|
||||
import socket
|
||||
from multiprocessing import Pool
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _check_worker_ready(ip):
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
response = sock.connect_ex((ip, 22))
|
||||
sock.close()
|
||||
return response
|
||||
|
||||
|
||||
class PytestWorkerManager():
|
||||
"""
|
||||
Responsible for spinning up and terminating EC2 workers to be used with pytest-xdist
|
||||
"""
|
||||
WORKER_BOOTUP_TIMEOUT_MINUTES = 5
|
||||
WORKER_SSH_ATTEMPTS = 10
|
||||
MAX_RUN_WORKER_RETRIES = 7
|
||||
|
||||
def __init__(self, region):
|
||||
self.ec2 = boto3.client('ec2', region)
|
||||
|
||||
def spin_up_workers(self, number_of_workers, ami, instance_type, subnet, security_group_ids, key_name, iam_instance_profile):
|
||||
"""
|
||||
Spins up workers and generates two .txt files, containing the IP/ arns
|
||||
of the new workers.
|
||||
"""
|
||||
logging.info("Spinning up {} workers".format(number_of_workers))
|
||||
|
||||
worker_instance_ids = []
|
||||
for retry in range(1, self.MAX_RUN_WORKER_RETRIES + 1):
|
||||
try:
|
||||
response = self.ec2.run_instances(
|
||||
MinCount=number_of_workers,
|
||||
MaxCount=number_of_workers,
|
||||
ImageId=ami,
|
||||
InstanceType=instance_type,
|
||||
SubnetId=subnet,
|
||||
SecurityGroupIds=security_group_ids,
|
||||
KeyName=key_name,
|
||||
TagSpecifications=[
|
||||
{
|
||||
'ResourceType': 'instance',
|
||||
'Tags': [
|
||||
{"Key": "master", "Value": "build.testeng.edx.org"},
|
||||
{"Key": "worker", "Value": "pytest_xdist_worker"}
|
||||
]
|
||||
}
|
||||
]
|
||||
)
|
||||
except ClientError as err:
|
||||
# Handle AWS throttling with an exponential backoff
|
||||
if retry == self.MAX_RUN_WORKER_RETRIES:
|
||||
raise StandardError(
|
||||
"MAX_RUN_WORKER_RETRIES ({}) reached while spinning up workers due to AWS throttling.".format(self.MAX_RUN_WORKER_RETRIES)
|
||||
)
|
||||
logger.info("Hit error: {}. Retrying".format(err))
|
||||
countdown = 2 ** retry
|
||||
logger.info("Sleeping for {} seconds".format(countdown))
|
||||
time.sleep(countdown)
|
||||
else:
|
||||
break
|
||||
|
||||
for instance_response in response['Instances']:
|
||||
worker_instance_ids.append(instance_response['InstanceId'])
|
||||
|
||||
# Wait for workers to finish spinning up
|
||||
not_running = worker_instance_ids[:]
|
||||
ip_addresses = []
|
||||
all_running = False
|
||||
for attempt in range(0, self.WORKER_BOOTUP_TIMEOUT_MINUTES * 12):
|
||||
list_workers_response = self.ec2.describe_instances(InstanceIds=not_running)
|
||||
del not_running[:]
|
||||
for reservations in list_workers_response['Reservations']:
|
||||
for instance_info in reservations['Instances']:
|
||||
if instance_info['State']['Name'] == "running":
|
||||
ip_addresses.append(instance_info['PrivateIpAddress'])
|
||||
else:
|
||||
not_running.append(instance_info['InstanceId'])
|
||||
|
||||
if len(not_running) > 0:
|
||||
logger.info("Still waiting on {} workers to spin up".format(len(not_running)))
|
||||
time.sleep(5)
|
||||
else:
|
||||
logger.info("Finished spinning up workers")
|
||||
all_running = True
|
||||
break
|
||||
|
||||
if not all_running:
|
||||
raise StandardError(
|
||||
"Timed out waiting to spin up all workers."
|
||||
)
|
||||
logger.info("Successfully booted up {} workers.".format(number_of_workers))
|
||||
|
||||
not_ready_ip_addresses = ip_addresses[:]
|
||||
logger.info("Checking ssh connection to workers.")
|
||||
pool = Pool(processes=number_of_workers)
|
||||
for ssh_try in range(0, self.WORKER_SSH_ATTEMPTS):
|
||||
results = pool.map(_check_worker_ready, not_ready_ip_addresses)
|
||||
deleted_ips = 0
|
||||
for num in range(0, len(results)):
|
||||
if results[num] == 0:
|
||||
del(not_ready_ip_addresses[num - deleted_ips])
|
||||
deleted_ips += 1
|
||||
|
||||
if len(not_ready_ip_addresses) == 0:
|
||||
logger.info("All workers are ready for tests.")
|
||||
break
|
||||
|
||||
if ssh_try == self.WORKER_SSH_ATTEMPTS - 1:
|
||||
raise StandardError(
|
||||
"Max ssh tries to remote workers reached."
|
||||
)
|
||||
|
||||
logger.info("Not all workers are ready. Sleeping for 5 seconds then retrying.")
|
||||
time.sleep(5)
|
||||
|
||||
# Generate .txt files containing IP addresses and instance ids
|
||||
ip_list_string = ",".join(ip_addresses)
|
||||
logger.info("Worker IP list: {}".format(ip_list_string))
|
||||
ip_list_file = open("pytest_worker_ips.txt", "w")
|
||||
ip_list_file.write(ip_list_string)
|
||||
ip_list_file.close()
|
||||
|
||||
worker_instance_id_list_string = ",".join(worker_instance_ids)
|
||||
logger.info("Worker Instance Id list: {}".format(worker_instance_id_list_string))
|
||||
worker_arn_file = open("pytest_worker_instance_ids.txt", "w")
|
||||
worker_arn_file.write(worker_instance_id_list_string)
|
||||
worker_arn_file.close()
|
||||
|
||||
def terminate_workers(self, worker_instance_ids):
|
||||
"""
|
||||
Terminates workers based on a list of worker_instance_ids.
|
||||
"""
|
||||
instance_id_list = worker_instance_ids.split(',')
|
||||
response = self.ec2.terminate_instances(
|
||||
InstanceIds=instance_id_list
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
description="PytestWorkerManager, manages EC2 workers in an AWS cluster."
|
||||
)
|
||||
|
||||
parser.add_argument('--action', '-a', choices=['up', 'down'], default=None,
|
||||
help="Action for PytestWorkerManager to perform. "
|
||||
"Either up for spinning up AWS EC2 workers or down for terminating them")
|
||||
|
||||
parser.add_argument('--region', '-g', default='us-east-1',
|
||||
help="AWS region where EC2 infrastructure lives. Defaults to us-east-1")
|
||||
|
||||
# Spinning up workers
|
||||
parser.add_argument('--num-workers', '-n', type=int, default=None,
|
||||
help="Number of EC2 workers to spin up")
|
||||
|
||||
parser.add_argument('--ami', '-ami', default=None,
|
||||
help="AMI for workers")
|
||||
|
||||
parser.add_argument('--instance-type', '-type', default=None,
|
||||
help="Desired EC2 instance type")
|
||||
|
||||
parser.add_argument('--subnet-id', '-s', default=None,
|
||||
help="Subnet for the workers to exist in")
|
||||
|
||||
parser.add_argument('--security_groups', '-sg', nargs='+', default=None,
|
||||
help="List of security group ids to apply to workers")
|
||||
|
||||
parser.add_argument('--key-name', '-key', default=None,
|
||||
help="Key pair name for sshing to worker")
|
||||
|
||||
parser.add_argument('--iam-arn', '-iam', default=None,
|
||||
help="Iam Instance Profile ARN for the workers")
|
||||
|
||||
# Terminating workers
|
||||
parser.add_argument('--instance-ids', '-ids', default=None,
|
||||
help="Instance ids terminate")
|
||||
|
||||
args = parser.parse_args()
|
||||
workerManager = PytestWorkerManager(args.region)
|
||||
|
||||
if args.action == 'up':
|
||||
workerManager.spin_up_workers(
|
||||
args.num_workers,
|
||||
args.ami,
|
||||
args.instance_type,
|
||||
args.subnet_id,
|
||||
args.security_groups,
|
||||
args.key_name,
|
||||
args.iam_arn
|
||||
)
|
||||
elif args.action == 'down':
|
||||
workerManager.terminate_workers(
|
||||
args.instance_ids
|
||||
)
|
||||
else:
|
||||
logger.info("No action specified for PytestWorkerManager")
|
||||
@@ -1,10 +1,10 @@
|
||||
#!/bin/bash
|
||||
set -e
|
||||
|
||||
if [ -f pytest_task_arns.txt ]; then
|
||||
echo "Terminating xdist containers with pytest_container_manager.py"
|
||||
xdist_task_arns=$(<pytest_task_arns.txt)
|
||||
python scripts/xdist/pytest_container_manager.py -a down --task_arns ${xdist_task_arns}
|
||||
if [ -f pytest_worker_instance_ids.txt ]; then
|
||||
echo "Terminating xdist workers with pytest_worker_manager.py"
|
||||
xdist_worker_ids=$(<pytest_worker_instance_ids.txt)
|
||||
python scripts/xdist/pytest_worker_manager.py -a down --instance-ids ${xdist_worker_ids}
|
||||
else
|
||||
echo "File: pytest_task_arns.txt not found"
|
||||
echo "File: pytest_worker_instance_ids.txt not found"
|
||||
fi
|
||||
|
||||
10
tox.ini
10
tox.ini
@@ -47,12 +47,16 @@ passenv =
|
||||
SKIP_NPM_INSTALL
|
||||
TARGET_BRANCH
|
||||
TEST_SUITE
|
||||
XDIST_CONTAINER_SECURITY_GROUP
|
||||
XDIST_CONTAINER_SUBNET
|
||||
XDIST_CONTAINER_TASK_NAME
|
||||
XDIST_GIT_BRANCH
|
||||
XDIST_GIT_REFSPEC
|
||||
XDIST_NUM_TASKS
|
||||
XDIST_REMOTE_NUM_PROCESSES
|
||||
XDIST_WORKER_AMI
|
||||
XDIST_WORKER_IAM_PROFILE_ARN
|
||||
XDIST_WORKER_KEY_NAME
|
||||
XDIST_WORKER_SECURITY_GROUP
|
||||
XDIST_WORKER_SUBNET
|
||||
|
||||
deps =
|
||||
django111: -r requirements/edx/django.txt
|
||||
django20: Django>=2.0,<2.1
|
||||
|
||||
Reference in New Issue
Block a user