Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,10 @@ public function onTerminate(ConsoleTerminateEvent $event) : void

private function matchesPattern(string $command, string $pattern) : bool
{
if (\str_starts_with($pattern, '/') && \str_ends_with($pattern, '/')) {
return (bool) \preg_match($pattern, $command);
$result = @\preg_match($pattern, $command);

if ($result !== false) {
return (bool) $result;
}

return $command === $pattern;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,10 @@ private function isTemplateExcluded(string $template) : bool

private function matchesPattern(string $template, string $pattern) : bool
{
if (\str_starts_with($pattern, '/') && \str_ends_with($pattern, '/')) {
return (bool) \preg_match($pattern, $template);
$result = @\preg_match($pattern, $template);

if ($result !== false) {
return (bool) $result;
}

return $template === $pattern;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public function test_does_not_trace_when_disabled() : void
]);

$application = new Application($kernel);
$application->add(new TestCommand());
$this->addCommand($application, new TestCommand());
$application->setAutoExit(false);
$application->setCatchExceptions(false);

Expand Down Expand Up @@ -108,7 +108,7 @@ public function test_excludes_command_with_exact_match() : void
]);

$application = new Application($kernel);
$application->add(new TestCommand());
$this->addCommand($application, new TestCommand());
$application->setAutoExit(false);
$application->setCatchExceptions(false);

Expand Down Expand Up @@ -159,8 +159,8 @@ public function test_excludes_command_with_regex_pattern() : void
]);

$application = new Application($kernel);
$application->add(new TestCommand());
$application->add(new FailingCommand());
$this->addCommand($application, new TestCommand());
$this->addCommand($application, new FailingCommand());
$application->setAutoExit(false);
$application->setCatchExceptions(false);

Expand Down Expand Up @@ -211,7 +211,7 @@ public function test_traces_failing_console_command() : void
]);

$application = new Application($kernel);
$application->add(new FailingCommand());
$this->addCommand($application, new FailingCommand());
$application->setAutoExit(false);
$application->setCatchExceptions(false);

Expand Down Expand Up @@ -271,7 +271,7 @@ public function test_traces_successful_console_command() : void
]);

$application = new Application($kernel);
$application->add(new TestCommand());
$this->addCommand($application, new TestCommand());
$application->setAutoExit(false);
$application->setCatchExceptions(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
use Flow\Bridge\Symfony\TelemetryBundle\Tests\Context\SymfonyContext;
use Flow\Bridge\Symfony\TelemetryBundle\Tests\Fixtures\TestKernel;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Console\Application;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\DependencyInjection\ContainerInterface;

abstract class KernelTestCase extends TestCase
Expand All @@ -23,6 +25,16 @@ protected function tearDown() : void
$this->context->shutdown();
}

protected function addCommand(Application $application, Command $command) : void
{
/** @phpstan-ignore function.alreadyNarrowedType */
if (\method_exists($application, 'addCommand')) {
$application->addCommand($command);
} else {
$application->add($command);
}
}

/**
* @param array{config?: callable(TestKernel): void} $options
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,8 @@ public function rowCount() : int
};
}

public function quote(string $value) : string
/** @phpstan-ignore missingType.parameter, missingType.parameter */
public function quote($value, $type = ParameterType::STRING) : string
{
return "'{$value}'";
}
Expand Down
42 changes: 25 additions & 17 deletions src/core/etl/src/Flow/ETL/Config/Telemetry/TelemetryContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,16 @@ public function dataFrameCompleted(FlowContext $context, array $attributes = [])
return;
}

$this->logger()->debug('Data frame processing completed', [
'dataframe_id' => $context->config->id(),
'total_rows_processed' => $this->totalRowsProcessed,
'memory_min_mb' => $this->memory->min()->inMb(),
'memory_max_mb' => $this->memory->max()->inMb(),
]);
$this->logger()->debug(
'Data frame processing completed',
[
'dataframe_id' => $context->config->id(),
'total_rows_processed' => $this->totalRowsProcessed,
'memory_min_mb' => $this->memory->min()->inMb(),
'memory_max_mb' => $this->memory->max()->inMb(),
],
spanContext: $this->dataFrameSpan->context()
);

$throughput = 0.0;

