@gustavobtflores
Contributor

@gustavobtflores gustavobtflores commented Dec 11, 2025

This PR revises the ingester architecture to use separate processes for both file processing and database insertion. This speeds up ingestion because multiple database connections can run batch inserts at the same time.
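The producer/consumer shape described above can be sketched roughly as follows. This is a minimal illustration, not the PR's actual code: `enqueue_batches`, `writer_loop`, `Batch`, and the `insert_batch` callback are hypothetical names, and file names stand in for the real submission metadata. It assumes, as the queue type in the diff suggests, that a `None` item is used as a stop signal for each writer.

```python
import multiprocessing
from typing import Callable, List, Optional, Sequence

# Hypothetical stand-in for list[SubmissionFileMetadata].
Batch = List[str]


def enqueue_batches(
    files: Sequence[str],
    work_queue: "multiprocessing.Queue[Optional[Batch]]",
    batch_size: int,
    writer_count: int,
) -> int:
    """Split files into batches, queue them, then queue one None per writer."""
    batches = 0
    for start in range(0, len(files), batch_size):
        work_queue.put(list(files[start:start + batch_size]))
        batches += 1
    # One sentinel per writer so that every writer loop terminates.
    for _ in range(writer_count):
        work_queue.put(None)
    return batches


def writer_loop(
    work_queue: "multiprocessing.Queue[Optional[Batch]]",
    insert_batch: Callable[[Batch], None],
) -> int:
    """Consume batches until a None sentinel arrives; return items handled."""
    inserted = 0
    while True:
        batch = work_queue.get()
        if batch is None:
            break
        insert_batch(batch)  # e.g. one batch insert on this writer's connection
        inserted += len(batch)
    return inserted
```

In a real run, each `writer_loop` would presumably be the target of its own `multiprocessing.Process`, with its own database connection, while the parent process calls `enqueue_batches`.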

How to test

  1. Get some submission files and a trees.yaml file
  2. Run the monitor_submissions command: poetry run python manage.py monitor_submissions [...options]
  3. Check that the submissions are moved to the correct directory and that the data is inserted correctly in the database

Closes #1673

@gustavobtflores gustavobtflores self-assigned this Dec 11, 2025
@gustavobtflores gustavobtflores added the Ingester The issue relates to the ingester tool, including the command itself and related functions. label Dec 11, 2025
@gustavobtflores gustavobtflores force-pushed the refactor/ingester-processing-architecture branch from 78c9926 to a6443b2 Compare December 15, 2025 22:04
@gustavobtflores gustavobtflores force-pushed the refactor/ingester-processing-architecture branch 2 times, most recently from 4f8b996 to 648d89b Compare January 6, 2026 18:21
@gustavobtflores gustavobtflores marked this pull request as ready for review January 6, 2026 18:26
@gustavobtflores gustavobtflores force-pushed the refactor/ingester-processing-architecture branch 2 times, most recently from 07f5a91 to 1978947 Compare January 7, 2026 13:09
Comment on lines 268 to 271
processed,
stat_ok,
stat_fail,
counter_lock,
Collaborator

Can you type those parameters? I see that you can import the types from multiprocessing.synchronize since the multiprocessing classes are defined there.

https://stackoverflow.com/questions/71988089/python-typing-for-multiprocessing-value

https://stackoverflow.com/questions/70424427/using-python-multiprocessing-lock-as-an-argument-type-in-mypy
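A minimal sketch of what typed parameters could look like, assuming the counters are created with `multiprocessing.Value` and the lock with `multiprocessing.Lock`. The function name `record_result` and the `ok` flag are hypothetical. Note that the lock class lives in `multiprocessing.synchronize`, while the wrapper returned by `multiprocessing.Value` is `Synchronized` from `multiprocessing.sharedctypes`; the annotations are quoted because `Synchronized` may not be subscriptable at runtime.

```python
import multiprocessing
from multiprocessing.sharedctypes import Synchronized
from multiprocessing.synchronize import Lock as LockType


def record_result(
    processed: "Synchronized[int]",
    stat_ok: "Synchronized[int]",
    stat_fail: "Synchronized[int]",
    counter_lock: LockType,
    ok: bool,
) -> None:
    """Update the shared counters while holding the shared lock."""
    with counter_lock:
        processed.value += 1
        if ok:
            stat_ok.value += 1
        else:
            stat_fail.value += 1
```

The quoted annotations are only read by the type checker, so the same signature works whether or not the installed Python version supports subscripting `Synchronized` at runtime.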

)

batch_len = len(batch)
if batch_len >= 100 or batch_len >= total_files_count:
Collaborator

This 100 should be a named constant.
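For illustration, the check could be written along these lines. The constant name `INGEST_BATCH_SIZE` and the helper `should_flush` are assumptions made for this sketch, not names from the PR.

```python
# Hypothetical constant name; the value 100 comes from the diff above.
INGEST_BATCH_SIZE = 100


def should_flush(batch_len: int, total_files_count: int) -> bool:
    """Flush when the batch is full or already holds every file of the cycle."""
    return batch_len >= INGEST_BATCH_SIZE or batch_len >= total_files_count
```

Keeping the threshold next to the other ingester settings (such as `INGEST_QUEUE_MAXSIZE`) would make it easier to tune them together.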

Comment on lines +440 to +448
cycle_start = time.time()
total_bytes = 0
for f in json_files:
total_files_count = len(json_files)

process_queue: multiprocessing.Queue[Optional[list[SubmissionFileMetadata]]] = (
multiprocessing.Queue(maxsize=INGEST_QUEUE_MAXSIZE)
)
Collaborator

The cycle_start, total_files_count, and process_queue definitions don't need to be moved up here; they could be defined after the first out() call.

Contributor Author

I'm using these variables in the loop right below.

% (
total_files,
writers = []
for _ in range(max_workers):
Collaborator

Why remove the try-except for KeyboardInterrupt?

Contributor Author

Forgot to add it back after the changes; fixed.
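The thread does not show the restored handler, so the following is only one common shape for it, under the assumption that the parent process waits on the writer processes. The helper name `join_workers` is hypothetical.

```python
import multiprocessing
from typing import Sequence


def join_workers(workers: Sequence[multiprocessing.Process]) -> bool:
    """Wait for all workers; on Ctrl+C, terminate the ones still running.

    Returns True on a normal finish and False if interrupted.
    """
    try:
        for worker in workers:
            worker.join()
        return True
    except KeyboardInterrupt:
        # Stop any worker that is still alive, then reap all of them.
        for worker in workers:
            if worker.is_alive():
                worker.terminate()
        for worker in workers:
            worker.join()
        return False
```

Terminating workers abruptly can leave a batch half-inserted, so a real implementation might prefer to send the stop sentinel first and only terminate after a timeout.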

FILES_INGESTER_COUNTER.labels(INGESTER_GRAFANA_LABEL).inc()

return True
if any(len(instances_dict[table]) for table in instances_dict): # type: ignore
Collaborator

unnecessary comment

Contributor Author

mypy complains about:

TypedDict key must be a string literal; expected one of ("issues", "checkouts", "builds", "tests", "incidents")  [literal-required]
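For reference, a sketch of two ways this check could be written. The `InstancesDict` definition is an assumption reconstructed from the keys in the error message (the real value types are not shown in the thread), and both function names are hypothetical. The first variant avoids indexing with a non-literal key by testing the values directly; the second keeps the original expression but scopes the ignore to the specific error code.

```python
from typing import Any, TypedDict


class InstancesDict(TypedDict):
    # Hypothetical shape: one list of pending rows per table.
    issues: list[Any]
    checkouts: list[Any]
    builds: list[Any]
    tests: list[Any]
    incidents: list[Any]


def has_pending_rows(instances_dict: InstancesDict) -> bool:
    """True if any table has rows; a non-empty list is truthy."""
    return any(instances_dict.values())


def has_pending_rows_scoped_ignore(instances_dict: InstancesDict) -> bool:
    """Same check as in the diff, with the ignore limited to one error code."""
    return any(
        len(instances_dict[table])  # type: ignore[literal-required]
        for table in instances_dict
    )
```

The first form only works if every value is a sized container whose emptiness means "nothing to insert", which is an assumption here.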

@gustavobtflores gustavobtflores force-pushed the refactor/ingester-processing-architecture branch from 1978947 to b6b8376 Compare January 7, 2026 22:02

Development

Successfully merging this pull request may close these issues.

Parallelize the ingester by turning each worker into a full ingestion pipeline