diff --git a/Airflow@1.10/pyproject.toml b/Airflow@1.10/pyproject.toml
index bc2f7d2..f255995 100644
--- a/Airflow@1.10/pyproject.toml
+++ b/Airflow@1.10/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "viewflow"
-version = "0.1.0"
+version = "0.2.0"
 description = "Viewflow is an Airflow-based framework that allows data scientists to create data models without writing Airflow code."
 authors = ["Vincent Vankrunkelsven ", "Ramnath Vaidyanathan ", "Gaëtan Podevijn "]
 readme = "README.md"
diff --git a/README.md b/README.md
index e050da6..1047b41 100644
--- a/README.md
+++ b/README.md
@@ -12,7 +12,7 @@ Do you want more context on why we built and released Viewflow? Check out our an
 ## Viewflow demo
 
-We created a demo that shows how Viewflow works. The demo creates multiple DAGs: `viewflow-demo-1` through `viewflow-demo-4`. These DAGs create a total of four views in a local Postgres database. Check out the view files in [demo/dags/](./demo/dags/). Some of the following commands are different based on which Airflow version you're using. For new users, Airflow 2 is the best option. However, you can also run the demo using the older Airflow 1.10 version by using the indicated commands.
+We created a demo that shows how Viewflow works. The demo creates multiple DAGs: `viewflow-demo-1` through `viewflow-demo-3`. These DAGs create a total of four views in a local Postgres database. Check out the view files in [demo/dags/](./demo/dags/). Some of the following commands are different based on which Airflow version you're using. For new users, Airflow 2 is the best option. However, you can also run the demo using the older Airflow 1.10 version by using the indicated commands.
 
 ### Run the demo
 
 We use `docker-compose` to instantiate an Apache Airflow instance and a Postgres database. The Airflow container and the Postgres container are defined in the `docker-compose-airflow.yml` files. The first time you want to run the demo, you will first have to build the Apache Airflow docker image that embeds Viewflow:
@@ -28,13 +28,12 @@
 docker-compose -f docker-compose-airflow2.yml up # Airflow 2
 docker-compose -f docker-compose-airflow1.10.yml up # Airflow 1.10
 ```
 
-Go to your local Apache Airflow instance on [http://localhost:8080](http://localhost:8080). There are four DAGs called `viewflow-demo-1` through `viewflow-demo-4`. Notice how Viewflow automatically generated these DAGs based on the example queries in the subfolders of [demo/dags/](./demo/dags/)!
+Go to your local Apache Airflow instance on [http://localhost:8080](http://localhost:8080). There are three DAGs called `viewflow-demo-1` through `viewflow-demo-3`. Notice how Viewflow automatically generated these DAGs based on the example queries in the subfolders of [demo/dags/](./demo/dags/)!
 
-
-
-
+By default, the DAGs are disabled. You will first have to turn them on. This will trigger the DAGs.
 
-By default, the DAGs are disabled. Turn the DAGs on by clicking on the button `Off`. This will trigger the DAGs.
+
+
 
 ### Query the views
@@ -193,7 +192,7 @@ Viewflow expects some metadata that must be included in the SQL and Python files
 * **schema**: The name of the schema in which Viewflow creates the view. It's also used by Viewflow to create the dependencies.
 * **connection_id**: Airflow connection name used to connect to the database (See Section [*Create an Airflow connection to your destination*](https://github.com/datacamp/viewflow#create-an-airflow-connection-to-your-destination)).
 
-The newly created view has the same name as the filename of the SQL query, Python script or R(md) script.
+The newly created view has the same name as the filename (more precisely, the file stem, without the extension) of the SQL query, Python script or R(md) script. Viewflow materializes the view in the database under this name, so it must be unique across all DAGs!
 
 ### SQL views
 
@@ -244,14 +243,14 @@ Please note that Viewflow expects the Python function that creates the view to h
 
 ### R views
 
-Viewflow handles R scripts similar to the existing SQL and Python files. Additionally, there's an element of automatisation. You simply define the view in R code, Viewflow will automatically read the necessary tables and write the new view to the database. Note that you need to define the new view in the R script with the same name as the R script (which is also the name of the table where the view is materialized in the database).
+Viewflow handles R scripts similarly to the existing SQL and Python files. Additionally, there's an element of automation. You simply define the view in R code, and Viewflow will automatically read the necessary tables and write the new view to the database. Note that you need to define the new view in the R script with the same name as the R script (which is also the name of the table of the materialized view in the database).
 
 By default, other tables are expected to be referenced as `.`. This default behaviour can be changed by adding a new function in [dependencies_r_patterns.py](./viewflow/parsers/dependencies_r_patterns.py) and adding a line `dependency_function: ` to the metadata of the R script. The script [user_xp_duplicate.R](./demo/dags/viewflow-demo-3/user_xp_duplicate.R) illustrates this.
 
 
 ### Rmd views
 
-Rmd scripts can be used mostly like R scripts. For Rmd scripts, you do have to explicitly configure the automated reading and writing of tables by adding `automate_read_write: True` to the metadata. By default, the script is executed as is. The task [top_3_user_xp_duplicate.Rmd](./demo/dags/viewflow-demo-4/top_3_user_xp_duplicate.Rmd) contains an explanation of the usage of Rmd scripts.
+Rmd scripts can be used mostly like R scripts. For Rmd scripts, you do have to explicitly configure the automated reading and writing of tables by adding `automate_read_write: True` to the metadata. By default, the script is executed as is. The task [top_3_user_xp_duplicate.Rmd](./demo/dags/viewflow-demo-3/top_3_user_xp_duplicate.Rmd) contains an explanation of the usage of Rmd scripts.
 
 ## Configuring callbacks
 
@@ -276,6 +275,21 @@ on_retry_callback: on_retry_callback_custom
 
 Of course, options 1, 2 and 3 can be combined to efficiently configure the callbacks of a multitude of tasks.
 
+## Incremental updates
+
+SQL views offer an extra feature for advanced users: incremental updating. In some cases, it's possible to update the materialized view very efficiently instead of creating the view from scratch. We will illustrate the advantages and disadvantages of incremental updates with an example: [emails_blog.sql](./demo/dags/viewflow-demo-2/emails_blog.sql).
+
+In the query, the `users` table is joined with the `notifications` table. Keep in mind that this query is run on a regular basis, e.g. every day. The key to understanding the incremental update is the filter in the query: the `notifications.updated_at` field is required to be at least as large as the maximum value already present in the "old" materialized view. This filter effectively selects only the rows that correspond to recently created or changed rows in the `notifications` table. Viewflow then makes sure the selected rows are updated in, or inserted into, the materialized view. Under the hood, this follows the staging-table merge pattern described in [the Redshift documentation](https://docs.aws.amazon.com/redshift/latest/dg/merge-replacing-existing-rows.html). For this to work, you have to specify the fields of the primary key of the materialized view in the metadata. In summary, there are three additional mandatory fields in the metadata: `type`, `primary_key` and `time_parameters`.
+
+The main advantage is now clear: the incremental update only processes recently changed rows, which is far cheaper than rebuilding the view from scratch, especially if the query runs frequently over a long period. A disadvantage also becomes clear in the example: you have to be careful about stale data. Because the example query only returns results corresponding to recently changed rows of the `notifications` table, changes to the `users.email` field can go unnoticed. If a user's email is changed while the `notifications` table stays the same, the materialized view will still contain the old email address after running the incremental update! This issue can be solved by adding an `updated_at` field to the `users` table and also selecting recently changed rows from that table:
+
+```sql
+SELECT user_id, notification_mode, email, n.updated_at
+FROM viewflow_raw.users u INNER JOIN viewflow_raw.notifications n ON n.user_id = u.id
+WHERE
+    category = 'blog' AND
+    (u.updated_at >= {{min_time}} OR n.updated_at >= {{min_time}})
+```
 
 # Contributing to Viewflow
 
diff --git a/demo/dags/viewflow-demo-2/emails_blog.sql b/demo/dags/viewflow-demo-2/emails_blog.sql
new file mode 100644
index 0000000..d9e7e6d
--- /dev/null
+++ b/demo/dags/viewflow-demo-2/emails_blog.sql
@@ -0,0 +1,28 @@
+/*
+---
+owner: data@datacamp.com
+type: IncrementalPostgresOperator
+description: For all users, list the email address and notification mode of the category blog.
+fields:
+  user_id: The user ID
+  notification_mode: The detailed mode for which notifications to receive
+  email: Email address of the user
+schema: viewflow_demo
+connection_id: postgres_demo
+primary_key: [user_id]
+time_parameters:
+  initial:
+    min_time: '''2020-01-01 12:00:00'''
+  update:
+    min_time: (SELECT max(updated_at) FROM viewflow_demo.emails_blog)
+---
+*/
+
+SELECT user_id, notification_mode, email, updated_at
+FROM
+    viewflow_raw.notifications n
+    INNER JOIN viewflow_raw.users u
+        ON n.user_id = u.id
+WHERE
+    category = 'blog' AND
+    updated_at >= {{min_time}}
diff --git a/demo/dags/viewflow-demo-4/top_3_user_xp_duplicate.Rmd b/demo/dags/viewflow-demo-3/top_3_user_xp_duplicate.Rmd
similarity index 100%
rename from demo/dags/viewflow-demo-4/top_3_user_xp_duplicate.Rmd
rename to demo/dags/viewflow-demo-3/top_3_user_xp_duplicate.Rmd
diff --git a/demo/dags/viewflow-demo-4/config.yml b/demo/dags/viewflow-demo-4/config.yml
deleted file mode 100644
index a112000..0000000
--- a/demo/dags/viewflow-demo-4/config.yml
+++ /dev/null
@@ -1,8 +0,0 @@
-default_args:
-  owner: data@datacamp.com
-  retries: 1
-  catchup: false
-schedule_interval: 0 6 * * *
-start_date: "2021-07-15"
-max_active_runs: 1
-catchup: false
\ No newline at end of file
diff --git a/demo/scripts/load_postgres.sql b/demo/scripts/load_postgres.sql
index c2dabb6..02101b9 100755
--- a/demo/scripts/load_postgres.sql
+++ b/demo/scripts/load_postgres.sql
@@ -76,4 +76,26 @@ VALUES
 (8, 5, '2021-01-21 10:00:00', '2021-01-21 14:00:00'),
 (9, 1, '2021-02-15 10:00:00', '2021-02-16 14:00:00'),
 (10, 1, '2021-03-15 10:00:00', '2021-03-15 14:00:00'),
-(10, 3, '2021-03-16 10:00:00', '2021-03-16 14:00:00');
\ No newline at end of file
+(10, 3, '2021-03-16 10:00:00', '2021-03-16 14:00:00');
+
+
+DROP TABLE IF EXISTS viewflow_raw.notifications;
+CREATE TABLE viewflow_raw.notifications (
+    user_id INTEGER,
+    category VARCHAR,
+    notification_mode VARCHAR,
+    updated_at TIMESTAMP,
+    PRIMARY KEY (user_id, category)
+);
+INSERT INTO
+    viewflow_raw.notifications (user_id, category, notification_mode, updated_at)
+VALUES
+    (1, 'daily', 'off', '2021-12-01 12:00:00'),
+    (1, 'recommended', 'off', '2021-12-01 12:00:00'),
+    (1, 'blog', 'selection', '2021-12-01 12:00:00'),
+    (2, 'daily', 'all', '2022-11-01 12:00:00'),
+    (2, 'recommended', 'off', '2022-11-01 12:00:00'),
+    (2, 'blog', 'all', '2022-11-01 12:00:00'),
+    (3, 'daily', 'selection', '2023-10-01 12:00:00'),
+    (3, 'recommended', 'selection', '2023-10-01 12:00:00'),
+    (3, 'blog', 'all', '2023-10-01 12:00:00');
diff --git a/img/airflow_web_homepage.png b/img/airflow_web_homepage.png
new file mode 100644
index 0000000..602dd22
Binary files /dev/null and b/img/airflow_web_homepage.png differ
diff --git a/img/viewflow-demo-2.png b/img/viewflow-demo-2.png
deleted file mode 100644
index e4cd91d..0000000
Binary files a/img/viewflow-demo-2.png and /dev/null differ
diff --git a/pyproject.toml b/pyproject.toml
index 747d0ec..33eebc6 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "viewflow"
-version = "0.1.0"
+version = "0.2.0"
 license = "MIT"
 description = "Viewflow is an Airflow-based framework that allows data scientists to create data models without writing Airflow code."
 authors = ["Vincent Vankrunkelsven ", "Ramnath Vaidyanathan ", "Gaëtan Podevijn "]
diff --git a/tests/adapters/test_postgres_incremental.py b/tests/adapters/test_postgres_incremental.py
new file mode 100644
index 0000000..8254c79
--- /dev/null
+++ b/tests/adapters/test_postgres_incremental.py
@@ -0,0 +1,99 @@
+import viewflow
+from datetime import datetime, date
+
+from airflow.models import TaskInstance, Connection
+from airflow.providers.postgres.hooks.postgres import PostgresHook
+from airflow import settings
+from airflow.utils import db
+
+
+def _create_postgres_session():
+    session = settings.Session()
+    conn = Connection(
+        conn_id="postgres_viewflow",
+        conn_type="postgres",
+        login="user",
+        password="passw0rd",
+        schema="viewflow",
+        host="localhost",
+        port=5432,
+    )
+    db.merge_conn(conn, session)
+    return session
+
+
+def test_incremental_updates():
+    session = _create_postgres_session()
+
+    dag = viewflow.create_dag("./tests/projects/postgresql/incremental_operator")
+    task = dag.get_task("emails_blog")
+
+    with PostgresHook(postgres_conn_id="postgres_viewflow").get_conn() as conn:
+        with conn.cursor() as cur:
+            cur.execute("DROP TABLE IF EXISTS viewflow.emails_blog")
+
+    # Table 'emails_blog' does not yet exist --> query must be run with initial time parameters
+    ti = TaskInstance(task, datetime(2020, 1, 1))
+    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True, session=session)
+    with PostgresHook(postgres_conn_id="postgres_viewflow").get_conn() as conn:
+        with conn.cursor() as cur:
+            cur.execute("SELECT COUNT(*) FROM viewflow.emails_blog")
+            (count,) = cur.fetchone()
+            assert count == 1
+
+            cur.execute("SELECT * FROM viewflow.emails_blog")
+            (user_id, notification_mode, email, updated_at, __view_generated_at) = cur.fetchone()
+            assert user_id == 1
+            assert notification_mode == "selection"
+            assert email == "test1@datacamp.com"
+            assert updated_at == datetime.strptime("2021-12-01 12:00:00", "%Y-%m-%d %H:%M:%S")
+            assert __view_generated_at == date.today()
+
+    # First incremental update --> additional rows are added (only 1 in this case)
+    ti = TaskInstance(task, datetime(2020, 1, 1))
+    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True, session=session)
+    with PostgresHook(postgres_conn_id="postgres_viewflow").get_conn() as conn:
+        with conn.cursor() as cur:
+            cur.execute("SELECT COUNT(*) FROM viewflow.emails_blog")
+            (count,) = cur.fetchone()
+            assert count == 2
+
+    # Second incremental update --> additional row is added
+    ti = TaskInstance(task, datetime(2020, 1, 1))
+    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True, session=session)
+    with PostgresHook(postgres_conn_id="postgres_viewflow").get_conn() as conn:
+        with conn.cursor() as cur:
+            cur.execute("SELECT COUNT(*) FROM viewflow.emails_blog")
+            (count,) = cur.fetchone()
+            assert count == 3
+
+    # User 1 disables the blog notifications
+    with PostgresHook(postgres_conn_id="postgres_viewflow").get_conn() as conn:
+        with conn.cursor() as cur:
+            cur.execute("""
+                UPDATE viewflow.notifications
+                SET notification_mode='off', updated_at=timestamp '2024-9-01 12:00:00'
+                WHERE user_id=1 AND category='blog'
+            """)
+
+    # Third incremental update --> changed row must be updated
+    ti = TaskInstance(task, datetime(2020, 1, 1))
+    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True, session=session)
+    with PostgresHook(postgres_conn_id="postgres_viewflow").get_conn() as conn:
+        with conn.cursor() as cur:
+            cur.execute("SELECT COUNT(*) FROM viewflow.emails_blog")
+            (count,) = cur.fetchone()
+            assert count == 3
+
+            cur.execute("SELECT notification_mode, updated_at FROM viewflow.emails_blog WHERE user_id = 1")
+            (notification_mode, updated_at) = cur.fetchone()
+
+            assert notification_mode == "off"
+            assert updated_at == datetime.strptime("2024-9-01 12:00:00", "%Y-%m-%d %H:%M:%S")
+
+            # Restore change to user 1's notification mode
+            cur.execute("""
+                UPDATE viewflow.notifications
+                SET notification_mode='selection', updated_at=timestamp '2021-12-01 12:00:00'
+                WHERE user_id=1 AND category='blog';
+            """)
diff --git a/tests/fixtures/load_postgres.sql b/tests/fixtures/load_postgres.sql
index f1237eb..6f9f892 100644
--- a/tests/fixtures/load_postgres.sql
+++ b/tests/fixtures/load_postgres.sql
@@ -20,22 +20,24 @@ VALUES
 ('test7@datacamp.com', 'testtest7'),
 ('test8@datacamp.com', 'testtest8');
 
-DROP TABLE IF EXISTS viewflow.incremental_users;
-CREATE TABLE viewflow.incremental_users (
-    user_id SERIAL,
-    email VARCHAR,
-    password VARCHAR
+DROP TABLE IF EXISTS viewflow.notifications;
+CREATE TABLE viewflow.notifications (
+    user_id INTEGER,
+    category VARCHAR,
+    notification_mode VARCHAR,
+    updated_at TIMESTAMP,
+    PRIMARY KEY (user_id, category)
 );
 
-
 INSERT INTO
-    viewflow.incremental_users (email, password)
+    viewflow.notifications (user_id, category, notification_mode, updated_at)
 VALUES
-    ('test_incremental1@datacamp.com', 'testtest1'),
-    ('test_incremental2@datacamp.com', 'testtest2'),
-    ('test_incremental3@datacamp.com', 'testtest3'),
-    ('test_incremental4@datacamp.com', 'testtest4'),
-    ('test_incremental5@datacamp.com', 'testtest5'),
-    ('test_incremental6@datacamp.com', 'testtest6'),
-    ('test_incremental7@datacamp.com', 'testtest7'),
-    ('test_incremental8@datacamp.com', 'testtest8');
\ No newline at end of file
+    (1, 'daily', 'off', '2021-12-01 12:00:00'),
+    (1, 'recommended', 'off', '2021-12-01 12:00:00'),
+    (1, 'blog', 'selection', '2021-12-01 12:00:00'),
+    (2, 'daily', 'all', '2022-11-01 12:00:00'),
+    (2, 'recommended', 'off', '2022-11-01 12:00:00'),
+    (2, 'blog', 'all', '2022-11-01 12:00:00'),
+    (3, 'daily', 'selection', '2023-10-01 12:00:00'),
+    (3, 'recommended', 'selection', '2023-10-01 12:00:00'),
+    (3, 'blog', 'all', '2023-10-01 12:00:00');
diff --git a/tests/projects/postgresql/incremental_operator/config.yml b/tests/projects/postgresql/incremental_operator/config.yml
new file mode 100644
index 0000000..2b213b8
--- /dev/null
+++ b/tests/projects/postgresql/incremental_operator/config.yml
@@ -0,0 +1,4 @@
+default_args:
+  owner: data@datacamp.com
+schedule_interval: 0 5 * * *
+start_date: "2020-04-29"
diff --git a/tests/projects/postgresql/incremental_operator/emails_blog.sql b/tests/projects/postgresql/incremental_operator/emails_blog.sql
new file mode 100644
index 0000000..768c43d
--- /dev/null
+++ b/tests/projects/postgresql/incremental_operator/emails_blog.sql
@@ -0,0 +1,32 @@
+/*
+---
+owner: engineering@datacamp.com
+type: IncrementalPostgresOperator
+description: For all users, list the email address and notification mode of the category blog.
+fields:
+  user_id: The user ID
+  notification_mode: The detailed mode for which notifications to receive
+  email: Email address of the user
+schema: viewflow
+connection_id: postgres_viewflow
+time_parameters:
+  initial:
+    min_time: '''2020-01-01 12:00:00'''
+    max_time: '''2021-12-31 12:00:00'''
+  update:
+    min_time: (SELECT max(updated_at) FROM viewflow.emails_blog)
+    max_time: (SELECT (max(updated_at) + interval '1 day' * 365) FROM viewflow.emails_blog)
+primary_key: [user_id]
+---
+*/
+
+SELECT u.user_id, notification_mode, email, updated_at
+FROM
+    viewflow.users u
+    INNER JOIN viewflow.notifications n
+        ON n.user_id = u.user_id
+WHERE
+    category = 'blog' AND
+    updated_at >= {{min_time}} AND
+    updated_at < {{max_time}}
+
diff --git a/viewflow/adapters/postgresql/postgres_adapter.py b/viewflow/adapters/postgresql/postgres_adapter.py
index 0e96716..ce988c5 100644
--- a/viewflow/adapters/postgresql/postgres_adapter.py
+++ b/viewflow/adapters/postgresql/postgres_adapter.py
@@ -2,15 +2,16 @@
 from jinja2 import Template
 from pathlib import Path
 from typing import Dict, Any
+import logging
 from airflow.providers.postgres.operators.postgres import PostgresOperator  # type: ignore
-from airflow.models import BaseOperator
+from airflow.models import BaseOperator  # type: ignore
+from viewflow.operators.incremental_postgres_operator import IncrementalPostgresOperator
 
 from ..post_execute_monkey_patch import monkey_post_execute
 
 SQL_TEMPLATE = (
     Path(os.path.dirname(os.path.realpath(__file__))) / "template.sql"
 ).read_text()
-
 def _get_postgres_operator(parsed_task: Dict[str, Any]) -> PostgresOperator:
     return PostgresOperator(
         sql=SQL_TEMPLATE,
@@ -31,6 +32,21 @@ def _get_postgres_operator(parsed_task: Dict[str, Any]) -> PostgresOperator:
     )
+def _get_incremental_postgres_operator(parsed_task: Dict[str, Any]) -> IncrementalPostgresOperator:
+    return IncrementalPostgresOperator(
+        conn_id=parsed_task["connection_id"],
+        task_id=parsed_task["task_id"],
+        description=parsed_task["description"],
+        content=parsed_task["content"],
+        owner=parsed_task["owner"],
+        schema=parsed_task["schema"],
+        parameters=parsed_task.get("parameters", {}),
+        time_parameters=parsed_task["time_parameters"],
+        primary_key=parsed_task["primary_key"],
+        fields=parsed_task.get("fields", {}),
+        alias=parsed_task.get("alias"),
+    )
+
 
 
 def create_task(parsed_task: Dict[str, Any]):
     parsed_task["fields"] = {
         key: value.strip() for key, value in parsed_task.get("fields", {}).items()
@@ -39,7 +55,13 @@ def create_task(parsed_task: Dict[str, Any]):
         None if "description" not in parsed_task else parsed_task["description"].strip()
     )
 
-    operator = _get_postgres_operator(parsed_task)
+    if parsed_task["type"] == "PostgresOperator":
+        operator = _get_postgres_operator(parsed_task)
+    elif parsed_task["type"] == "IncrementalPostgresOperator":
+        operator = _get_incremental_postgres_operator(parsed_task)
+    else:
+        logging.error(f"Invalid operator type for Postgres adapter\nThe parsed_task:\n{parsed_task}")
+
     operator.schema_name = parsed_task.get("schema")
     operator.conn_id = parsed_task.get("connection_id")
     operator.post_execute = monkey_post_execute.__get__(operator, BaseOperator)
diff --git a/viewflow/create_dag.py b/viewflow/create_dag.py
index 757b9e1..826f6dc 100644
--- a/viewflow/create_dag.py
+++ b/viewflow/create_dag.py
@@ -38,14 +38,20 @@
 DAG_CONFIG_FILE = "config.yml"
 
 OPERATORS = {
     "PostgresOperator": postgres_adapter.create_task,
+    "IncrementalPostgresOperator": postgres_adapter.create_task,
     "PythonToPostgresOperator": python_adapter.create_task,
     "RmdOperator": rmd_adapter.create_task,
     "ROperator": r_adapter.create_task
 }
 
-PARSERS = {".yml": parse_yml, ".sql": parse_sql, ".py": parse_python, ".rmd": parse_rmd, ".r": parse_r}
-
-SQL_OPERATORS = ["PostgresOperator"]
+PARSERS = {
+    ".yml": parse_yml,
+    ".yaml": parse_yml,
+    ".sql": parse_sql,
+    ".py": parse_python,
+    ".rmd": parse_rmd,
+    ".r": parse_r
+}
 
 @dataclass
@@ -117,6 +123,11 @@ def parse_task_file(
 def get_all_dependencies(task, schema_name):
     if task["type"] == "PostgresOperator":
         dependencies = get_sql_dependencies(task["content"], schema_name)
+    elif task["type"] == "IncrementalPostgresOperator":
+        dependencies = get_sql_dependencies(task["content"], schema_name)
+        # The incremental update query can refer to the view itself without any problem
+        if task["task_id"] in dependencies:
+            dependencies.remove(task["task_id"])
     elif task["type"] == "PythonToPostgresOperator":
         dependencies = get_python_dependencies(task["content"], schema_name)
     elif task["type"] == "RmdOperator":
diff --git a/viewflow/operators/incremental_postgres_operator.py b/viewflow/operators/incremental_postgres_operator.py
new file mode 100644
index 0000000..8dcf0d7
--- /dev/null
+++ b/viewflow/operators/incremental_postgres_operator.py
@@ -0,0 +1,91 @@
+from jinja2 import Template
+from pathlib import Path
+import os
+
+from airflow.providers.postgres.hooks.postgres import PostgresHook  # type: ignore
+from airflow.providers.postgres.operators.postgres import PostgresOperator  # type: ignore
+
+
+SQL_TEMPLATE = (
+    Path(os.path.dirname(os.path.realpath(__file__))) / "template_incremental.sql"
+).read_text()
+
+class IncrementalPostgresOperator(PostgresOperator):
+    """
+    Operator intended to efficiently update a materialized view on a regular basis.
+    The destination view is not made from scratch every time the operator runs; only recently inserted/updated rows from the data source are taken into account.
+    This is done by running a query with a filter: only rows between the min_time and max_time parameters are selected.
+
+    The destination view is updated by upserting the resulting rows of the query.
+    Rows that have changed (determined based on the primary key specified in the metadata) are updated, and new rows are inserted.
+    """
+
+    def __init__(
+        self,
+        conn_id,
+        task_id,
+        description,
+        content,
+        owner,
+        schema,
+        parameters,
+        time_parameters,
+        primary_key,
+        fields,
+        alias,
+        default_args={}
+    ):
+
+        super().__init__(
+            sql="",  # To be determined in self.execute
+            task_id=task_id,
+            postgres_conn_id=conn_id,
+            email=owner,
+            owner=owner,
+            default_args=default_args
+        )
+
+        self.conn_id = conn_id
+        self.task_id = task_id
+        self.description = description
+        self.content = content
+        self.owner = owner
+        self.schema = schema
+        self.parameters = parameters
+        self.time_parameters = time_parameters
+        self.primary_key = primary_key
+        self.fields = fields
+        self.alias = alias
+
+
+    def get_query(self):
+        """Return the query that selects the new rows for the target view.
+        If the target view doesn't yet exist, use the initial parameters. If it does, use the update parameters."""
+
+        # Does the table already exist?
+        with PostgresHook(postgres_conn_id=self.conn_id).get_conn().cursor() as cur:
+            cur.execute(f"select exists(select * from information_schema.tables where table_schema='{self.schema}' and table_name='{self.task_id}');")
+            table_exists = cur.fetchone()[0]
+
+        # The result is equal to self.content where the appropriate parameters (min_time and max_time) are filled out.
+        return Template(self.content).render(
+            min_time=self.time_parameters["update" if table_exists else "initial"]["min_time"],
+            max_time=self.time_parameters["update" if table_exists else "initial"].get("max_time"),
+            parameters=self.parameters
+        )
+
+
+    def execute(self, context):
+        """If the view doesn't yet exist, run the initial query. If it does, run the incremental update."""
+
+        self.sql = Template(SQL_TEMPLATE).render(params={
+            "task_id": self.task_id,
+            "fields": self.fields,
+            "description": self.description,
+            "schema": self.schema,
+            "alias": self.alias,
+            "primary_key": self.primary_key,
+            "query": self.get_query()
+        })
+
+        super().execute(context)
diff --git a/viewflow/operators/template_incremental.sql b/viewflow/operators/template_incremental.sql
new file mode 100644
index 0000000..9482bbf
--- /dev/null
+++ b/viewflow/operators/template_incremental.sql
@@ -0,0 +1,45 @@
+
+-- Create staging table with new rows for target table
+DROP TABLE IF EXISTS {{ params.schema }}.{{ params.task_id }}_stage;
+CREATE TABLE
+    {{ params.schema }}.{{ params.task_id }}_stage
+    AS (
+        {{ params.query }}
+    );
+
+-- Create target table if necessary
+CREATE TABLE IF NOT EXISTS {{ params.schema }}.{{ params.task_id }} (LIKE {{ params.schema }}.{{ params.task_id }}_stage);
+
+
+BEGIN TRANSACTION;
+-- Delete changed rows from target table
+DELETE FROM {{ params.schema }}.{{ params.task_id }} AS target
+USING {{ params.schema }}.{{ params.task_id }}_stage AS stage
+WHERE
+    {% for column in params.primary_key %}
+        {% if loop.index > 1 %}
+            AND
+        {% endif %}
+        target.{{ column }} = stage.{{ column }}
+    {% endfor %};
+
+-- Insert all rows from staging table into target table
+INSERT INTO {{ params.schema }}.{{ params.task_id }}
+    SELECT * FROM {{ params.schema }}.{{ params.task_id }}_stage;
+END TRANSACTION;
+
+
+DROP TABLE {{ params.schema }}.{{ params.task_id }}_stage;
+
+
+-- Create aliases
+{% if params.alias is not none %}
+CREATE OR REPLACE VIEW {{ params.schema }}.{{ params.alias }} AS (SELECT * FROM {{ params.schema }}.{{ params.task_id }}) WITH NO SCHEMA BINDING;
+{% endif %}
+
+-- Comment the table
+-- NOTE: Add more metadata: owner, tags, alias
+COMMENT ON TABLE {{ params.schema }}.{{ params.task_id }} IS '{{ params.description }}';
+{% for name, value in params.fields.items() %}
+COMMENT ON COLUMN {{ params.schema }}.{{ params.task_id }}."{{ name }}" IS '{{ value }}';
+{% endfor %}
diff --git a/viewflow/parsers/parse_sql.py b/viewflow/parsers/parse_sql.py
index 0e9290a..2b69581 100644
--- a/viewflow/parsers/parse_sql.py
+++ b/viewflow/parsers/parse_sql.py
@@ -10,7 +10,7 @@ def parse_sql(file: pathlib.Path) -> Dict[str, Any]:
     yml = "\n".join(content[l[0] + 1 : l[1]])
     task_config = yaml.safe_load(yml)
     extras = {
-        "type": "PostgresOperator",
+        "type": task_config.get("type", "PostgresOperator"),
         "content": "\n".join(content[0 : max(l[0]-1,0)] + content[(l[1] + 2) :]),
         "task_file_path": str(file),
     }
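To make the `time_parameters` mechanics concrete, here is a hand-rendered sketch (not output copied from a run) of what `IncrementalPostgresOperator.get_query()` produces for the demo view defined in `demo/dags/viewflow-demo-2/emails_blog.sql`. On the first run the target table `viewflow_demo.emails_blog` does not exist yet, so the `initial` value is substituted for `{{min_time}}`; on later runs the `update` value is used, so only rows newer than what the view already contains are selected.

```sql
-- First run: {{min_time}} comes from time_parameters.initial
SELECT user_id, notification_mode, email, updated_at
FROM
    viewflow_raw.notifications n
    INNER JOIN viewflow_raw.users u
        ON n.user_id = u.id
WHERE
    category = 'blog' AND
    updated_at >= '2020-01-01 12:00:00'

-- Later runs: {{min_time}} comes from time_parameters.update, so the last filter becomes
--     updated_at >= (SELECT max(updated_at) FROM viewflow_demo.emails_blog)
```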
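Similarly, a sketch of the merge that `viewflow/operators/template_incremental.sql` renders around that query for this view (`schema: viewflow_demo`, `primary_key: [user_id]`, no alias); the selected-rows query is abbreviated and the trailing `COMMENT ON` statements are omitted here:

```sql
-- Create staging table with new rows for target table
DROP TABLE IF EXISTS viewflow_demo.emails_blog_stage;
CREATE TABLE
    viewflow_demo.emails_blog_stage
    AS (
        SELECT ...  -- the query rendered by get_query(), as sketched above
    );

-- Create target table if necessary
CREATE TABLE IF NOT EXISTS viewflow_demo.emails_blog (LIKE viewflow_demo.emails_blog_stage);

BEGIN TRANSACTION;
-- Delete changed rows from target table, matched on the primary key
DELETE FROM viewflow_demo.emails_blog AS target
USING viewflow_demo.emails_blog_stage AS stage
WHERE
    target.user_id = stage.user_id;

-- Insert all rows from staging table into target table
INSERT INTO viewflow_demo.emails_blog
    SELECT * FROM viewflow_demo.emails_blog_stage;
END TRANSACTION;

DROP TABLE viewflow_demo.emails_blog_stage;
```

Rows whose `user_id` already exists in the target table are deleted and re-inserted from the staging table; rows that the filter did not select are left untouched, which is exactly why a changed `users.email` can go unnoticed, as the README section above points out.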