Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# Changelog
## v1.7.0 6/13/25
- Use fastavro for avro encoding/decoding

## v1.6.5 3/24/25
- Add capability to return PostgreSQL cursor description

Expand Down
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "nypl_py_utils"
version = "1.6.5"
version = "1.7.0"
authors = [
{ name="Aaron Friedman", email="aaronfriedman@nypl.org" },
]
Expand All @@ -24,7 +24,7 @@ dependencies = []

[project.optional-dependencies]
avro-client = [
"avro>=1.11.1",
"fastavro>=1.11.1",
"requests>=2.28.1"
]
cloudlibrary-client = [
Expand Down Expand Up @@ -82,7 +82,7 @@ development = [
"flake8>=6.0.0",
"freezegun>=1.2.2",
"mock>=4.0.3",
"pytest>=7.2.0",
"pytest==8.0",
"pytest-mock>=3.10.0",
"requests-mock>=1.10.0"
]
Expand Down
75 changes: 29 additions & 46 deletions src/nypl_py_utils/classes/avro_client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import avro.schema
import json
import requests

from avro.errors import AvroException
from avro.io import BinaryDecoder, BinaryEncoder, DatumReader, DatumWriter
from fastavro import schemaless_writer, schemaless_reader, parse_schema
from io import BytesIO
from nypl_py_utils.functions.log_helper import create_log
from requests.adapters import HTTPAdapter, Retry
Expand All @@ -23,7 +22,7 @@ def __init__(self, platform_schema_url):
self.session = requests.Session()
self.session.mount("https://",
HTTPAdapter(max_retries=retry_policy))
self.schema = avro.schema.parse(
self.schema = parse_schema(
self.get_json_schema(platform_schema_url))

def get_json_schema(self, platform_schema_url):
Expand Down Expand Up @@ -52,7 +51,7 @@ def get_json_schema(self, platform_schema_url):

try:
json_response = response.json()
return json_response["data"]["schema"]
return json.loads(json_response["data"]["schema"])
except (JSONDecodeError, KeyError) as e:
self.logger.error(
"Retrieved schema is malformed: {errorType} {errorMessage}"
Expand All @@ -70,26 +69,28 @@ class AvroEncoder(AvroClient):
Platform API endpoint from which to fetch the schema in JSON format.
"""

def encode_record(self, record):
def encode_record(self, record, silent=False):
"""
Encodes a single JSON record using the given Avro schema.

Returns the encoded record as a byte string.
"""
self.logger.debug(
"Encoding record using {schema} schema".format(
schema=self.schema.name)
)
datum_writer = DatumWriter(self.schema)
if not silent:
self.logger.info(
"Encoding record using {schema} schema".format(
schema=self.schema['name']
)
)
with BytesIO() as output_stream:
encoder = BinaryEncoder(output_stream)
try:
datum_writer.write(record, encoder)
schemaless_writer(output_stream, self.schema, record,
strict_allow_default=True)
return output_stream.getvalue()
except AvroException as e:
except Exception as e:
self.logger.error("Failed to encode record: {}".format(e))
raise AvroClientError(
"Failed to encode record: {}".format(e)) from None
"Failed to encode record: {}".format(e)
) from None

def encode_batch(self, record_list):
"""
Expand All @@ -99,25 +100,11 @@ def encode_batch(self, record_list):
"""
self.logger.info(
"Encoding ({num_rec}) records using {schema} schema".format(
num_rec=len(record_list), schema=self.schema.name
num_rec=len(record_list), schema=self.schema['name']
)
)
encoded_records = []
datum_writer = DatumWriter(self.schema)
with BytesIO() as output_stream:
encoder = BinaryEncoder(output_stream)
for record in record_list:
try:
datum_writer.write(record, encoder)
encoded_records.append(output_stream.getvalue())
output_stream.seek(0)
output_stream.truncate(0)
except AvroException as e:
self.logger.error("Failed to encode record: {}".format(e))
raise AvroClientError(
"Failed to encode record: {}".format(e)
) from None
return encoded_records
return [self.encode_record(record, silent=True)
for record in record_list]


class AvroDecoder(AvroClient):
Expand All @@ -126,23 +113,22 @@ class AvroDecoder(AvroClient):
Platform API endpoint from which to fetch the schema in JSON format.
"""

def decode_record(self, record):
def decode_record(self, record, silent=False):
"""
Decodes a single record represented using the given Avro
schema. Input must be a bytes-like object.

Returns a dictionary where each key is a field in the schema.
"""
self.logger.debug(
"Decoding {rec} using {schema} schema".format(
rec=record, schema=self.schema.name
if not silent:
self.logger.info(
"Decoding record using {schema} schema".format(
schema=self.schema['name']
)
)
)
datum_reader = DatumReader(self.schema)
with BytesIO(record) as input_stream:
decoder = BinaryDecoder(input_stream)
try:
return datum_reader.read(decoder)
return schemaless_reader(input_stream, self.schema)
except Exception as e:
self.logger.error("Failed to decode record: {}".format(e))
raise AvroClientError(
Expand All @@ -157,14 +143,11 @@ def decode_batch(self, record_list):
"""
self.logger.info(
"Decoding ({num_rec}) records using {schema} schema".format(
num_rec=len(record_list), schema=self.schema.name
num_rec=len(record_list), schema=self.schema['name']
)
)
decoded_records = []
for record in record_list:
decoded_record = self.decode_record(record)
decoded_records.append(decoded_record)
return decoded_records
return [self.decode_record(record, silent=True)
for record in record_list]


class AvroClientError(Exception):
Expand Down
43 changes: 39 additions & 4 deletions tests/test_avro_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,44 @@
]
})}}

# Expected result of fastavro's parse_schema() on the test schema above.
# NOTE(review): the "__fastavro_parsed" and "__named_schemas" keys appear to be
# internal markers that fastavro.parse_schema adds to the parsed schema dict —
# presumably this fixture must mirror fastavro's exact output for the equality
# assertion in test_get_json_schema_success; verify against the installed
# fastavro version, since these internals may change between releases.
FASTAVRO_SCHEMA = {
    "type": "record",
    "name": "TestSchema",
    "fields": [
        {
            "name": "patron_id",
            "type": "int"
        },
        {
            "name": "library_branch",
            "type": [
                "null",
                "string"
            ]
        }
    ],
    "__fastavro_parsed": True,
    "__named_schemas": {
        "TestSchema": {
            "type": "record",
            "name": "TestSchema",
            "fields": [
                {
                    "name": "patron_id",
                    "type": "int"
                },
                {
                    "name": "library_branch",
                    "type": [
                        "null",
                        "string"
                    ]
                }
            ]
        }
    }
}


class TestAvroClient:
@pytest.fixture
Expand All @@ -36,10 +74,7 @@ def test_avro_decoder_instance(self, requests_mock):

def test_get_json_schema_success(self, test_avro_encoder_instance,
test_avro_decoder_instance):
assert test_avro_encoder_instance.schema == _TEST_SCHEMA["data"][
"schema"]
assert test_avro_decoder_instance.schema == _TEST_SCHEMA["data"][
"schema"]
assert test_avro_encoder_instance.schema == FASTAVRO_SCHEMA

def test_get_json_schema_error(self, requests_mock):
requests_mock.get("https://test_schema_url", exc=ConnectTimeout)
Expand Down