From db19edf3f21c7a1bbce70a4dc9cf8cbebe2c4cc4 Mon Sep 17 00:00:00 2001 From: David Steinberg Date: Mon, 31 Mar 2025 15:06:43 -0400 Subject: [PATCH 1/3] Add pysam to extras --- pyproject.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 02f842b9..b13e997c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,7 +38,7 @@ dependencies = [ "pydantic~=2.1", "bioutils", "requests", - "canonicaljson", + "canonicaljson" ] [project.optional-dependencies] @@ -49,6 +49,7 @@ extras = [ "hgvs>=1.4", "dill~=0.3.7", "click", + "pysam>=0.23.0", ] dev = [ # tests From 08d455e8c7bebf819d3cbe1c1b3961618cd17073 Mon Sep 17 00:00:00 2001 From: David Steinberg Date: Mon, 31 Mar 2025 15:08:28 -0400 Subject: [PATCH 2/3] Add setuptools --- pyproject.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index b13e997c..7984e307 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,7 +38,8 @@ dependencies = [ "pydantic~=2.1", "bioutils", "requests", - "canonicaljson" + "canonicaljson", + "setuptools>=78.1.0", ] [project.optional-dependencies] From 322a33057e30f61d1c7b4ffef9df370f62e32733 Mon Sep 17 00:00:00 2001 From: David Steinberg Date: Mon, 31 Mar 2025 16:54:29 -0400 Subject: [PATCH 3/3] Thread the VCF annotator --- src/ga4gh/vrs/extras/annotator/vcf.py | 108 +++++++++++++++----------- 1 file changed, 64 insertions(+), 44 deletions(-) diff --git a/src/ga4gh/vrs/extras/annotator/vcf.py b/src/ga4gh/vrs/extras/annotator/vcf.py index 05fd5947..4c95eac4 100644 --- a/src/ga4gh/vrs/extras/annotator/vcf.py +++ b/src/ga4gh/vrs/extras/annotator/vcf.py @@ -1,11 +1,17 @@ """Annotate VCFs with VRS identifiers and attributes.""" - +import asyncio +import concurrent.futures import abc import logging +import os import pickle from enum import Enum from pathlib import Path from typing import Literal +from queue import Queue +from threading import Thread + + import pysam @@ -216,55 +222,69 @@ def annotate( vcf_out = None allele_collection = [] if self.collect_alleles else None + record_queue = Queue() + result_queue = Queue() + num_workers = os.cpu_count() or 4 # Adjust based on system resources + + def worker(): + while True: + record = record_queue.get() + if record is None: + break + try: + additional_info_fields = [FieldName.IDS_FIELD] + if vrs_attributes: + additional_info_fields += [ + FieldName.STARTS_FIELD, + FieldName.ENDS_FIELD, + FieldName.STATES_FIELD, + ] + vrs_field_data = self._get_vrs_data( + record, + allele_collection, + assembly, + additional_info_fields, + vrs_attributes=vrs_attributes, + compute_for_ref=compute_for_ref, + require_validation=require_validation, + **kwargs, + ) + except Exception as ex: + _logger.exception("VRS error on %s-%s", record.chrom, record.pos) + err_msg = f"{ex}" or f"{type(ex)}" + err_msg = err_msg.translate(VCF_ESCAPE_MAP) + additional_info_fields = [FieldName.ERROR_FIELD] + vrs_field_data = {FieldName.ERROR_FIELD.value: [err_msg]} + result_queue.put((record, vrs_field_data)) + record_queue.task_done() + + workers = [Thread(target=worker) for _ in range(num_workers)] + for w in workers: + w.start() + for record in vcf: - if vcf_out: - additional_info_fields = [FieldName.IDS_FIELD] - if vrs_attributes: - additional_info_fields += [ - FieldName.STARTS_FIELD, - FieldName.ENDS_FIELD, - FieldName.STATES_FIELD, - ] - else: - # no INFO field names need to be designated if not producing an annotated VCF - additional_info_fields = [] - try: - vrs_field_data = self._get_vrs_data( - record, - allele_collection, - assembly, - additional_info_fields, - vrs_attributes=vrs_attributes, - compute_for_ref=compute_for_ref, - require_validation=require_validation, - **kwargs, - ) - except Exception as ex: - _logger.exception("VRS error on %s-%s", record.chrom, record.pos) - err_msg = f"{ex}" or f"{type(ex)}" - err_msg = err_msg.translate(VCF_ESCAPE_MAP) - additional_info_fields = [FieldName.ERROR_FIELD] - vrs_field_data = {FieldName.ERROR_FIELD.value: [err_msg]} - - _logger.debug( - "VCF record %s-%s generated vrs_field_data %s", - record.chrom, - record.pos, - vrs_field_data, - ) - + record_queue.put(record) + + record_queue.join() + + for _ in range(num_workers): + record_queue.put(None) + for w in workers: + w.join() + + while not result_queue.empty(): + record, vrs_field_data = result_queue.get() if output_vcf_path and vcf_out: - for k in additional_info_fields: - record.info[k.value] = [ - value or k.default_value() for value in vrs_field_data[k.value] + for k in vrs_field_data: + record.info[k] = [ + value for value in vrs_field_data[k] ] - vcf_out.write(record) - - vcf.close() + vcf_out.write(record) + if vcf_out: vcf_out.close() - + if self.collect_alleles: self.on_vrs_object_collection(allele_collection, **kwargs)