From fa92d91d90276ec184b6a0d7563e2906e0feafde Mon Sep 17 00:00:00 2001 From: Ian O'Connor Date: Fri, 13 Jun 2025 12:41:31 -0400 Subject: [PATCH 1/6] Use fastavro for avro encoding/decoding --- pyproject.toml | 2 +- src/nypl_py_utils/classes/avro_client.py | 73 +++++++++--------------- tests/test_avro_client.py | 42 ++++++++++++-- 3 files changed, 66 insertions(+), 51 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 9fc68e8..81eedcb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ dependencies = [] [project.optional-dependencies] avro-client = [ - "avro>=1.11.1", + "fastavro>=1.11.1", "requests>=2.28.1" ] cloudlibrary-client = [ diff --git a/src/nypl_py_utils/classes/avro_client.py b/src/nypl_py_utils/classes/avro_client.py index 8338490..29fb1fd 100644 --- a/src/nypl_py_utils/classes/avro_client.py +++ b/src/nypl_py_utils/classes/avro_client.py @@ -1,8 +1,7 @@ -import avro.schema +import json import requests -from avro.errors import AvroException -from avro.io import BinaryDecoder, BinaryEncoder, DatumReader, DatumWriter +from fastavro import schemaless_writer, schemaless_reader, parse_schema from io import BytesIO from nypl_py_utils.functions.log_helper import create_log from requests.adapters import HTTPAdapter, Retry @@ -23,7 +22,7 @@ def __init__(self, platform_schema_url): self.session = requests.Session() self.session.mount("https://", HTTPAdapter(max_retries=retry_policy)) - self.schema = avro.schema.parse( + self.schema = parse_schema( self.get_json_schema(platform_schema_url)) def get_json_schema(self, platform_schema_url): @@ -52,7 +51,7 @@ def get_json_schema(self, platform_schema_url): try: json_response = response.json() - return json_response["data"]["schema"] + return json.loads(json_response["data"]["schema"]) except (JSONDecodeError, KeyError) as e: self.logger.error( "Retrieved schema is malformed: {errorType} {errorMessage}" @@ -70,26 +69,27 @@ class AvroEncoder(AvroClient): Platform API endpoint from which to fetch the schema in JSON format. """ - def encode_record(self, record): + def encode_record(self, record, silent=False): """ Encodes a single JSON record using the given Avro schema. Returns the encoded record as a byte string. """ - self.logger.debug( - "Encoding record using {schema} schema".format( - schema=self.schema.name) - ) - datum_writer = DatumWriter(self.schema) + if not silent: + self.logger.info( + "Encoding record using {schema} schema".format( + schema=self.schema['name'] + ) + ) with BytesIO() as output_stream: - encoder = BinaryEncoder(output_stream) try: - datum_writer.write(record, encoder) + schemaless_writer(output_stream, self.schema, record, strict_allow_default=True) return output_stream.getvalue() - except AvroException as e: + except Exception as e: self.logger.error("Failed to encode record: {}".format(e)) raise AvroClientError( - "Failed to encode record: {}".format(e)) from None + "Failed to encode record: {}".format(e) + ) from None def encode_batch(self, record_list): """ @@ -99,25 +99,10 @@ def encode_batch(self, record_list): """ self.logger.info( "Encoding ({num_rec}) records using {schema} schema".format( - num_rec=len(record_list), schema=self.schema.name + num_rec=len(record_list), schema=self.schema['name'] ) ) - encoded_records = [] - datum_writer = DatumWriter(self.schema) - with BytesIO() as output_stream: - encoder = BinaryEncoder(output_stream) - for record in record_list: - try: - datum_writer.write(record, encoder) - encoded_records.append(output_stream.getvalue()) - output_stream.seek(0) - output_stream.truncate(0) - except AvroException as e: - self.logger.error("Failed to encode record: {}".format(e)) - raise AvroClientError( - "Failed to encode record: {}".format(e) - ) from None - return encoded_records + return [self.encode_record(record, silent=True) for record in record_list] class AvroDecoder(AvroClient): @@ -126,28 +111,28 @@ class AvroDecoder(AvroClient): Platform API endpoint from which to fetch the schema in JSON format. """ - def decode_record(self, record): + def decode_record(self, record, silent=False): """ Decodes a single record represented using the given Avro schema. Input must be a bytes-like object. Returns a dictionary where each key is a field in the schema. """ - self.logger.debug( - "Decoding {rec} using {schema} schema".format( - rec=record, schema=self.schema.name + if not silent: + self.logger.info( + "Decoding record using {schema} schema".format( + schema=self.schema['name'] + ) ) - ) - datum_reader = DatumReader(self.schema) with BytesIO(record) as input_stream: - decoder = BinaryDecoder(input_stream) try: - return datum_reader.read(decoder) + return schemaless_reader(input_stream, self.schema) except Exception as e: self.logger.error("Failed to decode record: {}".format(e)) raise AvroClientError( "Failed to decode record: {}".format(e)) from None + def decode_batch(self, record_list): """ Decodes a list of JSON records using the given Avro schema. Input @@ -157,14 +142,10 @@ def decode_batch(self, record_list): """ self.logger.info( "Decoding ({num_rec}) records using {schema} schema".format( - num_rec=len(record_list), schema=self.schema.name + num_rec=len(record_list), schema=self.schema['name'] ) ) - decoded_records = [] - for record in record_list: - decoded_record = self.decode_record(record) - decoded_records.append(decoded_record) - return decoded_records + return [self.decode_record(record, silent=True) for record in record_list] class AvroClientError(Exception): diff --git a/tests/test_avro_client.py b/tests/test_avro_client.py index 4af11b3..a10cdc7 100644 --- a/tests/test_avro_client.py +++ b/tests/test_avro_client.py @@ -20,6 +20,43 @@ ] })}} +FASTAVRO_SCHEMA = { + "type": "record", + "name": "TestSchema", + "fields": [ + { + "name": "patron_id", + "type": "int" + }, + { + "name": "library_branch", + "type": [ + "null", + "string" + ] + } + ], + "__fastavro_parsed": True, + "__named_schemas": { + "TestSchema": { + "type": "record", + "name": "TestSchema", + "fields": [ + { + "name": "patron_id", + "type": "int" + }, + { + "name": "library_branch", + "type": [ + "null", + "string" + ] + } + ] + } + } +} class TestAvroClient: @pytest.fixture @@ -36,10 +73,7 @@ def test_avro_decoder_instance(self, requests_mock): def test_get_json_schema_success(self, test_avro_encoder_instance, test_avro_decoder_instance): - assert test_avro_encoder_instance.schema == _TEST_SCHEMA["data"][ - "schema"] - assert test_avro_decoder_instance.schema == _TEST_SCHEMA["data"][ - "schema"] + assert test_avro_encoder_instance.schema == FASTAVRO_SCHEMA def test_get_json_schema_error(self, requests_mock): requests_mock.get("https://test_schema_url", exc=ConnectTimeout) From 58a30b252a0e50fc4452905b0bd211027260cead Mon Sep 17 00:00:00 2001 From: Ian O'Connor Date: Fri, 13 Jun 2025 13:04:52 -0400 Subject: [PATCH 2/6] Bump version in pyproject.toml --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 81eedcb..522495d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "nypl_py_utils" -version = "1.6.5" +version = "1.6.6" authors = [ { name="Aaron Friedman", email="aaronfriedman@nypl.org" }, ] From 7d9b610d10c0764d8b0c0febeb5d4c1965cdfad4 Mon Sep 17 00:00:00 2001 From: Ian O'Connor Date: Fri, 13 Jun 2025 13:08:19 -0400 Subject: [PATCH 3/6] Update changelog --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 07db7b5..c8fbb85 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,7 @@ # Changelog +## v1.6.5 6/13/25 +- Use fastavro for avro encoding/decoding + ## v1.6.5 3/24/25 - Add capability to return PostgreSQL cursor description From 90457bfa595543d2fe70691f3473ed6e4b2d812a Mon Sep 17 00:00:00 2001 From: Ian O'Connor Date: Fri, 13 Jun 2025 13:11:20 -0400 Subject: [PATCH 4/6] Fix lint errors --- src/nypl_py_utils/classes/avro_client.py | 10 ++++++---- tests/test_avro_client.py | 1 + 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/nypl_py_utils/classes/avro_client.py b/src/nypl_py_utils/classes/avro_client.py index 29fb1fd..7abeead 100644 --- a/src/nypl_py_utils/classes/avro_client.py +++ b/src/nypl_py_utils/classes/avro_client.py @@ -83,7 +83,8 @@ def encode_record(self, record, silent=False): ) with BytesIO() as output_stream: try: - schemaless_writer(output_stream, self.schema, record, strict_allow_default=True) + schemaless_writer(output_stream, self.schema, record, + strict_allow_default=True) return output_stream.getvalue() except Exception as e: self.logger.error("Failed to encode record: {}".format(e)) @@ -102,7 +103,8 @@ def encode_batch(self, record_list): num_rec=len(record_list), schema=self.schema['name'] ) ) - return [self.encode_record(record, silent=True) for record in record_list] + return [self.encode_record(record, silent=True) + for record in record_list] class AvroDecoder(AvroClient): @@ -132,7 +134,6 @@ def decode_record(self, record, silent=False): raise AvroClientError( "Failed to decode record: {}".format(e)) from None - def decode_batch(self, record_list): """ Decodes a list of JSON records using the given Avro schema. Input @@ -145,7 +146,8 @@ def decode_batch(self, record_list): num_rec=len(record_list), schema=self.schema['name'] ) ) - return [self.decode_record(record, silent=True) for record in record_list] + return [self.decode_record(record, silent=True) + for record in record_list] class AvroClientError(Exception): diff --git a/tests/test_avro_client.py b/tests/test_avro_client.py index a10cdc7..46d902c 100644 --- a/tests/test_avro_client.py +++ b/tests/test_avro_client.py @@ -58,6 +58,7 @@ } } + class TestAvroClient: @pytest.fixture def test_avro_encoder_instance(self, requests_mock): From 5e2046bd88e0941d09f5baf87d89e9442f4114ae Mon Sep 17 00:00:00 2001 From: Ian O'Connor Date: Fri, 13 Jun 2025 13:19:07 -0400 Subject: [PATCH 5/6] Freeze pytest version to fix broken cloudlibrary+kinesis tests --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 522495d..86fb7e6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -82,7 +82,7 @@ development = [ "flake8>=6.0.0", "freezegun>=1.2.2", "mock>=4.0.3", - "pytest>=7.2.0", + "pytest==8.0", "pytest-mock>=3.10.0", "requests-mock>=1.10.0" ] From eeebb562ddfd1a0812ce2f6294b5a09761ff072f Mon Sep 17 00:00:00 2001 From: Ian O'Connor Date: Tue, 17 Jun 2025 11:37:53 -0400 Subject: [PATCH 6/6] Minor version bump instead of patch bump --- CHANGELOG.md | 2 +- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c8fbb85..162dbdd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,5 @@ # Changelog -## v1.6.5 6/13/25 +## v1.7.0 6/13/25 - Use fastavro for avro encoding/decoding ## v1.6.5 3/24/25 diff --git a/pyproject.toml b/pyproject.toml index 86fb7e6..44fd78d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "nypl_py_utils" -version = "1.6.6" +version = "1.7.0" authors = [ { name="Aaron Friedman", email="aaronfriedman@nypl.org" }, ]