Querying Source Tables using Athena

When Athena itself is used as the warehouse, all source tables are also partitioned, so the most natural way to update a source table is in append mode: each load adds a new partition instead of rewriting existing data.
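For illustration, the sketch below shows a partitioned Athena source table and one append-mode load that registers a new partition. The table name, column types, storage format, and S3 locations are hypothetical. The column names and the partition columns y, m, d, h, n mirror the query at the end of this section.

```sql
-- Hypothetical source table; names, types, format and bucket are illustrative only.
CREATE EXTERNAL TABLE source_table (
  primary_key string,
  col2        string,
  col3        string,
  tscol       timestamp
)
PARTITIONED BY (y string, m string, d string, h string, n string)
STORED AS PARQUET
LOCATION 's3://example-bucket/source_table/';

-- Append mode: each load registers one new partition; existing partitions are not rewritten.
ALTER TABLE source_table ADD IF NOT EXISTS
  PARTITION (y = '2024', m = '01', d = '15', h = '10', n = '00')
  LOCATION 's3://example-bucket/source_table/y=2024/m=01/d=15/h=10/n=00/';
```

Athena runs one statement per query, so the two statements are submitted separately.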

When the extractmode is snapshot, the latest partition of an Athena source table holds the latest full copy of the source loadunit. Users must query only the latest partition to get a view that is consistent with the source.
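The following is a minimal sketch of reading only the latest partition of a snapshot table. It assumes a hypothetical table `snapshot_table` whose partition columns y, m, d, h, n hold zero-padded strings, so that their concatenation sorts in time order. Depending on the engine version, the subquery may scan the whole table. In that case, Athena's `"snapshot_table$partitions"` metadata table can be queried instead so that only partition metadata is read.

```sql
-- Hypothetical table: snapshot_table, partitioned by zero-padded string columns y, m, d, h, n.
-- Zero padding makes the concatenated key sort chronologically, so max() selects the newest partition.
SELECT *
FROM snapshot_table
WHERE y || m || d || h || n = (
  SELECT max(y || m || d || h || n)
  FROM snapshot_table
)
```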

When the extractmode is incremental-appends, the latest partition contains only the newest rows fetched. Users must query the entire table, across all partitions, to get a view that is consistent with the source. Typical examples are event, log, and fact tables, whose rows are timestamped and generated continuously. Queries on such tables usually include time-window predicates, which limit the number of partitions each query scans when the predicates are on the partition columns.
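The following is a minimal sketch for incremental-appends. It assumes a hypothetical `events_table` partitioned by zero-padded string columns y, m, d, h, n that reflect when rows were fetched. The query has no latest-partition filter. Instead, the time window is expressed directly on the partition columns so that Athena scans only the matching partitions.

```sql
-- Hypothetical table: events_table, partitioned by zero-padded string columns y, m, d, h, n.
-- All partitions together form the consistent view; the window below prunes the scan
-- to the partitions fetched during the first week of January 2024.
SELECT *
FROM events_table
WHERE y = '2024'
  AND m = '01'
  AND d BETWEEN '01' AND '07'
```

A purely partition-based window misses rows that arrive late, that is, rows fetched into a later partition than their event time. One way to handle them is to use a somewhat wider partition range and add a predicate on the event timestamp column.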

When the extractmode is incremental-updates, however, the latest partition of the source table contains only the rows that changed within that time period, so there is no easy way for users to get a view that is consistent with the source. They would have to run a complex window-function query (row_number() OVER (PARTITION BY primary_key ORDER BY ...)) across all partitions of the source table to keep only the latest version of each row. As the number of partitions grows, this query scans more and more partitions.
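For comparison, here is a sketch of the full-history query that incremental-updates would otherwise require. It assumes a hypothetical `changes_table` that uses the column names from this section's query, with tscol as the change timestamp and is_deleted set to NULL for rows that were not deleted. Nothing restricts the partitions it reads, so its cost grows with every load.

```sql
-- Hypothetical table: changes_table, partitioned by y, m, d, h, n; each partition holds
-- only the rows that changed in one time period.
SELECT primary_key, col2, col3, tscol
FROM (
  SELECT primary_key, col2, col3, tscol, is_deleted,
         -- number the versions of each key, newest first
         row_number() OVER (PARTITION BY primary_key ORDER BY tscol DESC) AS rownum
  FROM changes_table  -- no partition filter: every partition is scanned
) AS latest
-- keep the newest version of each key unless that version is a delete
WHERE rownum = 1 AND is_deleted IS NULL
```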

In this case, it is useful for the load step to apply the changes to the previous partition of the same source table and write the result as a new partition (hence append mode for the partitioned source table). The latest partition then always holds a complete, up-to-date copy, as in snapshot mode. See the picture below for an illustration of each of the modes.

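If the load step is not available, a query like the following can be used in the materialized view. It merges the previous partition of mvtable with the current partition of changes in loadunittable, keeps the newest version of each primary key, and drops deleted rows: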

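WITH previous_partition AS (
  -- Full copy stored in the previous partition of this same table (supplant logic).
  -- Assumes partitions are one hour apart and y, m, d, h, n are zero-padded strings;
  -- {YYYY}, {MM}, {DD}, {HH}, {NN} are the template placeholders for the current partition.
  SELECT primary_key, col2, col3, tscol
  FROM mvtable
  WHERE date_parse(y || '-' || m || '-' || d || ' ' || h || ':' || n, '%Y-%m-%d %H:%i')
        = date_add('hour', -1, TIMESTAMP '{YYYY}-{MM}-{DD} {HH}:{NN}:00')
),
table_plus_changes AS (
  -- Rows from the previous copy are never deletions; source_rank breaks ties below.
  SELECT primary_key, col2, col3, tscol, NULL AS is_deleted, 1 AS source_rank
  FROM previous_partition
  UNION ALL
  -- Changes fetched for the current partition
  SELECT primary_key, col2, col3, tscol, is_deleted, 0 AS source_rank
  FROM loadunittable
  WHERE y = '{YYYY}' AND m = '{MM}' AND d = '{DD}' AND h = '{HH}' AND n = '{NN}'
),
withrownum AS (
  SELECT primary_key, col2, col3, tscol, is_deleted,
         row_number() OVER (
           PARTITION BY primary_key
           -- newest version first; on equal tscol, the change wins over the previous copy
           ORDER BY tscol DESC, source_rank
         ) AS rownum
  FROM table_plus_changes
)
-- Keep the newest version of each key unless that version is a delete
-- (assumes is_deleted is NULL for rows that were not deleted)
SELECT primary_key, col2, col3, tscol
FROM withrownum
WHERE rownum = 1 AND is_deleted IS NULL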