数据库 Bulk Upsert for MySQL & PostgreSQL

hooopo · 2017年03月03日 · 最后由 hooopo 回复于 2017年05月19日 · 9647 次阅读
本帖已被管理员设置为精华贴

什么是 Upsert

"UPSERT" is a DBMS feature that allows a DML statement's author to atomically either insert a row, or on the basis of the row already existing, UPDATE that existing row instead, while safely giving little to no further thought to concurrency. One of those two outcomes must be guaranteed, regardless of concurrent activity, which has been called "the essential property of UPSERT".

简而言之,就是,不存在就插入,存在就更新。

单记录 Upsert

MySQL 有 INSERT...ON DUPLICATE KEY UPDATE 语法,可以实现 Upsert:

INSERT INTO customers (id, first_name, last_name, email) VALUES (30797, 'hooopo1', 'wang', '[email protected]') 
ON DUPLICATE KEY UPDATE 
first_name = VALUES(first_name), last_name = VALUES(last_name);

PostgreSQL 从 9.5 也有了 INSERT ... ON CONFLICT UPDATE 语法,效果和 MySQL 类似:

INSERT INTO customers (id, first_name, last_name, email) VALUES (30797, 'hooopo1', 'wang', '[email protected]') 
ON CONFLICT(id) DO  UPDATE 
SET first_name = EXCLUDED.first_name, last_name = EXCLUDED.last_name;

批量 Upsert

之前研究 MySQL 里如何插入最快 ,里面提到 LOAD INFILE 方式批量插入,并且 MySQL 的 bulk insert 是支持 REPLACE 语意的,即批量插入的同时还可以 upsert

LOAD DATA LOCAL INFILE '/Users/hooopo/data/out/product_sales_facts.txt'
REPLACE INTO TABLE product_sale_facts FIELDS TERMINATED BY ',' (`id`,`date_id`,`order_id`,`product_id`,`address_id`,`unit_price`,`purchase_price`,`gross_profit`,`quantity`,`channel_id`,`gift`)

当然 PostgreSQL 也有 Copy 功能,和 MySQL 的 LOAD INFILE 类似。然而,copy 命令支持 Upsert,这使一些增量 ETL 的工作非常不方便。

不过有一种利用 staging 表的方式实现 bulk upsert,大致步骤如下:

一。目标表

二。把增量数据批量插入中间表

CREATE TABLE IF NOT EXISTS staging  LIKE customers INCLUDING DEFAULTS INCLUDING CONSTRAINTS INCLUDING INDEXES;
COPY staging (id, email, first_name, last_name)
        FROM STDIN
          WITH
            DELIMITER ','
            NULL '\N'
            CSV;

三。把目标表中与 staging 表冲突部分删掉

DELETE FROM customers
USING staging
WHERE customers.id = staging.id

四。把 staging 表批量插入到目标表,因为冲突部分已经删掉,所以这步不会有任何冲突。

INSERT INTO customers (SELECT * FROM staging);

五。把 staging 表清空

TRUNCATE TABLE staging;

上面过程确实很麻烦,如果使用 kiba-plus 的话,只需要简单的 DSL:

destination Kiba::Plus::Destination::PgBulk2, { :connect_url => DEST_URL,
                                :table_name => "customers",
                                :truncate => false,
                                :columns => [:id, :email, :first_name, :last_name],
                                :incremental => true,
                                :unique_by => :id
                              }

相关链接:

刚好用过,推荐https://github.com/seamusabshere/upsert,PG9.4 好像不支持 upsert

huacnlee 将本帖设为了精华贴。 03月03日 11:02

已打赏 16 元 👍

炮哥出手,必是精品!

那就是手动实现 Upsert 咯

最后一个 INSERT 语句应该可以用 ON CONFLICT 吧,这样可以简化一点数据对比和删除的过程?

darkbaby123 回复

好像可以耶!但要 9.5+

hooopo 回复

是的。

用到了,真是不错呢

themadeknight 回复

哈!有用就好…

需要 登录 后方可回复, 如果你还没有账号请 注册新账号