Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/core/etl/src/Flow/ETL/Pipeline.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@

namespace Flow\ETL;

use Flow\ETL\Pipeline\Stages;
use Flow\ETL\Pipeline\Segments;

/**
* @internal
*/
final readonly class Pipeline
{
private Stages $stages;
private Segments $stages;

public function __construct(private Extractor $extractor)
{
$this->stages = new Stages();
$this->stages = new Segments();
}

public function add(Transformer|Loader|Processor $step) : self
Expand Down Expand Up @@ -68,7 +68,7 @@ public function process(FlowContext $context) : \Generator
/**
* Get the pipeline stages.
*/
public function stages() : Stages
public function stages() : Segments
{
return $this->stages;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
*
* @internal
*/
final class Stages
final class Segments
{
private Segment $currentSegment;

Expand Down
16 changes: 16 additions & 0 deletions src/core/etl/src/Flow/ETL/Schema/Definition/DateDefinition.php
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,22 @@ public function merge(Definition $definition) : Definition
);
}

if ($definition instanceof FloatDefinition) {
return new FloatDefinition(
$this->ref,
$this->nullable || $definition->isNullable(),
$this->metadata->merge($definition->metadata())
);
}

if ($definition instanceof IntegerDefinition) {
return new IntegerDefinition(
$this->ref,
$this->nullable || $definition->isNullable(),
$this->metadata->merge($definition->metadata())
);
}

throw new RuntimeException(\sprintf(
'Cannot merge %s with %s',
self::class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,22 @@ public function merge(Definition $definition) : Definition
);
}

if ($definition instanceof FloatDefinition) {
return new FloatDefinition(
$this->ref,
$this->nullable || $definition->isNullable(),
$this->metadata->merge($definition->metadata())
);
}

if ($definition instanceof IntegerDefinition) {
return new IntegerDefinition(
$this->ref,
$this->nullable || $definition->isNullable(),
$this->metadata->merge($definition->metadata())
);
}

throw new RuntimeException(\sprintf(
'Cannot merge %s with %s',
self::class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,14 @@ public function merge(Definition $definition) : Definition
);
}

if ($definition instanceof DateDefinition || $definition instanceof DateTimeDefinition) {
return new self(
$this->ref,
$this->nullable || $definition->isNullable(),
$this->metadata->merge($definition->metadata())
);
}

if ($definition instanceof StringDefinition) {
return new StringDefinition(
$this->ref,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,14 @@ public function merge(Definition $definition) : Definition
);
}

if ($definition instanceof DateDefinition || $definition instanceof DateTimeDefinition) {
return new self(
$this->ref,
$this->nullable || $definition->isNullable(),
$this->metadata->merge($definition->metadata())
);
}

if ($definition instanceof StringDefinition) {
return new StringDefinition(
$this->ref,
Expand Down
Loading
Loading