diff --git a/composer.json b/composer.json index 8fc0dc8..a357757 100644 --- a/composer.json +++ b/composer.json @@ -30,6 +30,7 @@ "require": { "php": ">=8.1", "monolog/monolog": "^3.0", + "psr/cache": "^3.0", "aws/aws-sdk-php": "^3.2" }, "require-dev": { diff --git a/src/Handler/CloudWatch.php b/src/Handler/CloudWatch.php index db9a7b1..aa3d8f1 100755 --- a/src/Handler/CloudWatch.php +++ b/src/Handler/CloudWatch.php @@ -3,11 +3,17 @@ namespace PhpNexus\Cwh\Handler; use Aws\CloudWatchLogs\CloudWatchLogsClient; +use Aws\CloudWatchLogs\Exception\CloudWatchLogsException; +use DateTime; +use DateTimeImmutable; +use Exception; +use InvalidArgumentException; use Monolog\Formatter\FormatterInterface; use Monolog\Formatter\LineFormatter; use Monolog\Handler\AbstractProcessingHandler; -use Monolog\LogRecord; use Monolog\Level; +use Monolog\LogRecord; +use Psr\Cache\CacheItemPoolInterface; class CloudWatch extends AbstractProcessingHandler { @@ -32,7 +38,7 @@ class CloudWatch extends AbstractProcessingHandler private readonly string $stream; - private readonly int | null $retention; + private readonly int|null $retention; private readonly int $batchSize; @@ -56,9 +62,13 @@ class CloudWatch extends AbstractProcessingHandler private int $remainingRequests; - private readonly \DateTime $rpsTimestamp; + private readonly DateTime $rpsTimestamp; + + private int|null $earliestTimestamp = null; - private int | null $earliestTimestamp = null; + private ?CacheItemPoolInterface $cacheItemPool = null; + + private int $cacheItemTtl = 60 * 5; /** * CloudWatchLogs constructor. @@ -73,41 +83,48 @@ class CloudWatch extends AbstractProcessingHandler * The ':' (colon) and '*' (asterisk) characters are not allowed. * * @param CloudWatchLogsClient $client AWS SDK CloudWatchLogs client to use with this handler. - * @param string $group Name of log group. - * @param string $stream Name of log stream within log group. + * @param string $group Name of the log group. + * @param string $stream Name of the log stream within the log group. * @param int|null $retention (Optional) Number of days to retain log entries. - * Only used when CloudWatch handler creates log group. + * Only used when CloudWatch handler creates a log group. * @param int $batchSize (Optional) Number of logs to queue before sending to CloudWatch. - * @param array $tags (Optional) Tags to apply to log group. Only used when CloudWatch handler creates log group. - * @param int|string|\Monolog\Level $level (Optional) The minimum logging level at which this handler will be + * @param array $tags (Optional) Tags to apply to the log group. + * Only used when CloudWatch handler creates a log group. + * @param int|string|Level $level (Optional) The minimum logging level at which this handler will be * triggered. * @param bool $bubble (Optional) Whether the messages that are handled can bubble up the stack or not. - * @param bool $createGroup (Optional) Whether to create log group if log group does not exist. - * @param bool $createStream (Optional) Whether to create log stream if log stream does not exist in log group. - * @param int $rpsLimit (Optional) Number of requests per second before a 1 second sleep is triggered. + * @param bool $createGroup (Optional) Whether to create the log group if the log group does not exist. + * @param bool $createStream (Optional) Whether to create log stream if log stream does not exist in the log group. + * @param int $rpsLimit (Optional) Number of requests per second before a 1-second sleep is triggered. * Set to 0 to disable. - * @throws \Exception + * @param CacheItemPoolInterface|null $cacheItemPool (Optional) PSR-6 cache pool to use for the caching log group + * and stream creation. + * @param int $cacheItemTtl (Optional) TTL for cache items in seconds. + * + * @throws Exception */ public function __construct( CloudWatchLogsClient $client, string $group, string $stream, - int | null $retention = 14, + int|null $retention = 14, int $batchSize = 10000, array $tags = [], - int | string | Level $level = Level::Debug, + int|string|Level $level = Level::Debug, bool $bubble = true, bool $createGroup = true, bool $createStream = true, - int $rpsLimit = 0 + int $rpsLimit = 0, + ?CacheItemPoolInterface $cacheItemPool = null, + int $cacheItemTtl = 60 * 5 ) { // Assert batch size is not above 10,000 if ($batchSize > 10000) { - throw new \InvalidArgumentException('Batch size can not be greater than 10000'); + throw new InvalidArgumentException('Batch size can not be greater than 10000'); } // Assert RPS limit is not a negative number if ($rpsLimit < 0) { - throw new \InvalidArgumentException('RPS limit can not be a negative number'); + throw new InvalidArgumentException('RPS limit can not be a negative number'); } $this->client = $client; @@ -120,13 +137,23 @@ public function __construct( $this->createStream = $createStream; $this->rpsLimit = $rpsLimit; + if (!$createGroup && !$createStream && $cacheItemPool) { + throw new InvalidArgumentException('Cache pool can not be used without creating log group or stream'); + } + + $this->cacheItemPool = $cacheItemPool; + $this->cacheItemTtl = $cacheItemTtl; + parent::__construct($level, $bubble); - // Initalize remaining requests and RPS timestamp for rate limiting + // Initialize remaining requests and RPS timestamp for rate limiting $this->resetRemainingRequests(); - $this->rpsTimestamp = new \DateTime(); + $this->rpsTimestamp = new DateTime(); } + /** + * @throws \Psr\Cache\InvalidArgumentException + */ protected function write(LogRecord $record): void { $records = $this->formatRecords($record); @@ -157,6 +184,9 @@ private function addToBuffer(array $record): void $this->buffer[] = $record; } + /** + * @throws \Psr\Cache\InvalidArgumentException + */ private function flushBuffer(): void { if (!empty($this->buffer)) { @@ -167,10 +197,10 @@ private function flushBuffer(): void try { // send items $this->send($this->buffer); - } catch (\Aws\CloudWatchLogs\Exception\CloudWatchLogsException $e) { + } catch (CloudWatchLogsException $e) { error_log('AWS CloudWatchLogs threw an exception while sending items: ' . $e->getMessage()); - // wait for 1 second and try to send items again (in case of per account per region rate limiting) + // wait for 1 second and try to send items again (in the case of per account per region rate limiting) sleep(1); $this->send($this->buffer); } @@ -189,12 +219,12 @@ private function flushBuffer(): void private function checkThrottle(): void { if ($this->rpsLimit > 0) { - // Calculate number of seconds between now and last RPS timestamp - $diff = $this->rpsTimestamp->diff(new \DateTimeImmutable())->s; + // Calculate the number of seconds between now and the last RPS timestamp + $diff = $this->rpsTimestamp->diff(new DateTimeImmutable())->s; $sameSecond = $diff === 0; if ($sameSecond) { - // If no remaining requests for current second + // If no remaining requests for the current second if ($this->remainingRequests === 0) { // Sleep and reset remaining requests sleep(1); @@ -236,7 +266,7 @@ protected function willMessageSizeExceedLimit(array $record): bool } /** - * Determine whether the specified record's timestamp exceeds the 24 hour timespan limit + * Determine whether the specified record's timestamp exceeds the 24-hour timespan limit * for all batched messages written in a single call to PutLogEvents. */ protected function willMessageTimestampExceedLimit(array $record): bool @@ -245,7 +275,7 @@ protected function willMessageTimestampExceedLimit(array $record): bool } /** - * Event size in the batch can not be bigger than 1 MB + * Event size in the batch cannot be bigger than 1 MB * https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/cloudwatch_limits_cwl.html */ private function formatRecords(LogRecord $entry): array @@ -270,14 +300,14 @@ private function formatRecords(LogRecord $entry): array * UTF-8, plus 26 bytes for each log event. * - None of the log events in the batch can be more than 2 hours in the future. * - None of the log events in the batch can be older than 14 days or the retention period of the log group. - * - The log events in the batch must be in chronological ordered by their timestamp (the time the event occurred, - * expressed as the number of milliseconds since Jan 1, 1970 00:00:00 UTC). + * - The log events in the batch must be in chronologically ordered by their timestamp + * (the time the event occurred, expressed as the number of milliseconds since Jan 1, 1970 00:00:00 UTC). * - The maximum number of log events in a batch is 10,000. * - A batch of log events in a single request cannot span more than 24 hours. Otherwise, the operation fails. * * @param LogRecord[] $entries * - * @throws \Aws\CloudWatchLogs\Exception\CloudWatchLogsException Thrown by putLogEvents() + * @throws CloudWatchLogsException Thrown by putLogEvents() */ private function send(array $entries): void { @@ -297,8 +327,22 @@ private function send(array $entries): void $this->client->putLogEvents($data); } + /** + * @throws \Psr\Cache\InvalidArgumentException + */ private function initializeGroup(): void { + // Check if a PSR-6 cache pool is available + if ($this->cacheItemPool !== null) { + // Attempt to retrieve the cached state for the current log group + $cacheItem = $this->cacheItemPool->getItem($this->group); + + // If the group is already cached, skip further initialization + if ($cacheItem->isHit()) { + return; + } + } + // fetch existing groups $existingGroups = $this ->client @@ -331,10 +375,39 @@ private function initializeGroup(): void ); } } + + // Check if a cache pool is configured + if ($this->cacheItemPool !== null) { + // Retrieve or create a cache item for the current log group + $cacheItem = $this->cacheItemPool->getItem($this->group); + + // Mark the log group as initialized/existing + $cacheItem->set(true); + + // Set the expiration time for this cache entry + $cacheItem->expiresAfter($this->cacheItemTtl); + + // Persist the item to the cache to avoid redundant initialization checks + $this->cacheItemPool->save($cacheItem); + } } + /** + * @throws \Psr\Cache\InvalidArgumentException + */ private function initializeStream(): void { + // Check if a PSR-6 cache pool is configured + if ($this->cacheItemPool !== null) { + // Attempt to retrieve the cached state for the current log stream + $cacheItem = $this->cacheItemPool->getItem($this->stream); + + // If the stream is already known to exist in cache, skip initialization + if ($cacheItem->isHit()) { + return; + } + } + // fetch existing streams $existingStreams = $this ->client @@ -349,7 +422,7 @@ private function initializeStream(): void // extract existing streams names $existingStreamsNames = array_column($existingStreams, 'logStreamName'); - // create stream if not created + // create a stream if not created if (!in_array($this->stream, $existingStreamsNames, true)) { $this ->client @@ -360,8 +433,26 @@ private function initializeStream(): void ] ); } + + // If a cache pool is available, mark the log stream as initialized + if ($this->cacheItemPool !== null) { + // Create/retrieve a cache item using the stream name as the key + $cacheItem = $this->cacheItemPool->getItem($this->stream); + + // Set value to true to indicate the stream exists + $cacheItem->set(true); + + // Set the expiration time based on configured TTL + $cacheItem->expiresAfter($this->cacheItemTtl); + + // Persist the item to the cache pool + $this->cacheItemPool->save($cacheItem); + } } + /** + * @throws \Psr\Cache\InvalidArgumentException + */ private function initialize(): void { if ($this->createGroup) { @@ -379,6 +470,9 @@ protected function getDefaultFormatter(): FormatterInterface return new LineFormatter("%channel%: %level_name%: %message% %context% %extra%", null, false, true); } + /** + * @throws \Psr\Cache\InvalidArgumentException + */ public function close(): void { $this->flushBuffer(); diff --git a/tests/Handler/CloudWatchTest.php b/tests/PhpNexus/Cwh/Test/Handler/CloudWatchTest.php similarity index 73% rename from tests/Handler/CloudWatchTest.php rename to tests/PhpNexus/Cwh/Test/Handler/CloudWatchTest.php index 4ea1a8b..cb87ada 100644 --- a/tests/Handler/CloudWatchTest.php +++ b/tests/PhpNexus/Cwh/Test/Handler/CloudWatchTest.php @@ -5,17 +5,24 @@ use Aws\CloudWatchLogs\CloudWatchLogsClient; use Aws\CloudWatchLogs\Exception\CloudWatchLogsException; use Aws\Result; -use PhpNexus\Cwh\Handler\CloudWatch; +use DateTimeImmutable; +use Exception; +use InvalidArgumentException; use Monolog\Formatter\LineFormatter; -use Monolog\LogRecord; use Monolog\Level; -use PHPUnit\Framework\TestCase; +use Monolog\LogRecord; +use PhpNexus\Cwh\Handler\CloudWatch; use PHPUnit\Framework\MockObject\MockObject; +use PHPUnit\Framework\TestCase; +use Psr\Cache\CacheItemInterface; +use Psr\Cache\CacheItemPoolInterface; +use ReflectionClass; +use ReflectionException; class CloudWatchTest extends TestCase { - private MockObject | CloudWatchLogsClient $clientMock; - private MockObject | Result $awsResultMock; + private MockObject|CloudWatchLogsClient $clientMock; + private MockObject|Result $awsResultMock; private string $groupName = 'group'; private string $streamName = 'stream'; @@ -37,6 +44,10 @@ protected function setUp(): void ->getMock(); } + /** + * @throws ReflectionException + * @throws Exception + */ public function testInitializeWithCreateGroupDisabled(): void { $this @@ -79,12 +90,15 @@ public function testInitializeWithCreateGroupDisabled(): void false ); - $reflection = new \ReflectionClass($handler); + $reflection = new ReflectionClass($handler); $reflectionMethod = $reflection->getMethod('initialize'); - $reflectionMethod->setAccessible(true); $reflectionMethod->invoke($handler); } + /** + * @throws ReflectionException + * @throws Exception + */ public function testInitializeWithCreateStreamDisabled(): void { $this @@ -115,12 +129,15 @@ public function testInitializeWithCreateStreamDisabled(): void false ); - $reflection = new \ReflectionClass($handler); + $reflection = new ReflectionClass($handler); $reflectionMethod = $reflection->getMethod('initialize'); - $reflectionMethod->setAccessible(true); $reflectionMethod->invoke($handler); } + /** + * @throws ReflectionException + * @throws Exception + */ public function testInitializeWithExistingLogGroup(): void { $logGroupsResult = new Result(['logGroups' => [['logGroupName' => $this->groupName]]]); @@ -152,12 +169,114 @@ public function testInitializeWithExistingLogGroup(): void $handler = $this->getCUT(); - $reflection = new \ReflectionClass($handler); + $reflection = new ReflectionClass($handler); + $reflectionMethod = $reflection->getMethod('initialize'); + $reflectionMethod->invoke($handler); + } + + /** + * @throws Exception + */ + public function testInvalidCacheConfiguration(): void + { + $cacheMock = $this->createMock(CacheItemPoolInterface::class); + + $this->expectException(InvalidArgumentException::class); + $this->expectExceptionMessage('Cache pool can not be used without creating log group or stream'); + + new CloudWatch( + $this->clientMock, + $this->groupName, + $this->streamName, + createGroup: false, + createStream: false, + cacheItemPool: $cacheMock + ); + } + + /** + * @throws ReflectionException + * @throws Exception + */ + public function testInitializeWithCacheHits(): void + { + $cacheItemMock = $this->createMock(CacheItemInterface::class); + $cacheItemMock->method('isHit')->willReturn(true); + + $cachePoolMock = $this->createMock(CacheItemPoolInterface::class); + + $matcher = $this->exactly(2); + $expected1 = $this->groupName; + $expected2 = $this->streamName; + + $cachePoolMock + ->expects($matcher) + ->method('getItem') + ->willReturnCallback( + function (string $key) use ($cacheItemMock, $matcher, $expected1, $expected2) { + match ($matcher->getInvocationCount()) { + 1 => $this->assertEquals($expected1, $key), + 2 => $this->assertEquals($expected2, $key), + }; + return $cacheItemMock; + } + ); + + // AWS methods should never be called if cache hits + $this->clientMock->expects($this->never())->method('describeLogGroups'); + $this->clientMock->expects($this->never())->method('describeLogStreams'); + + $handler = new CloudWatch( + $this->clientMock, + $this->groupName, + $this->streamName, + cacheItemPool: $cachePoolMock + ); + + $reflection = new ReflectionClass($handler); + $reflectionMethod = $reflection->getMethod('initialize'); + $reflectionMethod->invoke($handler); + } + + /** + * @throws ReflectionException + * @throws Exception + */ + public function testInitializeWithCacheMisses(): void + { + $cacheItemMock = $this->createMock(CacheItemInterface::class); + $cacheItemMock->method('isHit')->willReturn(false); + $cacheItemMock->expects($this->exactly(2))->method('set')->with(true); + $cacheItemMock->expects($this->exactly(2))->method('expiresAfter')->with(300); + + $cachePoolMock = $this->createMock(CacheItemPoolInterface::class); + $cachePoolMock->method('getItem')->willReturn($cacheItemMock); + $cachePoolMock->expects($this->exactly(2))->method('save')->with($cacheItemMock); + + // Mock AWS responses for initialization + $logGroupsResult = new Result(['logGroups' => [['logGroupName' => $this->groupName]]]); + $this->clientMock->method('describeLogGroups')->willReturn($logGroupsResult); + + $logStreamResult = new Result(['logStreams' => [['logStreamName' => $this->streamName]]]); + $this->clientMock->method('describeLogStreams')->willReturn($logStreamResult); + + $handler = new CloudWatch( + $this->clientMock, + $this->groupName, + $this->streamName, + cacheItemPool: $cachePoolMock, + cacheItemTtl: 300 + ); + + $reflection = new ReflectionClass($handler); $reflectionMethod = $reflection->getMethod('initialize'); - $reflectionMethod->setAccessible(true); $reflectionMethod->invoke($handler); } + /** + * @throws ReflectionException + * @throws Exception + */ public function testInitializeWithTags(): void { $tags = [ @@ -203,12 +322,15 @@ public function testInitializeWithTags(): void $handler = new CloudWatch($this->clientMock, $this->groupName, $this->streamName, 14, 10000, $tags); - $reflection = new \ReflectionClass($handler); + $reflection = new ReflectionClass($handler); $reflectionMethod = $reflection->getMethod('initialize'); - $reflectionMethod->setAccessible(true); $reflectionMethod->invoke($handler); } + /** + * @throws ReflectionException + * @throws Exception + */ public function testInitializeWithEmptyTags(): void { $logGroupsResult = new Result(['logGroups' => [['logGroupName' => $this->groupName . 'foo']]]); @@ -246,12 +368,15 @@ public function testInitializeWithEmptyTags(): void $handler = new CloudWatch($this->clientMock, $this->groupName, $this->streamName); - $reflection = new \ReflectionClass($handler); + $reflection = new ReflectionClass($handler); $reflectionMethod = $reflection->getMethod('initialize'); - $reflectionMethod->setAccessible(true); $reflectionMethod->invoke($handler); } + /** + * @throws ReflectionException + * @throws Exception + */ public function testInitializeWithMissingGroupAndStream(): void { $logGroupsResult = new Result(['logGroups' => [['logGroupName' => $this->groupName . 'foo']]]); @@ -307,24 +432,33 @@ public function testInitializeWithMissingGroupAndStream(): void $handler = $this->getCUT(); - $reflection = new \ReflectionClass($handler); + $reflection = new ReflectionClass($handler); $reflectionMethod = $reflection->getMethod('initialize'); - $reflectionMethod->setAccessible(true); $reflectionMethod->invoke($handler); } + /** + * @throws Exception + */ public function testBatchSizeLimitExceeded(): void { - $this->expectException(\InvalidArgumentException::class); + $this->expectException(InvalidArgumentException::class); (new CloudWatch($this->clientMock, 'a', 'b', batchSize: 10001)); } + /** + * @throws Exception + */ public function testInvalidRpsLimit(): void { - $this->expectException(\InvalidArgumentException::class); + $this->expectException(InvalidArgumentException::class); (new CloudWatch($this->clientMock, 'a', 'b', rpsLimit: -1)); } + /** + * @throws \Psr\Cache\InvalidArgumentException + * @throws Exception + */ public function testSendsOnClose(): void { $this->prepareMocks(); @@ -342,6 +476,10 @@ public function testSendsOnClose(): void $handler->close(); } + /** + * @throws \Psr\Cache\InvalidArgumentException + * @throws Exception + */ public function testSendsBatches(): void { $this->prepareMocks(); @@ -361,6 +499,9 @@ public function testSendsBatches(): void $handler->close(); } + /** + * @throws Exception + */ public function testSendWithRPSLimit(): void { $this->prepareMocks(); @@ -389,21 +530,20 @@ public function testSendWithRPSLimit(): void 2 ); - // Get access to remainingRequests property - $reflection = new \ReflectionClass($handler); + // Get access to the remainingRequests property + $reflection = new ReflectionClass($handler); $remainingRequestsProperty = $reflection->getProperty('remainingRequests'); - $remainingRequestsProperty->setAccessible(true); // Initial log entry $handler->handle($this->getRecord(Level::Debug, 'record')); - // Ensure remainingRequests was decremented to 1 after initial log entry + // Ensure remainingRequests was decremented to 1 after the initial log entry $this->assertEquals(1, $remainingRequestsProperty->getValue($handler)); - // Second log entry immediately after + // The second log entry immediately after $handler->handle($this->getRecord(Level::Debug, 'record')); - // Ensure remainingRequests was decremented to 0 after second log entry + // Ensure remainingRequests was decremented to 0 after the second log entry $this->assertEquals(0, $remainingRequestsProperty->getValue($handler)); // Third log entry immediately after @@ -422,6 +562,9 @@ public function testSendWithRPSLimit(): void $this->assertEquals(1, $remainingRequestsProperty->getValue($handler)); } + /** + * @throws Exception + */ public function testFormatter(): void { $handler = $this->getCUT(); @@ -433,10 +576,12 @@ public function testFormatter(): void $this->assertEquals($expected, $formatter); } + /** + * @throws Exception + */ public function testExceptionFromDescribeLogGroups(): void { // e.g. 'User is not authorized to perform logs:DescribeLogGroups' - /** @var CloudWatchLogsException */ $awsException = $this->getMockBuilder(CloudWatchLogsException::class) ->disableOriginalConstructor() ->getMock(); @@ -496,6 +641,10 @@ private function prepareMocks(): void ->getMock(); } + /** + * @throws \Psr\Cache\InvalidArgumentException + * @throws Exception + */ public function testSortsEntriesChronologically(): void { $this->prepareMocks(); @@ -519,8 +668,8 @@ public function testSortsEntriesChronologically(): void $records = []; for ($i = 1; $i <= 4; ++$i) { - $dt = \DateTimeImmutable::createFromFormat('U', time() + $i); - $record = $this->getRecord(Level::Info, 'record' . $i, [], $dt); + $dt = DateTimeImmutable::createFromFormat('U', time() + $i); + $record = $this->getRecord(Level::Info, 'record' . $i, $dt); $records[] = $record; } @@ -533,6 +682,10 @@ public function testSortsEntriesChronologically(): void $handler->close(); } + /** + * @throws \Psr\Cache\InvalidArgumentException + * @throws Exception + */ public function testSendsBatchesSpanning24HoursOrLess(): void { $this->prepareMocks(); @@ -542,10 +695,10 @@ public function testSendsBatchesSpanning24HoursOrLess(): void ->expects($this->exactly(3)) ->method('PutLogEvents') ->willReturnCallback(function (array $data) { - /** @var int|null */ + /** @var $earliestTime int|null */ $earliestTime = null; - /** @var int|null */ + /** @var $latestTime int|null */ $latestTime = null; foreach ($data['logEvents'] as $logEvent) { @@ -572,14 +725,17 @@ public function testSendsBatchesSpanning24HoursOrLess(): void // write 15 log entries spanning 3 days for ($i = 1; $i <= 15; ++$i) { - $dt = \DateTimeImmutable::createFromFormat('U', time() + $i * 5 * 60 * 60); - $record = $this->getRecord(Level::Info, 'record' . $i, [], $dt); + $dt = DateTimeImmutable::createFromFormat('U', time() + $i * 5 * 60 * 60); + $record = $this->getRecord(Level::Info, 'record' . $i, $dt); $handler->handle($record); } $handler->close(); } + /** + * @throws Exception + */ private function getCUT(int $batchSize = 1000): CloudWatch { return new CloudWatch($this->clientMock, $this->groupName, $this->streamName, 14, $batchSize); @@ -588,9 +744,9 @@ private function getCUT(int $batchSize = 1000): CloudWatch private function getRecord( Level $level, string $message = 'test', - array $context = [], - \DateTimeImmutable $dt = new \DateTimeImmutable() + DateTimeImmutable $dt = new DateTimeImmutable() ): LogRecord { + $context = []; return new LogRecord( $dt, 'test',