Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"require": {
"php": ">=8.1",
"monolog/monolog": "^3.0",
"psr/cache": "^3.0",
"aws/aws-sdk-php": "^3.2"
},
"require-dev": {
Expand Down
156 changes: 125 additions & 31 deletions src/Handler/CloudWatch.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,17 @@
namespace PhpNexus\Cwh\Handler;

use Aws\CloudWatchLogs\CloudWatchLogsClient;
use Aws\CloudWatchLogs\Exception\CloudWatchLogsException;
use DateTime;
use DateTimeImmutable;
use Exception;
use InvalidArgumentException;
use Monolog\Formatter\FormatterInterface;
use Monolog\Formatter\LineFormatter;
use Monolog\Handler\AbstractProcessingHandler;
use Monolog\LogRecord;
use Monolog\Level;
use Monolog\LogRecord;
use Psr\Cache\CacheItemPoolInterface;

class CloudWatch extends AbstractProcessingHandler
{
Expand All @@ -32,7 +38,7 @@ class CloudWatch extends AbstractProcessingHandler

private readonly string $stream;

private readonly int | null $retention;
private readonly int|null $retention;

private readonly int $batchSize;

Expand All @@ -56,9 +62,13 @@ class CloudWatch extends AbstractProcessingHandler

private int $remainingRequests;

private readonly \DateTime $rpsTimestamp;
private readonly DateTime $rpsTimestamp;

private int|null $earliestTimestamp = null;

private int | null $earliestTimestamp = null;
private ?CacheItemPoolInterface $cacheItemPool = null;

private int $cacheItemTtl = 60 * 5;

/**
* CloudWatchLogs constructor.
Expand All @@ -73,41 +83,48 @@ class CloudWatch extends AbstractProcessingHandler
* The ':' (colon) and '*' (asterisk) characters are not allowed.
*
* @param CloudWatchLogsClient $client AWS SDK CloudWatchLogs client to use with this handler.
* @param string $group Name of log group.
* @param string $stream Name of log stream within log group.
* @param string $group Name of the log group.
* @param string $stream Name of the log stream within the log group.
* @param int|null $retention (Optional) Number of days to retain log entries.
* Only used when CloudWatch handler creates log group.
* Only used when CloudWatch handler creates a log group.
* @param int $batchSize (Optional) Number of logs to queue before sending to CloudWatch.
* @param array $tags (Optional) Tags to apply to log group. Only used when CloudWatch handler creates log group.
* @param int|string|\Monolog\Level $level (Optional) The minimum logging level at which this handler will be
* @param array $tags (Optional) Tags to apply to the log group.
* Only used when CloudWatch handler creates a log group.
* @param int|string|Level $level (Optional) The minimum logging level at which this handler will be
* triggered.
* @param bool $bubble (Optional) Whether the messages that are handled can bubble up the stack or not.
* @param bool $createGroup (Optional) Whether to create log group if log group does not exist.
* @param bool $createStream (Optional) Whether to create log stream if log stream does not exist in log group.
* @param int $rpsLimit (Optional) Number of requests per second before a 1 second sleep is triggered.
* @param bool $createGroup (Optional) Whether to create the log group if the log group does not exist.
* @param bool $createStream (Optional) Whether to create log stream if log stream does not exist in the log group.
* @param int $rpsLimit (Optional) Number of requests per second before a 1-second sleep is triggered.
* Set to 0 to disable.
* @throws \Exception
* @param CacheItemPoolInterface|null $cacheItemPool (Optional) PSR-6 cache pool to use for the caching log group
* and stream creation.
* @param int $cacheItemTtl (Optional) TTL for cache items in seconds.
*
* @throws Exception
*/
public function __construct(
CloudWatchLogsClient $client,
string $group,
string $stream,
int | null $retention = 14,
int|null $retention = 14,
int $batchSize = 10000,
array $tags = [],
int | string | Level $level = Level::Debug,
int|string|Level $level = Level::Debug,
bool $bubble = true,
bool $createGroup = true,
bool $createStream = true,
int $rpsLimit = 0
int $rpsLimit = 0,
?CacheItemPoolInterface $cacheItemPool = null,
int $cacheItemTtl = 60 * 5
) {
// Assert batch size is not above 10,000
if ($batchSize > 10000) {
throw new \InvalidArgumentException('Batch size can not be greater than 10000');
throw new InvalidArgumentException('Batch size can not be greater than 10000');
}
// Assert RPS limit is not a negative number
if ($rpsLimit < 0) {
throw new \InvalidArgumentException('RPS limit can not be a negative number');
throw new InvalidArgumentException('RPS limit can not be a negative number');
}

$this->client = $client;
Expand All @@ -120,13 +137,23 @@ public function __construct(
$this->createStream = $createStream;
$this->rpsLimit = $rpsLimit;

if (!$createGroup && !$createStream && $cacheItemPool) {
throw new InvalidArgumentException('Cache pool can not be used without creating log group or stream');
}

$this->cacheItemPool = $cacheItemPool;
$this->cacheItemTtl = $cacheItemTtl;

parent::__construct($level, $bubble);

// Initalize remaining requests and RPS timestamp for rate limiting
// Initialize remaining requests and RPS timestamp for rate limiting
$this->resetRemainingRequests();
$this->rpsTimestamp = new \DateTime();
$this->rpsTimestamp = new DateTime();
}

/**
* @throws \Psr\Cache\InvalidArgumentException
*/
protected function write(LogRecord $record): void
{
$records = $this->formatRecords($record);
Expand Down Expand Up @@ -157,6 +184,9 @@ private function addToBuffer(array $record): void
$this->buffer[] = $record;
}

/**
* @throws \Psr\Cache\InvalidArgumentException
*/
private function flushBuffer(): void
{
if (!empty($this->buffer)) {
Expand All @@ -167,10 +197,10 @@ private function flushBuffer(): void
try {
// send items
$this->send($this->buffer);
} catch (\Aws\CloudWatchLogs\Exception\CloudWatchLogsException $e) {
} catch (CloudWatchLogsException $e) {
error_log('AWS CloudWatchLogs threw an exception while sending items: ' . $e->getMessage());

// wait for 1 second and try to send items again (in case of per account per region rate limiting)
// wait for 1 second and try to send items again (in the case of per account per region rate limiting)
sleep(1);
$this->send($this->buffer);
}
Expand All @@ -189,12 +219,12 @@ private function flushBuffer(): void
private function checkThrottle(): void
{
if ($this->rpsLimit > 0) {
// Calculate number of seconds between now and last RPS timestamp
$diff = $this->rpsTimestamp->diff(new \DateTimeImmutable())->s;
// Calculate the number of seconds between now and the last RPS timestamp
$diff = $this->rpsTimestamp->diff(new DateTimeImmutable())->s;
$sameSecond = $diff === 0;

if ($sameSecond) {
// If no remaining requests for current second
// If no remaining requests for the current second
if ($this->remainingRequests === 0) {
// Sleep and reset remaining requests
sleep(1);
Expand Down Expand Up @@ -236,7 +266,7 @@ protected function willMessageSizeExceedLimit(array $record): bool
}

/**
* Determine whether the specified record's timestamp exceeds the 24 hour timespan limit
* Determine whether the specified record's timestamp exceeds the 24-hour timespan limit
* for all batched messages written in a single call to PutLogEvents.
*/
protected function willMessageTimestampExceedLimit(array $record): bool
Expand All @@ -245,7 +275,7 @@ protected function willMessageTimestampExceedLimit(array $record): bool
}

/**
* Event size in the batch can not be bigger than 1 MB
* Event size in the batch cannot be bigger than 1 MB
* https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/cloudwatch_limits_cwl.html
*/
private function formatRecords(LogRecord $entry): array
Expand All @@ -270,14 +300,14 @@ private function formatRecords(LogRecord $entry): array
* UTF-8, plus 26 bytes for each log event.
* - None of the log events in the batch can be more than 2 hours in the future.
* - None of the log events in the batch can be older than 14 days or the retention period of the log group.
* - The log events in the batch must be in chronological ordered by their timestamp (the time the event occurred,
* expressed as the number of milliseconds since Jan 1, 1970 00:00:00 UTC).
* - The log events in the batch must be in chronologically ordered by their timestamp
* (the time the event occurred, expressed as the number of milliseconds since Jan 1, 1970 00:00:00 UTC).
* - The maximum number of log events in a batch is 10,000.
* - A batch of log events in a single request cannot span more than 24 hours. Otherwise, the operation fails.
*
* @param LogRecord[] $entries
*
* @throws \Aws\CloudWatchLogs\Exception\CloudWatchLogsException Thrown by putLogEvents()
* @throws CloudWatchLogsException Thrown by putLogEvents()
*/
private function send(array $entries): void
{
Expand All @@ -297,8 +327,22 @@ private function send(array $entries): void
$this->client->putLogEvents($data);
}

/**
* @throws \Psr\Cache\InvalidArgumentException
*/
private function initializeGroup(): void
{
// Check if a PSR-6 cache pool is available
if ($this->cacheItemPool !== null) {
// Attempt to retrieve the cached state for the current log group
$cacheItem = $this->cacheItemPool->getItem($this->group);

// If the group is already cached, skip further initialization
if ($cacheItem->isHit()) {
return;
}
}

// fetch existing groups
$existingGroups = $this
->client
Expand Down Expand Up @@ -331,10 +375,39 @@ private function initializeGroup(): void
);
}
}

// Check if a cache pool is configured
if ($this->cacheItemPool !== null) {
// Retrieve or create a cache item for the current log group
$cacheItem = $this->cacheItemPool->getItem($this->group);

// Mark the log group as initialized/existing
$cacheItem->set(true);

// Set the expiration time for this cache entry
$cacheItem->expiresAfter($this->cacheItemTtl);

// Persist the item to the cache to avoid redundant initialization checks
$this->cacheItemPool->save($cacheItem);
}
}

/**
* @throws \Psr\Cache\InvalidArgumentException
*/
private function initializeStream(): void
{
// Check if a PSR-6 cache pool is configured
if ($this->cacheItemPool !== null) {
// Attempt to retrieve the cached state for the current log stream
$cacheItem = $this->cacheItemPool->getItem($this->stream);

// If the stream is already known to exist in cache, skip initialization
if ($cacheItem->isHit()) {
return;
}
}

// fetch existing streams
$existingStreams = $this
->client
Expand All @@ -349,7 +422,7 @@ private function initializeStream(): void
// extract existing streams names
$existingStreamsNames = array_column($existingStreams, 'logStreamName');

// create stream if not created
// create a stream if not created
if (!in_array($this->stream, $existingStreamsNames, true)) {
$this
->client
Expand All @@ -360,8 +433,26 @@ private function initializeStream(): void
]
);
}

// If a cache pool is available, mark the log stream as initialized
if ($this->cacheItemPool !== null) {
// Create/retrieve a cache item using the stream name as the key
$cacheItem = $this->cacheItemPool->getItem($this->stream);

// Set value to true to indicate the stream exists
$cacheItem->set(true);

// Set the expiration time based on configured TTL
$cacheItem->expiresAfter($this->cacheItemTtl);

// Persist the item to the cache pool
$this->cacheItemPool->save($cacheItem);
}
}

/**
* @throws \Psr\Cache\InvalidArgumentException
*/
private function initialize(): void
{
if ($this->createGroup) {
Expand All @@ -379,6 +470,9 @@ protected function getDefaultFormatter(): FormatterInterface
return new LineFormatter("%channel%: %level_name%: %message% %context% %extra%", null, false, true);
}

/**
* @throws \Psr\Cache\InvalidArgumentException
*/
public function close(): void
{
$this->flushBuffer();
Expand Down
Loading