From 02d893b1cd043ed60c49d77e4f2839328ce2de72 Mon Sep 17 00:00:00 2001 From: Andrew Halberstadt Date: Fri, 12 Dec 2025 12:11:41 -0500 Subject: [PATCH 1/2] test: use responses in test/test_create.py This allows us to get more coverage out of these tests. --- test/test_create.py | 101 +++++++++++++++++++++++++++++++++----------- test/test_docker.py | 4 ++ 2 files changed, 81 insertions(+), 24 deletions(-) diff --git a/test/test_create.py b/test/test_create.py index 1b93c9b14..fb5bf87b8 100644 --- a/test/test_create.py +++ b/test/test_create.py @@ -2,32 +2,73 @@ # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. - +import json +import re import unittest from unittest import mock +import responses +from taskcluster.exceptions import TaskclusterRestFailure + from taskgraph import create from taskgraph.config import GraphConfig from taskgraph.graph import Graph from taskgraph.task import Task from taskgraph.taskgraph import TaskGraph +from taskgraph.util import taskcluster as tc_util GRAPH_CONFIG = GraphConfig({"trust-domain": "domain"}, "/var/empty") -class TestCreate(unittest.TestCase): - def setUp(self): - self.created_tasks = {} - self.old_create_task = create.create_task - create.create_task = self.fake_create_task +def mock_taskcluster_api( + created_tasks=None, error_status=None, error_message=None, error_task_ids=None +): + """Mock the Taskcluster Queue API for create task calls.""" + + def request_callback(request): + task_id = request.url.split("/")[-1] + + # Check if this task should error + if error_status is not None: + if error_task_ids is None or task_id in error_task_ids: + # Support per-task error messages + if isinstance(error_message, dict): + message = error_message.get(task_id, "error") + else: + message = error_message or "error" + return (error_status, {}, f'{{"message": "{message}"}}') + + # Success case - capture task definition if requested + if created_tasks is not None: + task_def = json.loads(request.body) + created_tasks[task_id] = task_def + + return (200, {}, f'{{"status": {{"taskId": "{task_id}"}}}}') - def tearDown(self): - create.create_task = self.old_create_task + responses.add_callback( + responses.PUT, + re.compile(r"https://tc\.example\.com/api/queue/v1/task/.*"), + callback=request_callback, + content_type="application/json", + ) - def fake_create_task(self, session, task_id, label, task_def): - self.created_tasks[task_id] = task_def +class TestCreate(unittest.TestCase): + def setUp(self): + # Clear cached Taskcluster clients/sessions since we're mocking the environment + tc_util.get_taskcluster_client.cache_clear() + tc_util.get_session.cache_clear() + + @responses.activate + @mock.patch.dict( + "os.environ", + {"TASKCLUSTER_ROOT_URL": "https://tc.example.com"}, + clear=True, + ) def test_create_tasks(self): + created_tasks = {} + mock_taskcluster_api(created_tasks=created_tasks) + tasks = { "tid-a": Task( kind="test", label="a", attributes={}, task={"payload": "hello world"} @@ -48,7 +89,8 @@ def test_create_tasks(self): decision_task_id="decisiontask", ) - for tid, task in self.created_tasks.items(): + assert created_tasks + for tid, task in created_tasks.items(): self.assertEqual(task["payload"], "hello world") self.assertEqual(task["schedulerId"], "domain-level-4") # make sure the dependencies exist, at least @@ -56,10 +98,19 @@ def test_create_tasks(self): if depid == "decisiontask": # Don't look for decisiontask here continue - self.assertIn(depid, self.created_tasks) - + self.assertIn(depid, created_tasks) + + @responses.activate + @mock.patch.dict( + "os.environ", + {"TASKCLUSTER_ROOT_URL": "https://tc.example.com"}, + clear=True, + ) def test_create_task_without_dependencies(self): "a task with no dependencies depends on the decision task" + created_tasks = {} + mock_taskcluster_api(created_tasks=created_tasks) + tasks = { "tid-a": Task( kind="test", label="a", attributes={}, task={"payload": "hello world"} @@ -77,12 +128,20 @@ def test_create_task_without_dependencies(self): decision_task_id="decisiontask", ) - for tid, task in self.created_tasks.items(): + assert created_tasks + for tid, task in created_tasks.items(): self.assertEqual(task.get("dependencies"), ["decisiontask"]) - @mock.patch("taskgraph.create.create_task") - def test_create_tasks_fails_if_create_fails(self, create_task): - "creat_tasks fails if a single create_task call fails" + @responses.activate + @mock.patch.dict( + "os.environ", + {"TASKCLUSTER_ROOT_URL": "https://tc.example.com"}, + clear=True, + ) + def test_create_tasks_fails_if_create_fails(self): + "create_tasks fails if a single create_task call fails" + mock_taskcluster_api(error_status=403, error_message="oh no!") + tasks = { "tid-a": Task( kind="test", label="a", attributes={}, task={"payload": "hello world"} @@ -92,13 +151,7 @@ def test_create_tasks_fails_if_create_fails(self, create_task): graph = Graph(nodes={"tid-a"}, edges=set()) taskgraph = TaskGraph(tasks, graph) - def fail(*args): - print("UHOH") - raise RuntimeError("oh no!") - - create_task.side_effect = fail - - with self.assertRaises(RuntimeError): + with self.assertRaises(TaskclusterRestFailure): create.create_tasks( GRAPH_CONFIG, taskgraph, diff --git a/test/test_docker.py b/test/test_docker.py index c75957bbf..5cd5c5b7e 100644 --- a/test/test_docker.py +++ b/test/test_docker.py @@ -8,6 +8,7 @@ from taskgraph import docker from taskgraph.config import GraphConfig from taskgraph.transforms.docker_image import IMAGE_BUILDER_IMAGE +from taskgraph.util import taskcluster as tc_util from taskgraph.util.vcs import get_repository from .conftest import nowin @@ -22,6 +23,9 @@ def root_url(): def mock_environ(monkeypatch, root_url): # Ensure user specified environment variables don't interfere with URLs. monkeypatch.setattr(os, "environ", {"TASKCLUSTER_ROOT_URL": root_url}) + # Clear cached Taskcluster clients/sessions since we're mocking the environment + tc_util.get_taskcluster_client.cache_clear() + tc_util.get_session.cache_clear() @pytest.fixture(autouse=True, scope="module") From eb3fee6f130cdb1480e9899e92494bf886f879e3 Mon Sep 17 00:00:00 2001 From: Andrew Halberstadt Date: Mon, 8 Dec 2025 11:51:32 -0500 Subject: [PATCH 2/2] fix: improve error handling on task creation failure This prints the label of the task we failed to create for convenience, as well as defers failing the Decision task until after we've attempted all tasks. This way we'll see all the scope errors at once rather than needing to fix them one at a time. --- src/taskgraph/create.py | 51 ++++++++++++++++++++++++++++++++--------- test/test_create.py | 47 +++++++++++++++++++++++++++++++++++-- 2 files changed, 85 insertions(+), 13 deletions(-) diff --git a/src/taskgraph/create.py b/src/taskgraph/create.py index ca876a809..c8e0c7b40 100644 --- a/src/taskgraph/create.py +++ b/src/taskgraph/create.py @@ -20,6 +20,18 @@ testing = False +class CreateTasksException(Exception): + """Exception raised when one or more tasks could not be created.""" + + def __init__(self, errors: dict[str, Exception]): + message = "" + for label, exc in errors.items(): + message += f"\nERROR: Could not create '{label}':\n\n" + message += "\n".join(f" {line}" for line in str(exc).splitlines()) + "\n" + + super().__init__(message) + + def create_tasks(graph_config, taskgraph, label_to_taskid, params, decision_task_id): taskid_to_label = {t: l for l, t in label_to_taskid.items()} @@ -50,6 +62,9 @@ def create_tasks(graph_config, taskgraph, label_to_taskid, params, decision_task session = get_session() with futures.ThreadPoolExecutor(concurrency) as e: fs = {} + fs_to_task = {} + skipped = set() + errors = {} # We can't submit a task until its dependencies have been submitted. # So our strategy is to walk the graph and submit tasks once all @@ -57,11 +72,13 @@ def create_tasks(graph_config, taskgraph, label_to_taskid, params, decision_task tasklist = set(taskgraph.graph.visit_postorder()) alltasks = tasklist.copy() - def schedule_tasks(): - # bail out early if any futures have failed - if any(f.done() and f.exception() for f in fs.values()): - return + def handle_exception(fut): + if exc := fut.exception(): + task_id, label = fs_to_task[fut] + skipped.add(task_id) + errors[label] = exc + def schedule_tasks(): to_remove = set() new = set() @@ -69,14 +86,24 @@ def submit(task_id, label, task_def): fut = e.submit(create_task, session, task_id, label, task_def) new.add(fut) fs[task_id] = fut + fs_to_task[fut] = (task_id, label) + fut.add_done_callback(handle_exception) for task_id in tasklist: task_def = taskgraph.tasks[task_id].task - # If we haven't finished submitting all our dependencies yet, - # come back to this later. # Some dependencies aren't in our graph, so make sure to filter # those out deps = set(task_def.get("dependencies", [])) & alltasks + + # If one of the dependencies didn't get created, then + # don't attempt to submit as it would fail. + if any(d in skipped for d in deps): + skipped.add(task_id) + to_remove.add(task_id) + continue + + # If we haven't finished submitting all our dependencies yet, + # come back to this later. if any((d not in fs or not fs[d].done()) for d in deps): continue @@ -90,16 +117,18 @@ def submit(task_id, label, task_def): submit(slugid(), taskid_to_label[task_id], task_def) tasklist.difference_update(to_remove) - # as each of those futures complete, try to schedule more tasks + # As each of those futures complete, try to schedule more tasks. for f in futures.as_completed(new): schedule_tasks() - # start scheduling tasks and run until everything is scheduled + # Start scheduling tasks and run until everything is scheduled. schedule_tasks() - # check the result of each future, raising an exception if it failed - for f in futures.as_completed(fs.values()): - f.result() + # Wait for all futures to complete. + futures.wait(fs.values()) + + if errors: + raise CreateTasksException(errors) def create_task(session, task_id, label, task_def): diff --git a/test/test_create.py b/test/test_create.py index fb5bf87b8..a72a89f31 100644 --- a/test/test_create.py +++ b/test/test_create.py @@ -8,10 +8,10 @@ from unittest import mock import responses -from taskcluster.exceptions import TaskclusterRestFailure from taskgraph import create from taskgraph.config import GraphConfig +from taskgraph.create import CreateTasksException from taskgraph.graph import Graph from taskgraph.task import Task from taskgraph.taskgraph import TaskGraph @@ -151,7 +151,7 @@ def test_create_tasks_fails_if_create_fails(self): graph = Graph(nodes={"tid-a"}, edges=set()) taskgraph = TaskGraph(tasks, graph) - with self.assertRaises(TaskclusterRestFailure): + with self.assertRaises(CreateTasksException): create.create_tasks( GRAPH_CONFIG, taskgraph, @@ -159,3 +159,46 @@ def test_create_tasks_fails_if_create_fails(self): {"level": "4"}, decision_task_id="decisiontask", ) + + @responses.activate + @mock.patch.dict( + "os.environ", + {"TASKCLUSTER_ROOT_URL": "https://tc.example.com"}, + clear=True, + ) + def test_create_tasks_collects_multiple_errors(self): + "create_tasks collects all errors from multiple failing tasks" + mock_taskcluster_api( + error_status=409, + error_message={ + "tid-a": "scope error for task a", + "tid-b": "scope error for task b", + }, + error_task_ids={"tid-a", "tid-b"}, + ) + + tasks = { + "tid-a": Task( + kind="test", label="a", attributes={}, task={"payload": "hello world"} + ), + "tid-b": Task( + kind="test", label="b", attributes={}, task={"payload": "hello world"} + ), + } + label_to_taskid = {"a": "tid-a", "b": "tid-b"} + graph = Graph(nodes={"tid-a", "tid-b"}, edges=set()) + taskgraph = TaskGraph(tasks, graph) + + with self.assertRaises(CreateTasksException) as cm: + create.create_tasks( + GRAPH_CONFIG, + taskgraph, + label_to_taskid, + {"level": "4"}, + decision_task_id="decisiontask", + ) + + # Verify both errors are in the exception message + exception_message = str(cm.exception) + self.assertIn("Could not create 'a'", exception_message) + self.assertIn("Could not create 'b'", exception_message)