Getting the Difference Between Two Tables in Oracle DB

Dzeri, 18-07-2023, Programming

A way to calculate a row-level delta of two tables of the same shape in Oracle DB (and other databases with some tweaks).

Oracle DB, SQL

The Problem

Recently I was tasked with improving the performance of an ETL pipeline implemented mostly in Oracle DB. As we were uploading a lot of redundant data to an external system, incremental loads were the way to reduce this overhead. Essentially, the idea is to calculate the difference between the newest state of the data and the current state of the external system, and apply only those changes.

In a way, every relational DBMS does this internally when it executes transactions and writes them to the write-ahead log. User-facing implementations of a "delta" also exist, called change tracking or change data capture. Oracle DB used to support this out of the box in older releases, but it has since been turned into a paid add-on. Not only is this fact in itself frustrating, but it led me on a wild goose chase for a few days, because a lot of online information on this topic has been outdated by that change. I really dread having to search for anything Oracle DB-related at this point, but that's getting off topic. We needed a way to implement incremental loads in the DB without paying a trillion dollars per second per CPU, and it's not hard at all, as it shouldn't be...

The Solution

What I ended up with was a generic PL/SQL procedure that calculates the difference between two tables (new and old state), and returns rows with an appended column that signifies which action (delete, update or insert row) should be taken in order to make the old state identical to the new one.

As an example, let's say we have two tables, TAB1 (the new state) and TAB2 (the old state):

+-----------+  +-----------+
|    TAB1   |  |    TAB2   |
+----+------+  +----+------+
| ID | Text |  | ID | Text |
+----+------+  +----+------+
| 1  | A    |  | 2  | X    |
+----+------+  +----+------+
| 2  | B    |  | 3  | C    |
+----+------+  +----+------+
| 3  | C    |  | 4  | Y    |
+----+------+  +----+------+

The difference between these two, that is, the changes that turn TAB2 into TAB1, should look like this:

+--------------------+
|        DELTA       |
+----+------+--------+
| ID | Text | STATUS |
+----+------+--------+
| 1  | A    | NEW    |
+----+------+--------+
| 2  | B    | UPD    |
+----+------+--------+
| 4  | Y    | DEL    |
+----+------+--------+

The Simple SQL-Only Version

Before I show the whole procedure though, I want to focus on the main SQL part that does the actual calculation of the differences. Even though the syntax in this article is Oracle DB-exclusive, it can definitely be made to work with other databases, for example by replacing the MINUS operator with EXCEPT. Anyway, this is what the raw, unparameterized SQL looks like:

WITH sq1 AS (
    SELECT * FROM TAB1 MINUS SELECT * FROM TAB2
),
sq2 AS (
    SELECT * FROM TAB2 MINUS SELECT * FROM TAB1
)
SELECT * FROM (
    SELECT sq2.*, 'DEL' AS "STATUS"
    FROM sq2
    UNION
    SELECT sq1.*, 'NEW' AS "STATUS"
    FROM sq1
)
ORDER BY STATUS

This gives you rows with either DEL or NEW in the STATUS column, which you need to apply (delete or insert rows) to table TAB2 in order to make it identical to table TAB1. Because ORDER BY STATUS sorts DEL before NEW, a changed row is removed before its new version is inserted. It doesn't show the UPD (update rows) status, but we'll get to that later. That's basically it in its purest form. The next section shows the fully fleshed-out procedure.
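Before moving on, the DEL/NEW logic can be modelled outside the database to see what the two MINUS operations produce. The Java sketch below is only an illustration and not part of the procedure: the class MinusDeltaSketch, the records Row and Change, and the in-memory sets standing in for TAB1 and TAB2 are all assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory model of the DEL/NEW query; not the author's code.
class MinusDeltaSketch {
    // One row of the example tables. Records compare by value,
    // just like MINUS compares whole rows.
    record Row(int id, String text) {}

    // A row plus the action needed to move the old state toward the new one.
    record Change(Row row, String status) {}

    // Rows of `left` that do not appear in `right` (set semantics, like MINUS).
    static Set<Row> minus(Set<Row> left, Set<Row> right) {
        Set<Row> result = new LinkedHashSet<>(left);
        result.removeAll(right);
        return result;
    }

    // DEL rows first, then NEW rows, mirroring ORDER BY STATUS.
    static List<Change> delta(Set<Row> newState, Set<Row> oldState) {
        List<Change> changes = new ArrayList<>();
        for (Row r : minus(oldState, newState)) {
            changes.add(new Change(r, "DEL"));
        }
        for (Row r : minus(newState, oldState)) {
            changes.add(new Change(r, "NEW"));
        }
        return changes;
    }

    public static void main(String[] args) {
        Set<Row> tab1 = new LinkedHashSet<>(
                List.of(new Row(1, "A"), new Row(2, "B"), new Row(3, "C")));
        Set<Row> tab2 = new LinkedHashSet<>(
                List.of(new Row(2, "X"), new Row(3, "C"), new Row(4, "Y")));
        delta(tab1, tab2).forEach(c -> System.out.println(c.status() + " " + c.row()));
    }
}
```

With the example tables, the changed row with ID 2 shows up twice, once as DEL with the old text and once as NEW with the new text. That pair is exactly what the UPD status is meant to collapse into a single row.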

The Full PL/SQL Procedure

This probably looks way more complicated than it is, because of the many safety checks I built in to make SQL injection harder. Also, the code is basically duplicated for both tables, which could definitely be extracted into separate functions. I will completely skip over the safety checks and focus on the other interesting parts of the code, but if you're in a rush and want to deploy the code as-is, you can have some peace of mind knowing that I invested some time into making it secure. This does not mean, however, that the code below is bulletproof.

/**
    Calculates the differences between two tables with the same structure (including PKs).
    @param tab1_schema schema of the left table. If null, the current user's schema is assumed.
    @param tab1_name the name of the table on the left.
    @param tab2_schema schema of the right table. If null, the current user's schema is assumed.
    @param tab2_name the name of the table on the right.
    @param show_upd_bool determines if UPD rows should be returned (1), or if all changes should be
                          represented as DEL and NEW operations only (0).
                          In the latter case, the DEL operations are always sorted first.
                          Takes values 1 (true) and 0 (false)

    @param delta OUT A SYS_REFCURSOR to a virtual table with the same structure as tab1 and tab2, but
                     with an added field "STATUS" with the values "NEW", "UPD" or "DEL".
                     Applying these rows to the right table according to the status (upsert/delete)
                     will make its state identical with the left table.

    @throws Error with code -20123 if one of the tables is not valid, does not exist or has no primary key.
    @throws Other Oracle validation errors related to object names.
 */
PROCEDURE calc_delta (
    tab1_schema VARCHAR2,
    tab1_name VARCHAR2,
    tab2_schema VARCHAR2,
    tab2_name VARCHAR2,
    show_upd_bool NUMBER,
    delta OUT SYS_REFCURSOR
)
IS
    TYPE varchartab IS TABLE OF VARCHAR2(5000);
    key_cols varchartab;
    comma_sep_key_cols VARCHAR2(32767);
    dyn_query_str VARCHAR2(32767);
    tab1_schema_actual VARCHAR2(128);
    tab2_schema_actual VARCHAR2(128);
    -- ENQUOTE_NAME adds two quote characters, so quoted names need 130 characters
    tab1_schema_safe VARCHAR2(130);
    tab2_schema_safe VARCHAR2(130);
    tab1_full VARCHAR2(261);
    tab2_full VARCHAR2(261);
BEGIN
    -- Quote schema variable or get current schema for table 1
    IF tab1_schema IS NULL THEN
        select sys_context('userenv','current_schema') INTO tab1_schema_actual from dual;
    ELSE
        tab1_schema_actual := tab1_schema;
    END IF;
    tab1_schema_safe := DBMS_ASSERT.ENQUOTE_NAME(tab1_schema_actual);

    -- Quote schema variable or get current schema for table 2
    IF tab2_schema IS NULL THEN
        select sys_context('userenv','current_schema') INTO tab2_schema_actual from dual;
    ELSE
        tab2_schema_actual := tab2_schema;
    END IF;
    tab2_schema_safe := DBMS_ASSERT.ENQUOTE_NAME(tab2_schema_actual);

    tab1_full := tab1_schema_safe || '.' || DBMS_ASSERT.ENQUOTE_NAME(tab1_name);
    tab2_full := tab2_schema_safe || '.' || DBMS_ASSERT.ENQUOTE_NAME(tab2_name);

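    -- Primary key columns of table 1, in key order. ALL_CONSTRAINTS is used instead of
    -- USER_CONSTRAINTS so that tables in other schemas are found as well.
    SELECT acc.column_name BULK COLLECT INTO key_cols
    FROM all_cons_columns acc
    JOIN all_constraints ac
      ON ac.owner = acc.owner
     AND ac.constraint_name = acc.constraint_name
    WHERE UPPER(ac.table_name) = UPPER(tab1_name)
      AND UPPER(ac.owner) = UPPER(tab1_schema_actual)
      AND ac.constraint_type = 'P'
    ORDER BY acc.position;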

    IF key_cols.count = 0 THEN
        raise_application_error(-20123, 'Invalid Table 1!');
    END IF;

    FOR idx IN 1 .. key_cols.count LOOP
        comma_sep_key_cols := comma_sep_key_cols || DBMS_ASSERT.ENQUOTE_NAME(key_cols(idx));
        IF idx != key_cols.count THEN
            comma_sep_key_cols :=  comma_sep_key_cols || ', ';
        END IF;
    END LOOP;

    -- Same lookup for table 2; only used to check that it exists and has a primary key.
    SELECT acc.column_name BULK COLLECT INTO key_cols
    FROM all_cons_columns acc
    JOIN all_constraints ac
      ON ac.owner = acc.owner
     AND ac.constraint_name = acc.constraint_name
    WHERE UPPER(ac.table_name) = UPPER(tab2_name)
      AND UPPER(ac.owner) = UPPER(tab2_schema_actual)
      AND ac.constraint_type = 'P'
    ORDER BY acc.position;

    IF key_cols.count = 0 THEN
        raise_application_error(-20123, 'Invalid Table 2!');
    END IF;

    dyn_query_str:= '';

    IF show_upd_bool = 1 THEN
        dyn_query_str :=
        'WITH sq1 AS (
            SELECT * FROM '||tab1_full||' MINUS SELECT * FROM '||tab2_full||'
        ),
        sq2 AS (
            SELECT * FROM '||tab2_full||' MINUS SELECT * FROM '||tab1_full||'
        )
        SELECT sq1.*, ''NEW'' AS "STATUS"
        FROM sq1
        WHERE ('||comma_sep_key_cols||') NOT IN (SELECT '||comma_sep_key_cols||' FROM sq2)
        UNION
        SELECT sq2.*, ''DEL'' AS "STATUS"
        FROM sq2
        WHERE ('||comma_sep_key_cols||') NOT IN (SELECT '||comma_sep_key_cols||' FROM sq1)
        UNION
        SELECT sq1.*, ''UPD'' AS "STATUS"
        FROM sq1
        WHERE ('||comma_sep_key_cols||') IN (SELECT '||comma_sep_key_cols||' FROM sq2)';
    ELSE
        dyn_query_str :=
        'WITH sq1 AS (
            SELECT * FROM '||tab1_full||' MINUS SELECT * FROM '||tab2_full||'
        ),
        sq2 AS (
            SELECT * FROM '||tab2_full||' MINUS SELECT * FROM '||tab1_full||'
        )
        SELECT * FROM (
            SELECT sq2.*, ''DEL'' AS "STATUS"
            FROM sq2
            UNION
            SELECT sq1.*, ''NEW'' AS "STATUS"
            FROM sq1
        )
        ORDER BY STATUS';
    END IF;

    OPEN delta FOR dyn_query_str;

END;

Code Breakdown

Starting from the bottom, you can probably notice one large IF branch that selects which dynamic query should be used, based on the show_upd_bool parameter. As I've shown the boring false path above, I'll just focus on the query that produces UPD rows in addition to DEL and NEW.

The question you might have at this point is "What defines a row update exactly?". For me, it's a change to any of the non-primary key columns, provided that a row with the same set of PK values exists in both tables. In order to programmatically extract the names of the key columns for a table, we query the Oracle-specific all_cons_columns data dictionary view and save the result into key_cols:

SELECT acc.column_name BULK COLLECT INTO key_cols
FROM all_cons_columns acc
JOIN all_constraints ac
  ON ac.owner = acc.owner
 AND ac.constraint_name = acc.constraint_name
WHERE UPPER(ac.table_name) = UPPER(tab1_name)
  AND UPPER(ac.owner) = UPPER(tab1_schema_actual)
  AND ac.constraint_type = 'P'
ORDER BY acc.position;

After checking the validity of the result, a string of comma-separated PK column names is generated, which will later be injected into the dynamic SQL query:

FOR idx IN 1 .. key_cols.count LOOP
    comma_sep_key_cols := comma_sep_key_cols || DBMS_ASSERT.ENQUOTE_NAME(key_cols(idx));
    IF idx != key_cols.count THEN
        comma_sep_key_cols :=  comma_sep_key_cols || ', ';
    END IF;
END LOOP;

We use this knowledge of the PK columns to classify the changed rows. A row whose primary key shows up on only one side becomes NEW (only in sq1) or DEL (only in sq2), while a row whose primary key shows up in both sq1 and sq2 becomes UPD:

'WITH sq1 AS (
    SELECT * FROM '||tab1_full||' MINUS SELECT * FROM '||tab2_full||'
),
sq2 AS (
    SELECT * FROM '||tab2_full||' MINUS SELECT * FROM '||tab1_full||'
)
SELECT sq1.*, ''NEW'' AS "STATUS"
FROM sq1
WHERE ('||comma_sep_key_cols||') NOT IN (SELECT '||comma_sep_key_cols||' FROM sq2)
UNION
SELECT sq2.*, ''DEL'' AS "STATUS"
FROM sq2
WHERE ('||comma_sep_key_cols||') NOT IN (SELECT '||comma_sep_key_cols||' FROM sq1)
UNION
SELECT sq1.*, ''UPD'' AS "STATUS"
FROM sq1
WHERE ('||comma_sep_key_cols||') IN (SELECT '||comma_sep_key_cols||' FROM sq2)'

And that's pretty much it. This procedure returns a cursor via an OUT parameter, which is used to read the result.
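To make the key-based classification concrete, here is the same idea as a small in-memory Java sketch. It is an illustration under assumptions rather than the procedure's actual code path: KeyedDeltaSketch, Row and Change are hypothetical names, and a single integer id plays the role of the primary key.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical in-memory model of the NEW/UPD/DEL query; not the author's code.
class KeyedDeltaSketch {
    record Row(int id, String text) {}          // id stands in for the primary key
    record Change(Row row, String status) {}

    // Rows of `left` that do not appear in `right` (set semantics, like MINUS).
    static Set<Row> minus(Set<Row> left, Set<Row> right) {
        Set<Row> result = new LinkedHashSet<>(left);
        result.removeAll(right);
        return result;
    }

    // Primary key values of a set of rows.
    static Set<Integer> keys(Set<Row> rows) {
        return rows.stream().map(Row::id).collect(Collectors.toSet());
    }

    static List<Change> delta(Set<Row> newState, Set<Row> oldState) {
        Set<Row> sq1 = minus(newState, oldState); // rows only in the new state
        Set<Row> sq2 = minus(oldState, newState); // rows only in the old state
        Set<Integer> sq1Keys = keys(sq1);
        Set<Integer> sq2Keys = keys(sq2);

        List<Change> changes = new ArrayList<>();
        for (Row r : sq1) {
            // Key on both sides: same row with different content, so an update.
            // Key only in sq1: the row does not exist in the old state yet.
            changes.add(new Change(r, sq2Keys.contains(r.id()) ? "UPD" : "NEW"));
        }
        for (Row r : sq2) {
            // Key only in sq2: the row is gone from the new state.
            if (!sq1Keys.contains(r.id())) {
                changes.add(new Change(r, "DEL"));
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        Set<Row> tab1 = new LinkedHashSet<>(
                List.of(new Row(1, "A"), new Row(2, "B"), new Row(3, "C")));
        Set<Row> tab2 = new LinkedHashSet<>(
                List.of(new Row(2, "X"), new Row(3, "C"), new Row(4, "Y")));
        delta(tab1, tab2).forEach(c -> System.out.println(c.status() + " " + c.row()));
    }
}
```

For the example tables TAB1 and TAB2 this yields ID 1 as NEW, ID 2 as UPD (with the text from TAB1) and ID 4 as DEL, matching the expected delta from the example at the start of the article.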

Calling the code

DBeaver

If you're using DBeaver to debug your DB interactions like most people, I don't know if you can call the procedure directly. What does work, even if slightly annoying, is to wrap the procedure call in a function:

FUNCTION calc_delta (
    tab1_schema VARCHAR2,
    tab1_name VARCHAR2,
    tab2_schema VARCHAR2,
    tab2_name VARCHAR2,
    show_upd_bool NUMBER
) RETURN SYS_REFCURSOR
IS
    delta SYS_REFCURSOR;
BEGIN
    calc_delta(tab1_schema, tab1_name, tab2_schema, tab2_name, show_upd_bool, delta);
    RETURN delta;
END;

And then call it from the DBeaver console or SQL file:

SELECT calc_delta(null, 'TAB1', null, 'TAB2', 1) from DUAL;

DBeaver will complain that a cursor has to stay open, and you're going to have to open the "values" tab to see the result.

Java

For a full guide, refer to this blog post.

This is what worked for me:

// Omitted code
var query = entityManager.createStoredProcedureQuery(
            "calc_delta",
            deltaClassEntity
        )
        .registerStoredProcedureParameter("tab1_schema", String.class, ParameterMode.IN)
        .registerStoredProcedureParameter("tab1_name", String.class, ParameterMode.IN)
        .registerStoredProcedureParameter("tab2_schema", String.class, ParameterMode.IN)
        .registerStoredProcedureParameter("tab2_name", String.class, ParameterMode.IN)
        .registerStoredProcedureParameter("show_upd_bool", Integer.class, ParameterMode.IN)
        .registerStoredProcedureParameter("delta", Class.class, ParameterMode.REF_CURSOR)
        .setParameter("tab1_schema", null)
        .setParameter("tab1_name", sourceTableName)
        .setParameter("tab2_schema", null)
        .setParameter("tab2_name", stateTableName)
        .setParameter("show_upd_bool", showUpdates ? 1 : 0);

return query.getResultList();
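
Once the rows are in Java, applying them comes down to dispatching on the STATUS column. The following is a minimal sketch, assuming a hypothetical DeltaRow record for the mapped result and an in-memory map as a stand-in for the external system. A real target would replace the map operations with its own delete and upsert calls.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical consumer of the delta rows; names and types are assumptions.
class ApplyDeltaSketch {
    // Assumed shape of one delta row: primary key, payload and the STATUS column.
    record DeltaRow(int id, String text, String status) {}

    // Applies the rows in the given order to `target`,
    // an in-memory stand-in for the external system (key -> text).
    static void apply(List<DeltaRow> delta, Map<Integer, String> target) {
        for (DeltaRow row : delta) {
            switch (row.status()) {
                case "DEL" -> target.remove(row.id());
                case "NEW", "UPD" -> target.put(row.id(), row.text()); // upsert
                default -> throw new IllegalArgumentException(
                        "Unknown STATUS: " + row.status());
            }
        }
    }

    public static void main(String[] args) {
        // Old state, as in TAB2 from the example.
        Map<Integer, String> target = new LinkedHashMap<>();
        target.put(2, "X");
        target.put(3, "C");
        target.put(4, "Y");

        apply(List.of(
                new DeltaRow(1, "A", "NEW"),
                new DeltaRow(2, "B", "UPD"),
                new DeltaRow(4, "Y", "DEL")), target);

        System.out.println(target); // now holds the rows of TAB1
    }
}
```

When UPD rows are disabled, the order matters: the DEL for a changed row has to be applied before its NEW, which is why the procedure sorts DEL first in that mode.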