How to Add Self-Referential Transformations

Datacoral allows transformations to depend on their prior outputs. This feature is enabled by specifying 'self-referential' flag in the transformation definition.

note

This feature supports Amazon Redshift and Amazon Athena destination warehouses.

Self-referential transformations have multiple use cases

  • Setting retention
  • Performing cumulative aggregates without making copies
  • Delete table contents based on conditions

Here is an example illustrating how to add a self-referential transformation that deletes data older than 3 months

Step 1: Create a non datacoral connector

Create a non-datacoral connector named input_schema and add a loadunit called input_table syncing every 5 minutes.

Contact support at support@datacoral.co to add the Non-Datacoral connector.

Step 2: Create table in Redshift

CREATE SCHEMA IF NOT EXISTS input_schema;
CREATE TABLE IF NOT EXISTS input_schema.input_table(
id BIGINT,
org_id BIGINT,
user_id BIGINT,
updated_at TIMESTAMP
);
INSERT INTO input_schema.input_table VALUES
(1, 1, 1, '2020-01-01 00:00:00'::TIMESTAMP),
(2, 1, 1, '2020-02-01 00:00:00'::TIMESTAMP),
(3, 1, 1, '2020-03-01 00:00:00'::TIMESTAMP),
(4, 1, 1, '2020-04-01 00:00:00'::TIMESTAMP),
(5, 1, 1, '2020-05-01 00:00:00'::TIMESTAMP),
(6, 1, 1, '2020-06-01 00:00:00'::TIMESTAMP),
(7, 1, 1, '2020-07-01 00:00:00'::TIMESTAMP),
(8, 1, 1, '2020-08-01 00:00:00'::TIMESTAMP),
(9, 1, 1, '2020-09-01 00:00:00'::TIMESTAMP),
(10, 1, 1, '2020-10-01 00:00:00'::TIMESTAMP),
(11, 1, 1, '2020-11-01 00:00:00'::TIMESTAMP),
(12, 1, 1, '2020-12-01 00:00:00'::TIMESTAMP);

The input_table will have rows one per month.

Step 3: Create the transformation schema

Create the transformation schema using the below query.

CREATE SCHEMA IF NOT EXISTS mv_schema;
GRANT ALL PRIVILEGES ON SCHEMA mv_schema TO datacoral;

Step 4: Create the self-referential transformation

You can now create the self-referential transformation using a dpl (mv_schema/mvname.dpl) file like below:

/**
* @datacoral O-1.0.0
* @slice-name redshift
* @matview-schema mv_schema
* @matview-name mvname
* @interval-in-minutes 60
* @update-mode merge
* @sequential-refresh true
* @self-referential true
* @primary-keys id
* @update-ts-col updated_at
* @delete-ts-col should_delete
* @table-type regular
* @historical-sync-query SELECT id,
* org_id,
* user_id,
* updated_at,
* NULL::boolean AS should_delete
* FROM
* input_schema.input_table;
*/
WITH delete_older_than_90_days AS (
-- Select the rows that are older than 90 days so that they can be
-- they can be deleted from the destination table as part of the
-- merge transformation
SELECT
id AS id,
NULL AS org_id,
NULL AS user_id,
updated_at AS updated_at,
TRUE AS should_delete
FROM
mv_schema.mv_mvname
WHERE
updated_at < ('{YYYY}-{MM}-{DD}T{HH}:00:00' :: TIMESTAMP - interval '90 day')
),
transformation AS (
SELECT
id AS id,
org_id AS org_id,
user_id AS user_id,
updated_at,
NULL AS should_delete
FROM
input_schema.input_table
WHERE
-- Only fetch 1 hour worth of data for incremental transformation
updated_at >= '{YYYY}-{MM}-{DD}T{HH}:00:00' :: TIMESTAMP
AND
updated_at < '{YYYY}-{MM}-{DD}T{HH}:00:00' :: TIMESTAMP + interval '1 hour'
)
SELECT * FROM delete_older_than_90_days
UNION ALL
SELECT * FROM transformation;

Then, use the Datacoral CLI to create the transformation:

datacoral organize matview-create --dpl-file mv_schema/mvname.dpl
note

historical-sync-query is currently used to figure out the schema of the destination table in order to create it and handle self-referential transformations. Going forward,

  • The sync query will be used to perform historical sync as soon as the transformation is added
  • There will be CLI commands to trigger a historical sync when needed

For now, please use the below query to insert the contents from input_schema.input_table onto the matview (mvname)

WITH historical_sync_query AS (
--- Same as @historical-sync-query
SELECT id,
org_id,
user_id,
updated_at,
NULL::BOOLEAN AS should_delete
FROM
input_schema.input_table
)
INSERT INTO mv_schema.mv_mvname
(
SELECT *
FROM historical_sync_query
);

When the transformation is refreshed, mv_schema.mv_mvname table in redshift will have only the past 90 days worth of data.