Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/actions/setup-base/action.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ runs:
cache: 'pip'
- name: Install dependencies
shell: bash
run: make install
run: make install-dev
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
install:
install-dev:
pip install -e ".[dev]"

test:
Expand Down
23 changes: 21 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,31 @@ The properties are as follows:

#### time_column

This is the column in the dataset that contains the timestamp. It follows the [same syntax](https://sqlmesh.readthedocs.io/en/latest/concepts/models/model_kinds/#time-column) as upstream `INCREMENTAL_BY_TIME_RANGE`.
This is the column in the dataset that contains the timestamp. It follows the [same syntax](https://sqlmesh.readthedocs.io/en/latest/concepts/models/model_kinds/#time-column) as upstream `INCREMENTAL_BY_TIME_RANGE` and also the same rules with regards to respecting the project [time_column_format](https://sqlmesh.readthedocs.io/en/stable/reference/configuration/#environments) property and being automatically added to the model `partition_by` field list.

#### primary_key

This is the column or combination of columns that uniquely identifies a record.

The columns listed here are used in the `ON` clause of the SQL Merge to join the source and target datasets.

Note that the `time_column` is **not** automatically injected into this list (to allow timestamps on records to be updated), so if the `time_column` does actually form part of the primary key in your dataset then it needs to be added here.
Note that the `time_column` is **not** automatically injected into this list (to allow timestamps on records to be updated), so if the `time_column` does actually form part of the primary key in your dataset then it needs to be added here.

#### partition_by_time_column

By default, the `time_column` will get added to the list of fields in the model `partitioned_by` property, causing it to be included in the table partition key. This may be undesirable in some circumstances.

To opt out of this behaviour, you can set `partition_by_time_column = false` like so:

```
MODEL (
name my_db.my_model,
kind CUSTOM (
materialization 'non_idempotent_incremental_by_time_range',
materialization_properties (
...,
partition_by_time_column = false
)
)
);
```
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ description = "Utilities for SQLMesh"
readme = "README.md"
requires-python = ">= 3.9"
dependencies = [
"sqlmesh>=0.160.0"
"sqlmesh>=0.163.0"
]

[project.optional-dependencies]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from sqlmesh.utils.date import make_inclusive
from sqlmesh.utils.errors import ConfigError, SQLMeshError
from pydantic import model_validator
from sqlmesh.utils.pydantic import list_of_fields_validator
from sqlmesh.utils.pydantic import list_of_fields_validator, bool_validator
from sqlmesh.utils.date import TimeLike
from sqlmesh.core.engine_adapter.base import MERGE_SOURCE_ALIAS, MERGE_TARGET_ALIAS
from sqlmesh import CustomKind
Expand All @@ -22,6 +22,8 @@ class NonIdempotentIncrementalByTimeRangeKind(CustomKind):
# this is deliberately primary_key instead of unique_key to direct away from INCREMENTAL_BY_UNIQUE_KEY
_primary_key: t.List[exp.Expression]

_partition_by_time_column: bool

@model_validator(mode="after")
def _validate_model(self):
self._time_column = TimeColumn.create(
Expand All @@ -44,6 +46,10 @@ def _validate_model(self):
"`primary_key` cannot be just the time_column. Please list the columns that when combined, uniquely identify a row"
)

self._partition_by_time_column = bool_validator(
self.materialization_properties.get("partition_by_time_column", True)
)

return self

@property
Expand All @@ -54,6 +60,10 @@ def time_column(self) -> TimeColumn:
def primary_key(self) -> t.List[exp.Expression]:
return self._primary_key

@property
def partition_by_time_column(self) -> bool:
return self._partition_by_time_column


class NonIdempotentIncrementalByTimeRangeMaterialization(
CustomMaterialization[NonIdempotentIncrementalByTimeRangeKind]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ def test_kind(make_model: ModelMaker):
model = make_model(["time_column = ds", "primary_key = (id, ds)"])
assert isinstance(model.kind, NonIdempotentIncrementalByTimeRangeKind)

assert model.partitioned_by == [exp.to_column("ds", quoted=True)]
assert model.kind.partition_by_time_column

assert model.kind.time_column.column == exp.to_column("ds", quoted=True)
assert model.kind.primary_key == [
exp.to_column("id", quoted=True),
Expand Down Expand Up @@ -157,3 +160,13 @@ def test_append(make_model: ModelMaker, make_mocked_engine_adapter: MockedEngine
dialect=adapter.dialect,
).sql(dialect=adapter.dialect)
]


def test_partition_by_time_column_opt_out(make_model: ModelMaker):
model = make_model(
["time_column = ds", "primary_key = name", "partition_by_time_column = false"]
)

assert isinstance(model.kind, NonIdempotentIncrementalByTimeRangeKind)
assert not model.kind.partition_by_time_column
assert model.partitioned_by == []
Loading