diff --git a/CHANGELOG.md b/CHANGELOG.md
index 07db7b5..162dbdd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,7 @@
 # Changelog
+## v1.7.0 6/13/25
+- Use fastavro for avro encoding/decoding
+
 ## v1.6.5 3/24/25
 - Add capability to return PostgreSQL cursor description
 
diff --git a/pyproject.toml b/pyproject.toml
index 9fc68e8..44fd78d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
 
 [project]
 name = "nypl_py_utils"
-version = "1.6.5"
+version = "1.7.0"
 authors = [
   { name="Aaron Friedman", email="aaronfriedman@nypl.org" },
 ]
@@ -24,7 +24,7 @@ dependencies = []
 
 [project.optional-dependencies]
 avro-client = [
-    "avro>=1.11.1",
+    "fastavro>=1.11.1",
     "requests>=2.28.1"
 ]
 cloudlibrary-client = [
@@ -82,7 +82,7 @@ development = [
     "flake8>=6.0.0",
     "freezegun>=1.2.2",
     "mock>=4.0.3",
-    "pytest>=7.2.0",
+    "pytest==8.0",
     "pytest-mock>=3.10.0",
     "requests-mock>=1.10.0"
 ]
diff --git a/src/nypl_py_utils/classes/avro_client.py b/src/nypl_py_utils/classes/avro_client.py
index 8338490..7abeead 100644
--- a/src/nypl_py_utils/classes/avro_client.py
+++ b/src/nypl_py_utils/classes/avro_client.py
@@ -1,8 +1,7 @@
-import avro.schema
+import json
 import requests
 
-from avro.errors import AvroException
-from avro.io import BinaryDecoder, BinaryEncoder, DatumReader, DatumWriter
+from fastavro import schemaless_writer, schemaless_reader, parse_schema
 from io import BytesIO
 from nypl_py_utils.functions.log_helper import create_log
 from requests.adapters import HTTPAdapter, Retry
@@ -23,7 +22,7 @@ def __init__(self, platform_schema_url):
         self.session = requests.Session()
         self.session.mount("https://", HTTPAdapter(max_retries=retry_policy))
-        self.schema = avro.schema.parse(
+        self.schema = parse_schema(
             self.get_json_schema(platform_schema_url))
 
     def get_json_schema(self, platform_schema_url):
         """
@@ -52,7 +51,7 @@ def get_json_schema(self, platform_schema_url):
 
         try:
             json_response = response.json()
-            return json_response["data"]["schema"]
+            return json.loads(json_response["data"]["schema"])
         except (JSONDecodeError, KeyError) as e:
             self.logger.error(
                 "Retrieved schema is malformed: {errorType} {errorMessage}"
@@ -70,26 +69,28 @@ class AvroEncoder(AvroClient):
     Platform API endpoint from which to fetch the schema in JSON format.
     """
 
-    def encode_record(self, record):
+    def encode_record(self, record, silent=False):
         """
         Encodes a single JSON record using the given Avro schema. Returns
         the encoded record as a byte string.
""" - self.logger.debug( - "Encoding record using {schema} schema".format( - schema=self.schema.name) - ) - datum_writer = DatumWriter(self.schema) + if not silent: + self.logger.info( + "Encoding record using {schema} schema".format( + schema=self.schema['name'] + ) + ) with BytesIO() as output_stream: - encoder = BinaryEncoder(output_stream) try: - datum_writer.write(record, encoder) + schemaless_writer(output_stream, self.schema, record, + strict_allow_default=True) return output_stream.getvalue() - except AvroException as e: + except Exception as e: self.logger.error("Failed to encode record: {}".format(e)) raise AvroClientError( - "Failed to encode record: {}".format(e)) from None + "Failed to encode record: {}".format(e) + ) from None def encode_batch(self, record_list): """ @@ -99,25 +100,11 @@ def encode_batch(self, record_list): """ self.logger.info( "Encoding ({num_rec}) records using {schema} schema".format( - num_rec=len(record_list), schema=self.schema.name + num_rec=len(record_list), schema=self.schema['name'] ) ) - encoded_records = [] - datum_writer = DatumWriter(self.schema) - with BytesIO() as output_stream: - encoder = BinaryEncoder(output_stream) - for record in record_list: - try: - datum_writer.write(record, encoder) - encoded_records.append(output_stream.getvalue()) - output_stream.seek(0) - output_stream.truncate(0) - except AvroException as e: - self.logger.error("Failed to encode record: {}".format(e)) - raise AvroClientError( - "Failed to encode record: {}".format(e) - ) from None - return encoded_records + return [self.encode_record(record, silent=True) + for record in record_list] class AvroDecoder(AvroClient): @@ -126,23 +113,22 @@ class AvroDecoder(AvroClient): Platform API endpoint from which to fetch the schema in JSON format. """ - def decode_record(self, record): + def decode_record(self, record, silent=False): """ Decodes a single record represented using the given Avro schema. Input must be a bytes-like object. Returns a dictionary where each key is a field in the schema. 
""" - self.logger.debug( - "Decoding {rec} using {schema} schema".format( - rec=record, schema=self.schema.name + if not silent: + self.logger.info( + "Decoding record using {schema} schema".format( + schema=self.schema['name'] + ) ) - ) - datum_reader = DatumReader(self.schema) with BytesIO(record) as input_stream: - decoder = BinaryDecoder(input_stream) try: - return datum_reader.read(decoder) + return schemaless_reader(input_stream, self.schema) except Exception as e: self.logger.error("Failed to decode record: {}".format(e)) raise AvroClientError( @@ -157,14 +143,11 @@ def decode_batch(self, record_list): """ self.logger.info( "Decoding ({num_rec}) records using {schema} schema".format( - num_rec=len(record_list), schema=self.schema.name + num_rec=len(record_list), schema=self.schema['name'] ) ) - decoded_records = [] - for record in record_list: - decoded_record = self.decode_record(record) - decoded_records.append(decoded_record) - return decoded_records + return [self.decode_record(record, silent=True) + for record in record_list] class AvroClientError(Exception): diff --git a/tests/test_avro_client.py b/tests/test_avro_client.py index 4af11b3..46d902c 100644 --- a/tests/test_avro_client.py +++ b/tests/test_avro_client.py @@ -20,6 +20,44 @@ ] })}} +FASTAVRO_SCHEMA = { + "type": "record", + "name": "TestSchema", + "fields": [ + { + "name": "patron_id", + "type": "int" + }, + { + "name": "library_branch", + "type": [ + "null", + "string" + ] + } + ], + "__fastavro_parsed": True, + "__named_schemas": { + "TestSchema": { + "type": "record", + "name": "TestSchema", + "fields": [ + { + "name": "patron_id", + "type": "int" + }, + { + "name": "library_branch", + "type": [ + "null", + "string" + ] + } + ] + } + } +} + class TestAvroClient: @pytest.fixture @@ -36,10 +74,7 @@ def test_avro_decoder_instance(self, requests_mock): def test_get_json_schema_success(self, test_avro_encoder_instance, test_avro_decoder_instance): - assert test_avro_encoder_instance.schema == _TEST_SCHEMA["data"][ - "schema"] - assert test_avro_decoder_instance.schema == _TEST_SCHEMA["data"][ - "schema"] + assert test_avro_encoder_instance.schema == FASTAVRO_SCHEMA def test_get_json_schema_error(self, requests_mock): requests_mock.get("https://test_schema_url", exc=ConnectTimeout)