Expand Down Expand Up @@ -118,18 +122,22 @@ public function dataFrameStarted(FlowContext $context) : void
{
$this->dataFrameSpan = $this->tracer->span(DataFrame::class);

$this->logger()->debug('Data frame processing started', [
'dataframe_id' => $context->config->id(),
'cache' => $context->cache()::class,
'serializer' => $context->config->serializer()::class,
'optimizers' => \array_map(static fn (Optimization $optimization) => $optimization::class, $context->config->optimizer()->optimizations()),
'telemetry' => [
'trace_loading' => $this->options->traceLoading,
'trace_transformations' => $this->options->traceTransformations,
'collect_metrics' => $this->options->collectMetrics,
$this->logger()->debug(
'Data frame processing started',
[
'dataframe_id' => $context->config->id(),
'cache' => $context->cache()::class,
'serializer' => $context->config->serializer()::class,
'optimizers' => \array_map(static fn (Optimization $optimization) => $optimization::class, $context->config->optimizer()->optimizations()),
'telemetry' => [
'trace_loading' => $this->options->traceLoading,
'trace_transformations' => $this->options->traceTransformations,
'collect_metrics' => $this->options->collectMetrics,
],
'fstab' => \array_map(static fn (Filesystem $filesystem) => $filesystem::class, $context->config->fstab()->filesystems()),
],
'fstab' => \array_map(static fn (Filesystem $filesystem) => $filesystem::class, $context->config->fstab()->filesystems()),
]);
spanContext: $this->dataFrameSpan->context()
);

if ($this->options->collectMetrics) {
$this->counterProcessedRows = $this->meter->createCounter('rows.processed.total', 'Rows Processed');
Expand Down
145 changes: 124 additions & 21 deletions src/lib/telemetry/src/Flow/Telemetry/Logger/Logger.php
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,27 @@ public function __construct(
*
* @param string $body The log message
* @param Attributes|TAttributeValueMap $attributes Optional attributes
* @param null|\DateTimeImmutable $timestamp When the event occurred (defaults to current time)
* @param null|\DateTimeImmutable $observedTimestamp When the log was observed by collection
* @param null|SpanContext $spanContext Span context for trace correlation (defaults to current active span)
*/
public function debug(string $body, array|Attributes $attributes = []) : void
{
$this->emit(new LogRecord(Severity::DEBUG, $body, $attributes instanceof Attributes ? $attributes : Attributes::create($attributes)));
public function debug(
string $body,
array|Attributes $attributes = [],
?\DateTimeImmutable $timestamp = null,
?\DateTimeImmutable $observedTimestamp = null,
?SpanContext $spanContext = null,
) : void {
$this->emit(
new LogRecord(
Severity::DEBUG,
$body,
$attributes instanceof Attributes ? $attributes : Attributes::create($attributes),
$timestamp,
$observedTimestamp,
),
$spanContext,
);
}

/**
Expand All @@ -62,15 +79,16 @@ public function debug(string $body, array|Attributes $attributes = []) : void
* such as setting a custom timestamp or recording an exception.
*
* @param LogRecord $record The log record to emit
* @param null|SpanContext $spanContext Span context for trace correlation (defaults to current active span)
*/
public function emit(LogRecord $record) : void
public function emit(LogRecord $record, ?SpanContext $spanContext = null) : void
{
$entry = new LogEntry(
$record,
$this->resource,
$this->scope,
$record->timestamp ?? $this->clock->now(),
$this->resolveSpanContext(),
$spanContext ?? $this->resolveSpanContext(),
);

$this->processor->process($entry);
Expand All @@ -84,10 +102,27 @@ public function emit(LogRecord $record) : void
*
* @param string $body The log message
* @param Attributes|TAttributeValueMap $attributes Optional attributes
* @param null|\DateTimeImmutable $timestamp When the event occurred (defaults to current time)
* @param null|\DateTimeImmutable $observedTimestamp When the log was observed by collection
* @param null|SpanContext $spanContext Span context for trace correlation (defaults to current active span)
*/
public function error(string $body, array|Attributes $attributes = []) : void
{
$this->emit(new LogRecord(Severity::ERROR, $body, $attributes instanceof Attributes ? $attributes : Attributes::create($attributes)));
public function error(
string $body,
array|Attributes $attributes = [],
?\DateTimeImmutable $timestamp = null,
?\DateTimeImmutable $observedTimestamp = null,
?SpanContext $spanContext = null,
) : void {
$this->emit(
new LogRecord(
Severity::ERROR,
$body,
$attributes instanceof Attributes ? $attributes : Attributes::create($attributes),
$timestamp,
$observedTimestamp,
),
$spanContext,
);
}

/**
Expand All @@ -98,10 +133,27 @@ public function error(string $body, array|Attributes $attributes = []) : void
*
* @param string $body The log message
* @param Attributes|TAttributeValueMap $attributes Optional attributes
* @param null|\DateTimeImmutable $timestamp When the event occurred (defaults to current time)
* @param null|\DateTimeImmutable $observedTimestamp When the log was observed by collection
* @param null|SpanContext $spanContext Span context for trace correlation (defaults to current active span)
*/
public function fatal(string $body, array|Attributes $attributes = []) : void
{
$this->emit(new LogRecord(Severity::FATAL, $body, $attributes instanceof Attributes ? $attributes : Attributes::create($attributes)));
public function fatal(
string $body,
array|Attributes $attributes = [],
?\DateTimeImmutable $timestamp = null,
?\DateTimeImmutable $observedTimestamp = null,
?SpanContext $spanContext = null,
) : void {
$this->emit(
new LogRecord(
Severity::FATAL,
$body,
$attributes instanceof Attributes ? $attributes : Attributes::create($attributes),
$timestamp,
$observedTimestamp,
),
$spanContext,
);
}

/**
Expand All @@ -120,10 +172,27 @@ public function flush() : bool
*
* @param string $body The log message
* @param Attributes|TAttributeValueMap $attributes Optional attributes
* @param null|\DateTimeImmutable $timestamp When the event occurred (defaults to current time)
* @param null|\DateTimeImmutable $observedTimestamp When the log was observed by collection
* @param null|SpanContext $spanContext Span context for trace correlation (defaults to current active span)
*/
public function info(string $body, array|Attributes $attributes = []) : void
{
$this->emit(new LogRecord(Severity::INFO, $body, $attributes instanceof Attributes ? $attributes : Attributes::create($attributes)));
public function info(
string $body,
array|Attributes $attributes = [],
?\DateTimeImmutable $timestamp = null,
?\DateTimeImmutable $observedTimestamp = null,
?SpanContext $spanContext = null,
) : void {
$this->emit(
new LogRecord(
Severity::INFO,
$body,
$attributes instanceof Attributes ? $attributes : Attributes::create($attributes),
$timestamp,
$observedTimestamp,
),
$spanContext,
);
}

/**
Expand All @@ -150,10 +219,27 @@ public function processor() : LogProcessor
*
* @param string $body The log message
* @param Attributes|TAttributeValueMap $attributes Optional attributes
* @param null|\DateTimeImmutable $timestamp When the event occurred (defaults to current time)
* @param null|\DateTimeImmutable $observedTimestamp When the log was observed by collection
* @param null|SpanContext $spanContext Span context for trace correlation (defaults to current active span)
*/
public function trace(string $body, array|Attributes $attributes = []) : void
{
$this->emit(new LogRecord(Severity::TRACE, $body, $attributes instanceof Attributes ? $attributes : Attributes::create($attributes)));
public function trace(
string $body,
array|Attributes $attributes = [],
?\DateTimeImmutable $timestamp = null,
?\DateTimeImmutable $observedTimestamp = null,
?SpanContext $spanContext = null,
) : void {
$this->emit(
new LogRecord(
Severity::TRACE,
$body,
$attributes instanceof Attributes ? $attributes : Attributes::create($attributes),
$timestamp,
$observedTimestamp,
),
$spanContext,
);
}

/**
Expand All @@ -163,11 +249,28 @@ public function trace(string $body, array|Attributes $attributes = []) : void
* the application from functioning.
*
* @param string $body The log message
* @param array<string, array<bool|float|int|string>|bool|float|int|string>|Attributes $attributes Optional attributes
* @param Attributes|TAttributeValueMap $attributes Optional attributes
* @param null|\DateTimeImmutable $timestamp When the event occurred (defaults to current time)
* @param null|\DateTimeImmutable $observedTimestamp When the log was observed by collection
* @param null|SpanContext $spanContext Span context for trace correlation (defaults to current active span)
*/
public function warn(string $body, array|Attributes $attributes = []) : void
{
$this->emit(new LogRecord(Severity::WARN, $body, $attributes instanceof Attributes ? $attributes : Attributes::create($attributes)));
public function warn(
string $body,
array|Attributes $attributes = [],
?\DateTimeImmutable $timestamp = null,
?\DateTimeImmutable $observedTimestamp = null,
?SpanContext $spanContext = null,
) : void {
$this->emit(
new LogRecord(
Severity::WARN,
$body,
$attributes instanceof Attributes ? $attributes : Attributes::create($attributes),
$timestamp,
$observedTimestamp,
),
$spanContext,
);
}

/**
Expand Down
Loading
Loading