139 changes: 130 additions & 9 deletions documentation/components/libs/filesystem.md
@@ -15,19 +15,26 @@
composer require flow-php/filesystem:~--FLOW_PHP_VERSION--
```

Flow Filesystem is a unified solution for storing and retrieving data on remote and local filesystems.
What differentiates Flow Filesystem from other libraries is the ability to store data in blocks and read
it by byte ranges.

This means that when writing data to a large remote file, we can stream it, and depending on the
implementation of the filesystem, it will be saved in blocks.
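To make the idea of block-wise writing concrete, here is a minimal sketch in plain PHP. `BlockBuffer` is a hypothetical class written for illustration only; it is not Flow's implementation, and the block size is an arbitrary assumption. It buffers appended data and emits a block each time the buffer fills, the way a remote filesystem might upload one block at a time.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical block writer, for illustration only; not part of Flow Filesystem.
 * It buffers appended data and emits a block whenever $blockSize bytes are available.
 */
final class BlockBuffer
{
    /** @var list<string> blocks emitted so far; a real filesystem would upload them instead */
    public array $blocks = [];

    private string $buffer = '';

    public function __construct(private readonly int $blockSize)
    {
        if ($blockSize < 1) {
            throw new \InvalidArgumentException('Block size must be at least 1 byte');
        }
    }

    public function append(string $data) : static
    {
        $this->buffer .= $data;

        // Emit every complete block; the remainder stays buffered.
        while (\strlen($this->buffer) >= $this->blockSize) {
            $this->blocks[] = \substr($this->buffer, 0, $this->blockSize);
            $this->buffer = \substr($this->buffer, $this->blockSize);
        }

        return $this;
    }

    public function close() : void
    {
        // The final block may be shorter than $blockSize.
        if ($this->buffer !== '') {
            $this->blocks[] = $this->buffer;
            $this->buffer = '';
        }
    }
}

$writer = new BlockBuffer(blockSize: 8);
$writer->append('1,norbert,true')->append('2,john,true');
$writer->close();

\var_dump($writer->blocks);
```

The caller only ever calls `append()` and `close()`; how the data is split into blocks stays an implementation detail of the writer.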

When reading, instead of iterating through the whole file to find the data you need, you can access it
directly by specifying the byte range.
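As a rough illustration of byte-range reads, the sketch below uses only PHP's built-in stream functions on a local temporary file. `read_byte_range()` is a hypothetical helper, not a Flow Filesystem function; Flow's own API for range reads may look different.

```php
<?php

declare(strict_types=1);

/**
 * Reads $length bytes starting at $offset without reading the rest of the file.
 * Hypothetical helper for illustration; not part of Flow Filesystem.
 */
function read_byte_range(string $file, int $offset, int $length) : string
{
    $handle = \fopen($file, 'rb');

    if ($handle === false) {
        throw new \RuntimeException("Cannot open {$file}");
    }

    try {
        // Jump straight to the requested offset instead of iterating from the start.
        if (\fseek($handle, $offset) !== 0) {
            throw new \RuntimeException("Cannot seek to offset {$offset}");
        }

        $data = \fread($handle, $length);

        return $data === false ? '' : $data;
    } finally {
        \fclose($handle);
    }
}

$file = \tempnam(\sys_get_temp_dir(), 'flow_range_');
\file_put_contents($file, "1,norbert,true\n2,john,true\n3,jane,true\n");

// The first row plus its newline occupies bytes 0..14, so the second row starts at offset 15.
echo read_byte_range($file, 15, 11), PHP_EOL; // prints "2,john,true"

\unlink($file);
```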

# Available Filesystems

- [Native Local Filesystem](/src/lib/filesystem/src/Flow/Filesystem/Local/NativeLocalFilesystem.php)
- [Memory Filesystem](/src/lib/filesystem/src/Flow/Filesystem/Local/MemoryFilesystem.php)
- [StdOut Filesystem](/src/lib/filesystem/src/Flow/Filesystem/Local/StdOutFilesystem.php)
- [Azure Blob Filesystem](/documentation/components/bridges/filesystem-azure-bridge) - [`flow-php/filesystem-azure-bridge`](https://packagist.org/packages/flow-php/filesystem-azure-bridge)
- [AWS S3 Filesystem](/documentation/components/bridges/filesystem-async-aws-bridge) - [`flow-php/filesystem-async-aws-bridge`](https://packagist.org/packages/flow-php/filesystem-async-aws-bridge)

# Building Blocks

@@ -50,7 +57,7 @@ DestinationStream::append(string $data) : self;
DestinationStream::fromResource($resource) : self;
```

- `Filesystem` - interface that represents a remote or local filesystem

```php
<?php
@@ -106,4 +113,118 @@ $stream->append('1,norbert,true');
$stream->append('2,john,true');
$stream->append('3,jane,true');
$stream->close();
```

## Telemetry

Flow Filesystem supports OpenTelemetry-compatible tracing and metrics for observability of all filesystem operations.
It uses the [Flow Telemetry](/documentation/components/libs/telemetry) library.

To use telemetry, create an instance of `TraceableFilesystem`, which
wraps an existing filesystem and adds telemetry to it.

Alternatively, you can pass `FilesystemTelemetryConfig` to `FilesystemTable` and let it
automatically wrap all mounted filesystems with telemetry.

### DSL Functions

- `filesystem_telemetry_options()` - configure what to trace and measure
- `filesystem_telemetry_config()` - create telemetry configuration from options
- `traceable_filesystem()` - wrap an individual filesystem with telemetry

### Configuration Options

| Option | Default | Description |
|------------------|---------|---------------------------------------------------|
| `traceStreams` | `true` | Create spans for stream lifecycle (open to close) |
| `collectMetrics` | `true` | Collect bytes and operation counters |

### What Gets Traced

**Spans:**

- `SourceStream` - spans the lifecycle of a read stream from creation to close
- `DestinationStream` - spans the lifecycle of a write stream from creation to close

**Metrics:**

- `filesystem.source.bytes_read` - total bytes read from source streams
- `filesystem.source.operations` - number of read operations
- `filesystem.destination.bytes_written` - total bytes written to destination streams
- `filesystem.destination.operations` - number of write operations

Metadata operations (list, status, rm, mv) are logged but do not create spans.
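The byte and operation counters listed above can be pictured as a decorator around a stream. The sketch below is plain PHP with hypothetical names (`Destination`, `InMemoryDestination`, `CountingDestination`); it is not how `TraceableFilesystem` is implemented and only shows the general wrapping technique under those assumptions.

```php
<?php

declare(strict_types=1);

// Hypothetical interface and classes for illustration; Flow's real stream API differs.
interface Destination
{
    public function append(string $data) : static;
}

final class InMemoryDestination implements Destination
{
    public string $content = '';

    public function append(string $data) : static
    {
        $this->content .= $data;

        return $this;
    }
}

final class CountingDestination implements Destination
{
    public int $bytesWritten = 0;

    public int $operations = 0;

    public function __construct(private readonly Destination $inner)
    {
    }

    public function append(string $data) : static
    {
        // Record the measurement, then delegate to the wrapped stream unchanged.
        $this->bytesWritten += \strlen($data);
        $this->operations++;
        $this->inner->append($data);

        return $this;
    }
}

$stream = new CountingDestination(new InMemoryDestination());
$stream->append('1,norbert,true')->append('2,john,true');

echo $stream->bytesWritten, ' bytes in ', $stream->operations, ' operations', PHP_EOL;
// prints "25 bytes in 2 operations"
```

Because the wrapper implements the same interface as the stream it wraps, calling code does not change when counting is switched on or off.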

### Examples

**Wrap an individual filesystem:**

```php
<?php

use function Flow\Filesystem\DSL\{
    filesystem_telemetry_config,
    filesystem_telemetry_options,
    native_local_filesystem,
    path,
    traceable_filesystem
};
use function Flow\Telemetry\DSL\telemetry;
use Psr\Clock\ClockInterface;

$telemetry = telemetry(/* your configuration */);
$clock = new class implements ClockInterface {
    public function now(): \DateTimeImmutable {
        return new \DateTimeImmutable();
    }
};

$config = filesystem_telemetry_config(
    $telemetry,
    $clock,
    filesystem_telemetry_options(traceStreams: true, collectMetrics: true)
);

$fs = traceable_filesystem(native_local_filesystem(), $config);

// All operations on $fs are now traced
$stream = $fs->readFrom(path('/path/to/file.csv'));
```

**Enable telemetry on FilesystemTable:**

```php
<?php

use function Flow\Filesystem\DSL\{
    filesystem_telemetry_config,
    filesystem_telemetry_options,
    fstab
};

// $telemetry and $clock are created the same way as in the previous example
$config = filesystem_telemetry_config($telemetry, $clock);
$fstab = fstab();
$fstab->withTelemetry($config);

// All filesystems in the table are now wrapped with telemetry
```

**Disable specific features:**

```php
<?php

use function Flow\Filesystem\DSL\filesystem_telemetry_options;

// Collect metrics only, no spans
$options = filesystem_telemetry_options(
    traceStreams: false,
    collectMetrics: true
);

// Trace streams only, no metrics
$options = filesystem_telemetry_options(
    traceStreams: true,
    collectMetrics: false
);
```
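
**Pass filesystem options through ETL telemetry options:**

The `telemetry_options()` function changed in this pull request accepts a `filesystem` argument, so filesystem telemetry can presumably be configured together with the ETL options. The fragment below is a sketch based only on that signature. It assumes that `telemetry_options()` lives in the `Flow\ETL\DSL` namespace and that `filesystem_telemetry_options()` returns a `FilesystemTelemetryOptions` instance.

```php
<?php

// Assumed namespaces; verify them against your installed version.
use function Flow\ETL\DSL\telemetry_options;
use function Flow\Filesystem\DSL\filesystem_telemetry_options;

$options = telemetry_options(
    trace_loading: true,
    collect_metrics: true,
    // Filesystem telemetry: collect metrics only, no stream spans
    filesystem: filesystem_telemetry_options(traceStreams: false, collectMetrics: true),
);
```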
36 changes: 32 additions & 4 deletions src/core/etl/src/Flow/ETL/Config/ConfigBuilder.php
@@ -4,8 +4,7 @@

 namespace Flow\ETL\Config;

-use function Flow\Filesystem\DSL\fstab;
-use Composer\InstalledVersions;
+use function Flow\Filesystem\DSL\{filesystem_telemetry_config, fstab};
 use Flow\Clock\SystemClock;
 use Flow\ETL\{Analyze, Cache, Config, NativePHPRandomValueGenerator, RandomValueGenerator};
 use Flow\ETL\Config\Cache\CacheConfigBuilder;
@@ -18,7 +17,7 @@
 use Flow\ETL\Row\EntryFactory;
 use Flow\Filesystem\{Filesystem, FilesystemTable};
 use Flow\Serializer\{Base64Serializer, NativePHPSerializer, Serializer};
-use Flow\Telemetry\Telemetry;
+use Flow\Telemetry\{PackageVersion, Telemetry};
 use Psr\Clock\ClockInterface;

 final class ConfigBuilder
@@ -60,7 +59,7 @@ public function __construct()
         $this->randomValueGenerator = new NativePHPRandomValueGenerator();
         $this->analyze = null;
         $this->telemetryConfig = null;
-        $this->version = InstalledVersions::getPrettyVersion('flow-php/etl') ?: InstalledVersions::getPrettyVersion('flow-php/flow') ?? 'unknown';
+        $this->version = PackageVersion::get('flow-php/etl') === 'unknown' ? PackageVersion::get('flow-php/flow') : PackageVersion::get('flow-php/etl');
     }

     public function analyze(Analyze $analyze) : self
@@ -190,15 +189,44 @@ public function withTelemetry(Telemetry $telemetry, TelemetryOptions $options =
     {
         $this->telemetryConfig = new TelemetryConfig($telemetry, $options);

+        if ($this->fstab !== null) {
+            $this->fstab->withTelemetry(
+                filesystem_telemetry_config(
+                    $telemetry,
+                    $this->clock ?? SystemClock::utc(),
+                    $options->filesystem
+                )
+            );
+        }
+
         return $this;
     }

     private function fstab() : FilesystemTable
     {
         if ($this->fstab === null) {
             $this->fstab = fstab();
+
+            $filesystemOptions = $this->telemetryConfig?->options->filesystem;
+
+            if ($filesystemOptions !== null && ($filesystemOptions->traceStreams || $filesystemOptions->collectMetrics)) {
+                $this->fstab->withTelemetry(filesystem_telemetry_config(
+                    $this->telemetry()->telemetry,
+                    $this->clock ?? SystemClock::utc(),
+                    $filesystemOptions
+                ));
+            }
         }

         return $this->fstab;
     }
+
+    private function telemetry() : TelemetryConfig
+    {
+        if ($this->telemetryConfig === null) {
+            $this->telemetryConfig = TelemetryConfig::default($this->clock ?? SystemClock::utc());
+        }
+
+        return $this->telemetryConfig;
+    }
 }
28 changes: 22 additions & 6 deletions src/core/etl/src/Flow/ETL/Config/Telemetry/TelemetryOptions.php
@@ -4,39 +4,55 @@

 namespace Flow\ETL\Config\Telemetry;

+use Flow\Filesystem\Telemetry\FilesystemTelemetryOptions;
+
 final readonly class TelemetryOptions
 {
     public function __construct(
         public bool $traceLoading = false,
         public bool $traceTransformations = false,
         public bool $collectMetrics = false,
+        public FilesystemTelemetryOptions $filesystem = new FilesystemTelemetryOptions(),
     ) {
     }

-    public function collectMetrics(bool $collect) : self
+    public function collectMetrics(bool $collect = true) : self
     {
         return new self(
             traceLoading: $this->traceLoading,
             traceTransformations: $this->traceTransformations,
+            collectMetrics: $collect,
+            filesystem: $this->filesystem,
+        );
+    }
+
+    public function filesystem(FilesystemTelemetryOptions $options) : self
+    {
+        return new self(
+            traceLoading: $this->traceLoading,
+            traceTransformations: $this->traceTransformations,
-            collectMetrics: $collect
+            collectMetrics: $this->collectMetrics,
+            filesystem: $options,
         );
     }

-    public function traceLoading(bool $trace) : self
+    public function traceLoading(bool $trace = true) : self
     {
         return new self(
             traceLoading: $trace,
             traceTransformations: $this->traceTransformations,
-            collectMetrics: $this->collectMetrics
+            collectMetrics: $this->collectMetrics,
+            filesystem: $this->filesystem,
         );
     }

-    public function traceTransformations(bool $trace) : self
+    public function traceTransformations(bool $trace = true) : self
     {
         return new self(
             traceLoading: $this->traceLoading,
             traceTransformations: $trace,
-            collectMetrics: $this->collectMetrics
+            collectMetrics: $this->collectMetrics,
+            filesystem: $this->filesystem,
         );
     }
 }
5 changes: 4 additions & 1 deletion src/core/etl/src/Flow/ETL/DSL/functions.php
@@ -212,6 +212,7 @@
 use Flow\ETL\Transformer\Rename\{RenameCaseEntryStrategy, RenameReplaceEntryStrategy};
 use Flow\Filesystem\{Filesystem, Local\NativeLocalFilesystem, Partition, Partitions, Path};
 use Flow\Filesystem\Stream\Mode;
+use Flow\Filesystem\Telemetry\FilesystemTelemetryOptions;
 use Flow\Serializer\{NativePHPSerializer, Serializer};
 use Flow\Types\Type;
 use Flow\Types\Type\Logical\{DateTimeType,
@@ -268,11 +269,13 @@ function telemetry_options(
     bool $trace_loading = false,
     bool $trace_transformations = false,
     bool $collect_metrics = false,
+    ?FilesystemTelemetryOptions $filesystem = null,
 ) : TelemetryOptions {
     return new TelemetryOptions(
         $trace_loading,
         $trace_transformations,
-        $collect_metrics
+        $collect_metrics,
+        $filesystem ?? new FilesystemTelemetryOptions()
     );
 }
