Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
typing-extensions; python_version < "3.11"
12 changes: 12 additions & 0 deletions src/confluent_kafka/aio/_AIOConsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@
import concurrent.futures
from typing import Any, Callable, Dict, Optional, Tuple

try:
from typing import Self
except ImportError:
# FIXME: remove once we depend on Python >= 3.11
from typing_extensions import Self

import confluent_kafka

from . import _common as _common
Expand Down Expand Up @@ -46,6 +52,12 @@ def __init__(

self._consumer: confluent_kafka.Consumer = confluent_kafka.Consumer(consumer_conf)

async def __aenter__(self) -> Self:
return self

async def __aexit__(self, *_) -> None:
await self.close()

async def _call(self, blocking_task: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
return await _common.async_call(self.executor, blocking_task, *args, **kwargs)

Expand Down
12 changes: 12 additions & 0 deletions src/confluent_kafka/aio/producer/_AIOProducer.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@
import logging
from typing import Any, Callable, Dict, Optional

try:
from typing import Self
except ImportError:
# FIXME: remove once we depend on Python >= 3.11
from typing_extensions import Self

import confluent_kafka

from .. import _common as _common
Expand Down Expand Up @@ -70,6 +76,12 @@ def __init__(
if buffer_timeout > 0:
self._buffer_timeout_manager.start_timeout_monitoring()

async def __aenter__(self) -> Self:
return self

async def __aexit__(self, *_) -> None:
await self.close()

async def close(self) -> None:
"""Close the producer and cleanup resources

Expand Down
12 changes: 12 additions & 0 deletions tests/test_AIOProducer.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@ async def test_close_method(self, mock_producer, mock_common, basic_config):
await producer2.close()
assert producer2._is_closed is True

@pytest.mark.asyncio
async def test_async_context_manager(self, mock_producer, mock_common, basic_config):
with AIOProducer(basic_config) as producer:
assert producer._is_closed is False
assert producer._is_closed is True

with AIOProducer(basic_config) as producer2:
assert producer2._is_closed is False
await producer2.close()
await producer2.close()
assert producer2._is_closed is True

@pytest.mark.asyncio
async def test_call_method_executor_usage(self, mock_producer, mock_common, basic_config):
producer = AIOProducer(basic_config)
Expand Down