wip: modify ingester to use processes in a 'fan-out' pattern #1672
base: main
78c9926 to a6443b2
4f8b996 to 648d89b
07f5a91 to 1978947
    processed,
    stat_ok,
    stat_fail,
    counter_lock,
Can you type those parameters? I see that you can import the types from multiprocessing.synchronize since the multiprocessing classes are defined there.
https://stackoverflow.com/questions/71988089/python-typing-for-multiprocessing-value
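A possible way to annotate these, assuming `processed`, `stat_ok` and `stat_fail` are `multiprocessing.Value` counters and `counter_lock` is a `multiprocessing.Lock` (the diff doesn't show how they are created). The function name `record_result` and its `succeeded` flag are made up for illustration. As far as I can tell, the lock class lives in `multiprocessing.synchronize`, while the wrapper returned by `Value` is `multiprocessing.sharedctypes.Synchronized`; the generic annotation is quoted because subscripting it may only be supported by the type checker, not at runtime.

```python
import multiprocessing
from multiprocessing.sharedctypes import Synchronized
from multiprocessing.synchronize import Lock


def record_result(
    processed: "Synchronized[int]",
    stat_ok: "Synchronized[int]",
    stat_fail: "Synchronized[int]",
    counter_lock: Lock,
    succeeded: bool,  # hypothetical flag, not in the PR
) -> None:
    """Update the shared counters under a single lock."""
    with counter_lock:
        processed.value += 1
        if succeeded:
            stat_ok.value += 1
        else:
            stat_fail.value += 1
```

The counters would be created in the parent with something like `multiprocessing.Value("i", 0)` and passed to each worker process as arguments.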
    )

    batch_len = len(batch)
    if batch_len >= 100 or batch_len >= total_files_count:
This 100 should be a named constant.
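Something along these lines, as a rough sketch. `INGEST_BATCH_SIZE` and `should_flush` are hypothetical names, not taken from the PR; the condition itself mirrors the one in the diff.

```python
INGEST_BATCH_SIZE = 100  # hypothetical name for the literal used in the diff


def should_flush(batch: list, total_files_count: int) -> bool:
    """Return True once the batch is full or already holds every file."""
    batch_len = len(batch)
    return batch_len >= INGEST_BATCH_SIZE or batch_len >= total_files_count
```

Keeping the value next to the other ingester settings would also make it easier to tune alongside the queue size.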
    cycle_start = time.time()
    total_bytes = 0
    for f in json_files:
    total_files_count = len(json_files)

    process_queue: multiprocessing.Queue[Optional[list[SubmissionFileMetadata]]] = (
        multiprocessing.Queue(maxsize=INGEST_QUEUE_MAXSIZE)
    )
cycle_start, total_files_count and process_queue don't need to be moved up here; they could be defined after the first out() call.
I'm using these variables in the loop right below.
    % (
    total_files,
    writers = []
    for _ in range(max_workers):
Why remove the try-except for KeyboardInterrupt?
Forgot to add it back after the changes; fixed.
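For reference, a minimal sketch of the fan-out shape with the interrupt handling in place. It is not the PR's code: `writer_loop`, `run_fan_out`, the `written` counter and the value of `INGEST_QUEUE_MAXSIZE` are assumptions, and the `None` sentinel is only inferred from the `Optional[...]` in the queue annotation. The real batch insert is replaced by a counter update.

```python
import multiprocessing

INGEST_QUEUE_MAXSIZE = 8  # assumed value; the PR's actual setting is not shown


def writer_loop(process_queue, written) -> None:
    """Consume batches until the None sentinel arrives."""
    while True:
        batch = process_queue.get()
        if batch is None:
            break
        # Placeholder for the real batch insert into the database.
        with written.get_lock():
            written.value += len(batch)


def run_fan_out(batches: list, max_workers: int) -> int:
    """Start the writers, feed them batches, and return the rows handled."""
    process_queue = multiprocessing.Queue(maxsize=INGEST_QUEUE_MAXSIZE)
    written = multiprocessing.Value("i", 0)
    writers = []
    for _ in range(max_workers):
        proc = multiprocessing.Process(
            target=writer_loop, args=(process_queue, written)
        )
        proc.start()
        writers.append(proc)
    try:
        for batch in batches:
            process_queue.put(batch)
        for _ in writers:
            process_queue.put(None)  # one sentinel per writer
        for proc in writers:
            proc.join()
    except KeyboardInterrupt:
        # Stop the children before re-raising so none are left running.
        for proc in writers:
            proc.terminate()
        for proc in writers:
            proc.join()
        raise
    return written.value
```

On platforms that start children with spawn or forkserver, `run_fan_out` would need to be called from under an `if __name__ == "__main__":` guard.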
    FILES_INGESTER_COUNTER.labels(INGESTER_GRAFANA_LABEL).inc()

    return True
    if any(len(instances_dict[table]) for table in instances_dict):  # type: ignore
Unnecessary comment.
mypy complains about:
TypedDict key must be a string literal; expected one of ("issues", "checkouts", "builds", "tests", "incidents") [literal-required]
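One way that might avoid the ignore is to iterate over a tuple whose element type is a `Literal` union of the keys, which mypy should accept as a TypedDict index. This is only a sketch: `TableName`, `TABLE_NAMES`, `InstancesDict`, `has_pending_rows` and the `list[Any]` value types are assumptions, since the real definition of `instances_dict` isn't shown here.

```python
from typing import Any, Literal, TypedDict

TableName = Literal["issues", "checkouts", "builds", "tests", "incidents"]

# Typed as a tuple of literals so the loop variable keeps the Literal type.
TABLE_NAMES: tuple[TableName, ...] = (
    "issues",
    "checkouts",
    "builds",
    "tests",
    "incidents",
)


class InstancesDict(TypedDict):
    # Value types are assumed; the PR's actual model types are not shown.
    issues: list[Any]
    checkouts: list[Any]
    builds: list[Any]
    tests: list[Any]
    incidents: list[Any]


def has_pending_rows(instances_dict: InstancesDict) -> bool:
    """Return True if any table has at least one buffered instance."""
    return any(len(instances_dict[table]) for table in TABLE_NAMES)
```

The trade-off is that the key list is written twice, so the ignore comment may still be the simpler choice.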
1978947 to b6b8376
This PR revises the ingester architecture to use processes for both file processing and database insertion. Several database connections can then run batch inserts at the same time, which speeds up processing.
How to test
poetry run python manage.py monitor_submissions [...options]
Closes #1673