开源项目 关于数据库自动生成 log 的问题

Awlter1 · 2020年12月24日 · 最后由 YoogoC 回复于 2021年01月04日 · 282 次阅读

请教各位数据库大佬一个问题

我们现在有这么一个需求:

把本地数据库的更新同步到 (写入) salesforce database

大概的逻辑就是 database tables 有 crud 的 log, 这个 log 会自动写入一个叫 trigger log 的 table, 比如 table_a 的 column_a 更新成了 value_a 然后 rails 定时去处理这个 trigger log, 把需要更新的 column 调接口去更新到 salesforce database

(这么做是因为:需要记录的是数据库更新的 change log,而不是整个 record,如果用整条 record, 没有更新的 column value 会 overwrite salesforce database)

问题: 我们数据库的 table 是动态的,并没有一一对应 rails 的 model,所以没法用类似这样基于 activerecord 的 gem(https://github.com/collectiveidea/audited) 去记录这些 crud log 那么想要实现这个 database log → trigger log 的逻辑,有什么比较好的解决办法吗?

其实大概就是实现 https://devcenter.heroku.com/articles/writing-data-to-salesforce-with-heroku-connect#understanding-the-trigger-log

我在 Heroku Connect 里面看到了

--
-- Name: hc_capture_insert_from_row(public.hstore, character varying, text[]); Type: FUNCTION; Schema: salesforce; Owner: postgres
--

CREATE FUNCTION salesforce.hc_capture_insert_from_row(source_row public.hstore, table_name character varying, excluded_cols text[] DEFAULT ARRAY[]::text[]) RETURNS integer
    LANGUAGE plpgsql
    AS $$
        DECLARE
            excluded_cols_standard text[] = ARRAY['_hc_lastop', '_hc_err']::text[];
            retval int;

        BEGIN
            -- VERSION 1 --

            IF (source_row -> 'id') IS NULL THEN
                -- source_row is required to have an int id value
                RETURN NULL;
            END IF;

            excluded_cols_standard := array_remove(
                array_remove(excluded_cols, 'id'), 'sfid') || excluded_cols_standard;
            INSERT INTO "salesforce"."_trigger_log" (
                action, table_name, txid, created_at, state, record_id, values)
            VALUES (
                'INSERT', table_name, txid_current(), clock_timestamp(), 'NEW',
                (source_row -> 'id')::int,
                source_row - excluded_cols_standard
            ) RETURNING id INTO retval;
            RETURN retval;
        END;
        $$;


ALTER FUNCTION salesforce.hc_capture_insert_from_row(source_row public.hstore, table_name character varying, excluded_cols text[]) OWNER TO postgres;

--
-- Name: hc_capture_update_from_row(public.hstore, character varying, text[]); Type: FUNCTION; Schema: salesforce; Owner: postgres
--

CREATE FUNCTION salesforce.hc_capture_update_from_row(source_row public.hstore, table_name character varying, columns_to_include text[] DEFAULT ARRAY[]::text[]) RETURNS integer
    LANGUAGE plpgsql
    AS $$
        DECLARE
            excluded_cols_standard text[] = ARRAY['_hc_lastop', '_hc_err']::text[];
            excluded_cols text[];
            retval int;

        BEGIN
            -- VERSION 1 --

            IF (source_row -> 'id') IS NULL THEN
                -- source_row is required to have an int id value
                RETURN NULL;
            END IF;

            IF array_length(columns_to_include, 1) <> 0 THEN
                excluded_cols := array(
                    select skeys(source_row)
                    except
                    select unnest(columns_to_include)
                );
            END IF;
            excluded_cols_standard := excluded_cols || excluded_cols_standard;
            INSERT INTO "salesforce"."_trigger_log" (
                action, table_name, txid, created_at, state, record_id, sfid, values, old)
            VALUES (
                'UPDATE', table_name, txid_current(), clock_timestamp(), 'NEW',
                (source_row -> 'id')::int, source_row -> 'sfid',
                source_row - excluded_cols_standard, NULL
            ) RETURNING id INTO retval;
            RETURN retval;
        END;
        $$;


ALTER FUNCTION salesforce.hc_capture_update_from_row(source_row public.hstore, table_name character varying, columns_to_include text[]) OWNER TO postgres;

但是这个 pg 的 function 是在 update failure 的时候手动写代码去 call 的,也不是自动 call 的 https://devcenter.heroku.com/articles/writing-data-to-salesforce-with-heroku-connect#update-failures

我猜想 pg 应该有 hook 或者 callback 之类的机制,在 insert or update 的时候去 call 一下上面相应的方法?

pg 的 wal_log 可能会解决你的问题,wal_level = logical,将 wal 级别设为 logical,这样外部可以解析更新数据,然后解析出来发给 kafka。

解析中间件:debezium,https://debezium.io/

我只是在测试环境下实验过,没用过生产。

个人感觉的坑点:1. wal_level 是否有性能影响,是否会影响主从流,2 debezium 能否 hold 住

需要 登录 后方可回复, 如果你还没有账号请 注册新账号