用戶可以通過 MySQL 協(xié)議,使用 INSERT 語句進(jìn)行數(shù)據(jù)導(dǎo)入INSERT 語句的使用方式和 MySQL 等數(shù)據(jù)庫中 INSERT 語句的使用方式類似。 INSERT 語句支持以下兩種語法:
INSERT INTO table SELECT ...INSERT INTO table VALUES(...)
對(duì)于 Doris 來說,一個(gè) INSERT 命令就是一個(gè)完整的導(dǎo)入事務(wù)。
因此不論是導(dǎo)入一條數(shù)據(jù),還是多條數(shù)據(jù),我們都不建議在生產(chǎn)環(huán)境使用這種方式進(jìn)行數(shù)據(jù)導(dǎo)入。高頻次的 INSERT 操作會(huì)導(dǎo)致在存儲(chǔ)層產(chǎn)生大量的小文件,會(huì)嚴(yán)重影響系統(tǒng)性能。
(相關(guān)資料圖)
該方式僅用于線下簡單測試或低頻少量的操作?;蛘呖梢允褂靡韵路绞竭M(jìn)行批量的插入操作:
INSERT INTO example_tbl VALUES(1000, "baidu1", 3.25)(2000, "baidu2", 4.25)(3000, "baidu3", 5.25);
Stream Load用于將本地文件導(dǎo)入到doris中。Stream Load 是通過 HTTP 協(xié)議與 Doris 進(jìn)行連接交互的。該方式中涉及 HOST:PORT 都是對(duì)應(yīng)的HTTP 協(xié)議端口。?BE 的 HTTP 協(xié)議端口,默認(rèn)為 8040。?FE 的 HTTP 協(xié)議端口,默認(rèn)為 8030。但須保證客戶端所在機(jī)器網(wǎng)絡(luò)能夠聯(lián)通FE, BE 所在機(jī)器。
-- 創(chuàng)建表drop table if exists load_local_file_test;CREATE TABLE IF NOT EXISTS load_local_file_test( id INT, name VARCHAR(50), age TINYINT)unique key(id)DISTRIBUTED BY HASH(id) BUCKETS 3;
# 創(chuàng)建文件1,zss,282,lss,283,ww,88# 導(dǎo)入數(shù)據(jù)## 語法示例 curl \ -u user:passwd \ # 賬號(hào)密碼 -H "label:load_local_file_test" \ # 本次任務(wù)的唯一標(biāo)識(shí) -T 文件地址 \ http://主機(jī)名:端口號(hào)/api/庫名/表名/_stream_load# user:passwd 為在 Doris 中創(chuàng)建的用戶。初始用戶為 admin / root,密碼初始狀態(tài)下為空。# host:port 為 BE 的 HTTP 協(xié)議端口,默認(rèn)是 8040,可以在 Doris 集群 WEB UI頁面查看。# label: 可以在 Header 中指定 Label 唯一標(biāo)識(shí)這個(gè)導(dǎo)入任務(wù)。curl \ -u root:123 \ -H "label:load_local_file" \ -H "column_separator:," \ -T /root/data/loadfile.txt \http://doitedu01:8040/api/test/load_local_file_test/_stream_load
curl的一些可配置的參數(shù)label: 導(dǎo)入任務(wù)的標(biāo)簽,相同標(biāo)簽的數(shù)據(jù)無法多次導(dǎo)入。(標(biāo)簽?zāi)J(rèn)保留30分鐘)column_separator:用于指定導(dǎo)入文件中的列分隔符,默認(rèn)為\t。line_delimiter:用于指定導(dǎo)入文件中的換行符,默認(rèn)為\n。columns:用于指定文件中的列和table中列的對(duì)應(yīng)關(guān)系,默認(rèn)一一對(duì)應(yīng)where: 用來過濾導(dǎo)入文件中的數(shù)據(jù)max_filter_ratio:最大容忍可過濾(數(shù)據(jù)不規(guī)范等原因)的數(shù)據(jù)比例。默認(rèn)零容忍。數(shù)據(jù)不規(guī)范不包括通過 where 條件過濾掉的行。partitions: 用于指定這次導(dǎo)入所設(shè)計(jì)的partition。如果用戶能夠確定數(shù)據(jù)對(duì)應(yīng)的partition,推薦指定該項(xiàng)。不滿足這些分區(qū)的數(shù)據(jù)將被過濾掉。timeout: 指定導(dǎo)入的超時(shí)時(shí)間。單位秒。默認(rèn)是 600 秒。可設(shè)置范圍為 1 秒 ~ 259200 秒。timezone: 指定本次導(dǎo)入所使用的時(shí)區(qū)。默認(rèn)為東八區(qū)。該參數(shù)會(huì)影響所有導(dǎo)入涉及的和時(shí)區(qū)有關(guān)的函數(shù)結(jié)果。exec_mem_limit: 導(dǎo)入內(nèi)存限制。默認(rèn)為 2GB。單位為字節(jié)。format: 指定導(dǎo)入數(shù)據(jù)格式,默認(rèn)是csv,支持json格式。read_json_by_line: 布爾類型,為true表示支持每行讀取一個(gè)json對(duì)象,默認(rèn)值為false。merge_type: 數(shù)據(jù)的合并類型,一共支持三種類型APPEND、DELETE、MERGE 其中,APPEND是默認(rèn)值,表示這批數(shù)據(jù)全部需要追加到現(xiàn)有數(shù)據(jù)中,DELETE 表示刪除與這批數(shù)據(jù)key相同的所有行,MERGE 語義 需要與delete 條件聯(lián)合使用,表示滿足delete 條件的數(shù)據(jù)按照DELETE 語義處理其余的按照APPEND 語義處理, 示例:-H "merge_type: MERGE" -H "delete: flag=1"delete: 僅在 MERGE下有意義, 表示數(shù)據(jù)的刪除條件 function_column.sequence_col: 只適用于UNIQUE_KEYS,相同key列下,保證value列按照source_sequence列進(jìn)行REPLACE, source_sequence可以是數(shù)據(jù)源中的列,也可以是表結(jié)構(gòu)中的一列。導(dǎo)入json數(shù)據(jù)建議一個(gè)導(dǎo)入請(qǐng)求的數(shù)據(jù)量控制在 1 - 2 GB 以內(nèi)。如果有大量本地文件,可以分批并發(fā)提交。
# 準(zhǔn)備數(shù)據(jù){"id":1,"name":"liuyan","age":18}{"id":2,"name":"tangyan","age":18}{"id":3,"name":"jinlian","age":18}{"id":4,"name":"dalang","age":18}{"id":5,"name":"qingqing","age":18}curl \ -u root: \ -H "label:load_local_file_json_20221126" \ -H "columns:id,name,age" \ -H "max_filter_ratio:0.1" \ -H "timeout:1000" \ -H "exec_mem_limit:1G" \ -H "where:id>1" \ -H "format:json" \ -H "read_json_by_line:true" \ -H "merge_type:delete" \ -T /root/data/json.txt \http://doitedu01:8040/api/test/load_local_file_test/_stream_load -H "merge_type:append" \ # 會(huì)把id = 3 的這條數(shù)據(jù)刪除 -H "merge_type:MERGE" \ -H "delete:id=3"
外部存儲(chǔ)數(shù)據(jù)導(dǎo)入(hdfs)適用場景?源數(shù)據(jù)在 Broker 可以訪問的存儲(chǔ)系統(tǒng)中,如 HDFS。?數(shù)據(jù)量在幾十到百 GB 級(jí)別。
基本原理創(chuàng)建提交導(dǎo)入的任務(wù)FE生成執(zhí)行計(jì)劃并將執(zhí)行計(jì)劃分發(fā)到多個(gè)BE節(jié)點(diǎn)上(每個(gè)BE節(jié)點(diǎn)都導(dǎo)入一部分?jǐn)?shù)據(jù))BE收到執(zhí)行計(jì)劃后開始執(zhí)行,從broker上拉取數(shù)據(jù)到自己的節(jié)點(diǎn)上所有BE都完成后,F(xiàn)E決定是否導(dǎo)入成功,返回結(jié)果給客戶端-- 新建一張表drop table if exists load_hdfs_file_test1;CREATE TABLE IF NOT EXISTS load_hdfs_file_test1( id INT, name VARCHAR(50), age TINYINT)unique key(id)DISTRIBUTED BY HASH(id) BUCKETS 3;
將本地的數(shù)據(jù)導(dǎo)入到hdfs上面hdfs dfs -put ./loadfile.txt hdfs://linux01:8020/hdfs dfs -ls hdfs://linux01:8020/
-- 導(dǎo)入語法LOAD LABEL test.label_202204([MERGE|APPEND|DELETE] -- 不寫就是appendDATA INFILE("file_path1"[, file_path2, ...] -- 描述數(shù)據(jù)的路徑 這邊可以寫多個(gè) ,以逗號(hào)分割)[NEGATIVE] -- 負(fù)增長INTO TABLE `table_name` -- 導(dǎo)入的表名字[PARTITION (p1, p2, ...)] -- 導(dǎo)入到哪些分區(qū),不符合這些分區(qū)的就會(huì)被過濾掉[COLUMNS TERMINATED BY "column_separator"] -- 指定分隔符[FORMAT AS "file_type"] -- 指定存儲(chǔ)的文件類型[(column_list)] -- 指定導(dǎo)入哪些列 [COLUMNS FROM PATH AS (c1, c2, ...)] -- 從路勁中抽取的部分列[SET (column_mapping)] -- 對(duì)于列可以做一些映射,寫一些函數(shù)-- 這個(gè)參數(shù)要寫在要寫在set的后面[PRECEDING FILTER predicate] -- 在mapping前做過濾做一些過濾[WHERE predicate] -- 在mapping后做一些過濾 比如id>10 [DELETE ON expr] --根據(jù)字段去做一些抵消消除的策略 需要配合MERGE[ORDER BY source_sequence] -- 導(dǎo)入數(shù)據(jù)的時(shí)候保證數(shù)據(jù)順序[PROPERTIES ("key1"="value1", ...)] -- 一些配置參數(shù)
-- 將hdfs上的數(shù)據(jù)load到表中LOAD LABEL test.label_20221125(DATA INFILE("hdfs://linux01:8020/test.txt")INTO TABLE `load_hdfs_file_test`COLUMNS TERMINATED BY ","(id,name,age))with HDFS ("fs.defaultFS"="hdfs://linux01:8020","hadoop.username"="root")PROPERTIES("timeout"="1200","max_filter_ratio"="0.1");-- 這是一個(gè)異步的操作,所以需要去查看下執(zhí)行的狀態(tài)show load order by createtime desc limit 1\G;
從 HDFS 導(dǎo)入數(shù)據(jù),使用通配符匹配兩批兩批文件。分別導(dǎo)入到兩個(gè)表中
LOAD LABEL example_db.label2( DATA INFILE("hdfs://linux01:8020/input/file-10*") INTO TABLE `my_table1` PARTITION (p1) COLUMNS TERMINATED BY "," FORMAT AS "parquet" (id, tmp_salary, tmp_score) SET ( salary= tmp_salary + 1000, score = tmp_score + 10 ), DATA INFILE("hdfs://linux01:8020/input/file-20*") INTO TABLE `my_table2` COLUMNS TERMINATED BY "," (k1, k2, k3))with HDFS ("fs.defaultFS"="hdfs://linux01:8020","hadoop.username"="root")-- 導(dǎo)入數(shù)據(jù),并提取文件路徑中的分區(qū)字段LOAD LABEL example_db.label10( DATA INFILE("hdfs://linux01:8020/user/hive/warehouse/table_name/dt=20221125/*") INTO TABLE `my_table` FORMAT AS "csv" (k1, k2, k3) COLUMNS FROM PATH AS (dt))WITH BROKER hdfs( "username"="root", "password"="123");-- 對(duì)待導(dǎo)入數(shù)據(jù)進(jìn)行過濾。LOAD LABEL example_db.label6( DATA INFILE("hdfs://linux01:8020/input/file") INTO TABLE `my_table` (k1, k2, k3) SET ( k2 = k2 + 1 ) PRECEDING FILTER k1 = 1 ==》前置過濾 WHERE k1 > k2 ==》 后置過濾)WITH BROKER hdfs( "username"="root", "password"="123");-- 只有原始數(shù)據(jù)中,k1 = 1,并且轉(zhuǎn)換后,k1 > k2 的行才會(huì)被導(dǎo)入。
取消導(dǎo)入任務(wù)當(dāng) Broker load 作業(yè)狀態(tài)不為 CANCELLED 或 FINISHED 時(shí),可以被用戶手動(dòng)取消。取消時(shí)需要指定待取消導(dǎo)入任務(wù)的 Label 。取消導(dǎo)入命令語法可執(zhí)行 HELP CANCEL LOAD 查看。
CANCEL LOAD [FROM db_name] WHERE LABEL="load_label";
通過外部表同步數(shù)據(jù)Doris 可以創(chuàng)建外部表。創(chuàng)建完成后,可以通過 SELECT 語句直接查詢外部表的數(shù)據(jù),也可以通過 INSERT INTO SELECT 的方式導(dǎo)入外部表的數(shù)據(jù)。
Doris 外部表目前支持的數(shù)據(jù)源包括:MySQL,Oracle,Hive,PostgreSQL,SQLServer,Iceberg,ElasticSearch
-- 整體語法CREATE [EXTERNAL] TABLE table_name ( col_name col_type [NULL | NOT NULL] [COMMENT "comment"] ) ENGINE=HIVE[COMMENT "comment"]PROPERTIES (-- 我要映射的hive表在哪個(gè)庫里面-- 映射的表名是哪一張-- hive的元數(shù)據(jù)服務(wù)地址 "property_name"="property_value", ...);-- 參數(shù)說明:-- 1.外表列 -- 列名要與 Hive 表一一對(duì)應(yīng)-- 列的順序需要與 Hive 表一致-- 必須包含 Hive 表中的全部列-- Hive 表分區(qū)列無需指定,與普通列一樣定義即可。 -- 2.ENGINE 需要指定為 HIVE -- 3.PROPERTIES 屬性: -- hive.metastore.uris:Hive Metastore 服務(wù)地址 -- database:掛載 Hive 對(duì)應(yīng)的數(shù)據(jù)庫名 -- table:掛載 Hive 對(duì)應(yīng)的表名
完成在 Doris 中建立 Hive 外表后,除了無法使用 Doris 中的數(shù)據(jù)模型(rollup、預(yù)聚合、物化視圖等)外,與普通的 Doris OLAP 表并無區(qū)別
-- 在Hive 中創(chuàng)建一個(gè)測試用表:CREATE TABLE `user_info` ( `id` int, `name` string, `age` int) stored as orc;insert into user_info values (1,"zss",18);insert into user_info values (2,"lss",20);insert into user_info values (3,"ww",25);-- Doris 中創(chuàng)建外部表CREATE EXTERNAL TABLE `hive_user_info` ( `id` int, `name` varchar(10), `age` int ) ENGINE=HIVE PROPERTIES ( "hive.metastore.uris" = "thrift://linux01:9083", "database" = "db1", "table" = "user_info" );
外部表創(chuàng)建好后,就可以直接在doris中對(duì)這個(gè)外部表進(jìn)行查詢了直接查詢外部表,無法利用到doris自身的各種查詢優(yōu)化機(jī)制!
select * from hive_user_info;-- 將數(shù)據(jù)從外部表導(dǎo)入內(nèi)部表-- 數(shù)據(jù)從外部表導(dǎo)入內(nèi)部表后,就可以利用doris自身的查詢優(yōu)勢了!-- 假設(shè)要導(dǎo)入的目標(biāo)內(nèi)部表為: doris_user_info (需要提前創(chuàng)建)CREATE TABLE IF NOT EXISTS doris_user_info( id INT, name VARCHAR(50), age TINYINT)unique key(id)DISTRIBUTED BY HASH(id) BUCKETS 3;-- 就是用sql查詢,從外部表中select出數(shù)據(jù)后,insert到內(nèi)部表即可insert into doris_user_infoselect *from hive_user_info;
Binlog Load注意:Hive 表 Schema 變更不會(huì)自動(dòng)同步,需要在 Doris 中重建 Hive 外表。當(dāng)前 Hive 的存儲(chǔ)格式僅支持 Text,Parquet 和 ORC 類型
Binlog Load提供了一種使Doris增量同步用戶在Mysql數(shù)據(jù)庫中對(duì)數(shù)據(jù)更新操作的CDC(Change Data Capture)功能。
基本原理當(dāng)前版本設(shè)計(jì)中,Binlog Load需要依賴canal作為中間媒介,讓canal偽造成一個(gè)從節(jié)點(diǎn)去獲取Mysql主節(jié)點(diǎn)上的Binlog并解析,再由Doris去獲取Canal上解析好的數(shù)據(jù),主要涉及Mysql端、Canal端以及Doris端
FE會(huì)為每個(gè)數(shù)據(jù)同步作業(yè)啟動(dòng)一個(gè)canal client,來向canal server端訂閱并獲取數(shù)據(jù)。client中的receiver將負(fù)責(zé)通過Get命令接收數(shù)據(jù),每獲取到一個(gè)數(shù)據(jù)batch,都會(huì)由consumer根據(jù)對(duì)應(yīng)表分發(fā)到不同的channel,每個(gè)channel都會(huì)為此數(shù)據(jù)batch產(chǎn)生一個(gè)發(fā)送數(shù)據(jù)的子任務(wù)Task。在FE上,一個(gè)Task是channel向BE發(fā)送數(shù)據(jù)的子任務(wù),里面包含分發(fā)到當(dāng)前channel的同一個(gè)batch的數(shù)據(jù)。channel控制著單個(gè)表事務(wù)的開始、提交、終止。一個(gè)事務(wù)周期內(nèi),一般會(huì)從consumer獲取到多個(gè)batch的數(shù)據(jù),因此會(huì)產(chǎn)生多個(gè)向BE發(fā)送數(shù)據(jù)的子任務(wù)Task,在提交事務(wù)成功前,這些Task不會(huì)實(shí)際生效。滿足一定條件時(shí)(比如超過一定時(shí)間、達(dá)到提交最大數(shù)據(jù)大?。?,consumer將會(huì)阻塞并通知各個(gè)channel提交事務(wù)。當(dāng)且僅當(dāng)所有channel都提交成功,才會(huì)通過Ack命令通知canal并繼續(xù)獲取并消費(fèi)數(shù)據(jù)。如果有任意channel提交失敗,將會(huì)重新從上一次消費(fèi)成功的位置獲取數(shù)據(jù)并再次提交(已提交成功的channel不會(huì)再次提交以保證冪等性)。整個(gè)數(shù)據(jù)同步作業(yè)中,F(xiàn)E通過以上流程不斷的從canal獲取數(shù)據(jù)并提交到BE,來完成數(shù)據(jù)同步。Mysql端在Mysql Cluster模式的主從同步中,二進(jìn)制日志文件(Binlog)記錄了主節(jié)點(diǎn)上的所有數(shù)據(jù)變化,數(shù)據(jù)在Cluster的多個(gè)節(jié)點(diǎn)間同步、備份都要通過Binlog日志進(jìn)行,從而提高集群的可用性。架構(gòu)通常由一個(gè)主節(jié)點(diǎn)(負(fù)責(zé)寫)和一個(gè)或多個(gè)從節(jié)點(diǎn)(負(fù)責(zé)讀)構(gòu)成,所有在主節(jié)點(diǎn)上發(fā)生的數(shù)據(jù)變更將會(huì)復(fù)制給從節(jié)點(diǎn)。注意:目前必須要使用Mysql 5.7及以上的版本才能支持Binlog Load功能。
# 打開mysql的二進(jìn)制binlog日志功能,則需要編輯my.cnf配置文件設(shè)置一下。find / -name my.cnf/etc/my.cnf
# 修改mysqld中的一些配置文件[mysqld]server_id = 1log-bin = mysql-binbinlog-format = ROW#binlog-format 的三種模式#ROW 記錄每一行數(shù)據(jù)的信息#Statement 記錄sql語句#Mixed 上面兩種的混合# 重啟 MySQL 使配置生效systemctl restart mysqld
-- 創(chuàng)建用戶并授權(quán)-- 設(shè)置這些參數(shù)可以使得mysql的密碼簡單化set global validate_password_length=4; set global validate_password_policy=0; -- 新增一個(gè)canal的用戶,讓他監(jiān)聽所有庫中的所有表,并且設(shè)置密碼為canalGRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO "canal"@"%" IDENTIFIED BY "canal" ;-- 刷新一下權(quán)限FLUSH PRIVILEGES;-- 準(zhǔn)備測試表CREATE TABLE `user_doris2` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) DEFAULT NULL, `age` int(11) DEFAULT NULL, `gender` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;
配置 Canal 端Canal 是屬于阿里巴巴 otter 項(xiàng)目下的一個(gè)子項(xiàng)目,主要用途是基于 MySQL 數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱和消費(fèi),用于解決跨機(jī)房同步的業(yè)務(wù)場景,建議使用 canal 1.1.5及以上版本。
下載地址:https://github.com/alibaba/canal/releases
# 上傳并解壓 canal deployer壓縮包mkdir /opt/apps/canaltar -zxvf canal.deployer-1.1.5.tar.gz -C /opt/apps/canal# 在 conf 文件夾下新建目錄并重命名 # 一個(gè) canal 服務(wù)中可以有多個(gè) instance,conf/下的每一個(gè)目錄即是一個(gè)實(shí)例,每個(gè)實(shí)例下面都有獨(dú)立的配置文件mkdir /opt/apps/canel/conf/doris# 拷貝配置文件模板 cp /opt/apps/canal/conf/example/instance.properties /opt/apps/canal/conf/doris/# 修改 conf/canal.properties 的配置vi canal.properties# 進(jìn)入找到canal.destinations = example# 將其修改為 我們自己配置的目錄canal.destinations = doris# 修改 instance 配置文件vi instance.properties # 修改:canal.instance.master.address=doitedu01:3306# 啟動(dòng)sh bin/startup.sh
配置目標(biāo)表注意:canal client 和 canal instance 是一一對(duì)應(yīng)的,Binlog Load 已限制多個(gè)數(shù)據(jù)同步作 業(yè)不能連接到同一個(gè) destination。
基本語法:CREATE SYNC [db.]job_name( channel_desc, channel_desc ...)binlog_desc-- 參數(shù)說明:-- job_name:是數(shù)據(jù)同步作業(yè)在當(dāng)前數(shù)據(jù)庫內(nèi)的唯一標(biāo)識(shí)-- channel_desc :用來定義任務(wù)下的數(shù)據(jù)通道,可表示 MySQL 源表到 doris 目標(biāo)表的映射關(guān)系。在設(shè)置此項(xiàng)時(shí),如果存在多個(gè)映射關(guān)系,必須滿足 MySQL 源表應(yīng)該與 doris 目標(biāo)表是一一對(duì)應(yīng)關(guān)系,其他的任何映射關(guān)系(如一對(duì)多關(guān)系),檢查語法時(shí)都被視為不合法。-- column_mapping:主要指MySQL源表和doris目標(biāo)表的列之間的映射關(guān)系,如果不指定,F(xiàn)E 會(huì)默認(rèn)源表和目標(biāo)表的列按順序一一對(duì)應(yīng)。但是我們依然建議顯式的指定列的映射關(guān)系,這樣當(dāng)目標(biāo)表的結(jié)構(gòu)發(fā)生變化(比如增加一個(gè) nullable 的列),數(shù)據(jù)同步作業(yè)依然可以進(jìn)行。否則,當(dāng)發(fā)生上述變動(dòng)后,因?yàn)榱杏成潢P(guān)系不再一一對(duì)應(yīng),導(dǎo)入將報(bào)錯(cuò)。 -- binlog_desc:定義了對(duì)接遠(yuǎn)端 Binlog 地址的一些必要信息,目前可支持的對(duì)接類型只有 canal 方式,所有的配置項(xiàng)前都需要加上 canal 前綴。-- canal.server.ip: canal server 的地址 -- canal.server.port: canal server 的端口 -- canal.destination: 前文提到的 instance 的字符串標(biāo)識(shí) -- canal.batchSize: 每批從 canal server 處獲取的 batch 大小的最大值,默認(rèn) 8192 -- canal.username: instance 的用戶名 -- canal.password: instance 的密碼 -- canal.debug: 設(shè)置為 true 時(shí),會(huì)將 batch 和每一行數(shù)據(jù)的詳細(xì)信息都打印出來,會(huì)影響性能。-- Doris 創(chuàng)建與 Mysql 對(duì)應(yīng)的目標(biāo)表CREATE TABLE `binlog_mysql` ( `id` int(11) NOT NULL COMMENT "", `name` VARCHAR(50) NOT NULL COMMENT "", `age` int(11) NOT NULL COMMENT "" , `gender` VARCHAR(50) NOT NULL COMMENT "") ENGINE=OLAP UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1; CREATE SYNC test.job20221228( FROM test.binlog_test INTO binlog_test)FROM BINLOG ( "type" = "canal", "canal.server.ip" = "linux01", "canal.server.port" = "11111", "canal.destination" = "doris", "canal.username" = "canal", "canal.password" = "canal");-- 查看作業(yè)狀態(tài)-- 展示當(dāng)前數(shù)據(jù)庫的所有數(shù)據(jù)同步作業(yè)狀態(tài)。 SHOW SYNC JOB; -- 展示數(shù)據(jù)庫 `test_db` 下的所有數(shù)據(jù)同步作業(yè)狀態(tài)。 SHOW SYNC JOB FROM `test`; -- 停止名稱為 `job_name` 的數(shù)據(jù)同步作業(yè) STOP SYNC JOB [db.]job_name -- 暫停名稱為 `job_name` 的數(shù)據(jù)同步作業(yè) PAUSE SYNC JOB [db.]job_name -- 恢復(fù)名稱為 `job_name` 的數(shù)據(jù)同步作業(yè) RESUME SYNC JOB `job_name`
數(shù)據(jù)導(dǎo)出數(shù)據(jù)導(dǎo)出(Export)是 Doris 提供的一種將數(shù)據(jù)導(dǎo)出的功能。該功能可以將用戶指定的表或分區(qū)的數(shù)據(jù),以文本的格式,通過 Broker 進(jìn)程導(dǎo)出到遠(yuǎn)端存儲(chǔ)上,如 HDFS / 對(duì)象存儲(chǔ)(支持S3協(xié)議) 等。原理
用戶提交一個(gè) Export 作業(yè)到 FE。FE 的 Export 調(diào)度器會(huì)通過兩階段來執(zhí)行一個(gè) Export 作業(yè):PENDING:FE 生成 ExportPendingTask,向 BE 發(fā)送 snapshot 命令,對(duì)所有涉及到的 Tablet 做一個(gè)快照。并生成多個(gè)查詢計(jì)劃。EXPORTING:FE 生成 ExportExportingTask,開始執(zhí)行查詢計(jì)劃。查詢計(jì)劃拆分Export 作業(yè)會(huì)生成多個(gè)查詢計(jì)劃,每個(gè)查詢計(jì)劃負(fù)責(zé)掃描一部分 Tablet。每個(gè)查詢計(jì)劃掃描的 Tablet 個(gè)數(shù)由 FE 配置參數(shù) export_tablet_num_per_task 指定,默認(rèn)為 5。即假設(shè)一共 100 個(gè) Tablet,則會(huì)生成 20 個(gè)查詢計(jì)劃。用戶也可以在提交作業(yè)時(shí),通過作業(yè)屬性 tablet_num_per_task 指定這個(gè)數(shù)值。一個(gè)作業(yè)的多個(gè)查詢計(jì)劃順序執(zhí)行
查詢計(jì)劃執(zhí)行一個(gè)查詢計(jì)劃掃描多個(gè)分片,將讀取的數(shù)據(jù)以行的形式組織,每 1024 行為一個(gè) batch,調(diào)用 Broker 寫入到遠(yuǎn)端存儲(chǔ)上。查詢計(jì)劃遇到錯(cuò)誤會(huì)整體自動(dòng)重試 3 次。如果一個(gè)查詢計(jì)劃重試 3 次依然失敗,則整個(gè)作業(yè)失敗。Doris 會(huì)首先在指定的遠(yuǎn)端存儲(chǔ)的路徑中,建立一個(gè)名為 __doris_export_tmp_12345 的臨時(shí)目錄(其中 12345 為作業(yè) id)。導(dǎo)出的數(shù)據(jù)首先會(huì)寫入這個(gè)臨時(shí)目錄。每個(gè)查詢計(jì)劃會(huì)生成一個(gè)文件,文件名示例:
export-data-c69fcf2b6db5420f-a96b94c1ff8bccef-1561453713822
其中 c69fcf2b6db5420f-a96b94c1ff8bccef 為查詢計(jì)劃的 query id。1561453713822 為文件生成的時(shí)間戳。當(dāng)所有數(shù)據(jù)都導(dǎo)出后,Doris 會(huì)將這些文件 rename 到用戶指定的路徑中
示例:導(dǎo)出到hdfs
EXPORT TABLE test.event_info_log1 -- 庫名.表名to "hdfs://linux01:8020/event_info_log1" -- 導(dǎo)出到那里去PROPERTIES( "label" = "event_info_log1", "column_separator"=",", "exec_mem_limit"="2147483648", "timeout" = "3600")WITH BROKER "broker_name"( "username" = "root", "password" = "");-- 1.label:本次導(dǎo)出作業(yè)的標(biāo)識(shí)。后續(xù)可以使用這個(gè)標(biāo)識(shí)查看作業(yè)狀態(tài)。-- 2.column_separator:列分隔符。默認(rèn)為 \t。支持不可見字符,比如 "\x07"。-- 3.columns:要導(dǎo)出的列,使用英文狀態(tài)逗號(hào)隔開,如果不填這個(gè)參數(shù)默認(rèn)是導(dǎo)出表的所有列。-- 4.line_delimiter:行分隔符。默認(rèn)為 \n。支持不可見字符,比如 "\x07"。-- 5.exec_mem_limit: 表示 Export 作業(yè)中,一個(gè)查詢計(jì)劃在單個(gè) BE 上的內(nèi)存使用限制。默認(rèn) 2GB。單位字節(jié)。-- 6.timeout:作業(yè)超時(shí)時(shí)間。默認(rèn) 2小時(shí)。單位秒。-- 7.tablet_num_per_task:每個(gè)查詢計(jì)劃分配的最大分片數(shù)。默認(rèn)為 5。-- 查看導(dǎo)出狀態(tài)show EXPORT \G;
注意事項(xiàng)
不建議一次性導(dǎo)出大量數(shù)據(jù)。一個(gè) Export 作業(yè)建議的導(dǎo)出數(shù)據(jù)量最大在幾十 GB。過大的導(dǎo)出會(huì)導(dǎo)致更多的垃圾文件和更高的重試成本。如果表數(shù)據(jù)量過大,建議按照分區(qū)導(dǎo)出。在 Export 作業(yè)運(yùn)行過程中,如果 FE 發(fā)生重啟或切主,則 Export 作業(yè)會(huì)失敗,需要用戶重新提交。如果 Export 作業(yè)運(yùn)行失敗,在遠(yuǎn)端存儲(chǔ)中產(chǎn)生的 __doris_export_tmp_xxx 臨時(shí)目錄,以及已經(jīng)生成的文件不會(huì)被刪除,需要用戶手動(dòng)刪除。如果 Export 作業(yè)運(yùn)行成功,在遠(yuǎn)端存儲(chǔ)中產(chǎn)生的 __doris_export_tmp_xxx 目錄,根據(jù)遠(yuǎn)端存儲(chǔ)的文件系統(tǒng)語義,可能會(huì)保留,也可能會(huì)被清除。比如在百度對(duì)象存儲(chǔ)(BOS)中,通過 rename 操作將一個(gè)目錄中的最后一個(gè)文件移走后,該目錄也會(huì)被刪除。如果該目錄沒有被清除,用戶可以手動(dòng)清除當(dāng) Export 運(yùn)行完成后(成功或失?。現(xiàn)E 發(fā)生重啟或切主,則 SHOW EXPORT展示的作業(yè)的部分信息會(huì)丟失,無法查看。Export 作業(yè)只會(huì)導(dǎo)出 Base 表的數(shù)據(jù),不會(huì)導(dǎo)出 Rollup Index 的數(shù)據(jù)。Export 作業(yè)會(huì)掃描數(shù)據(jù),占用 IO 資源,可能會(huì)影響系統(tǒng)的查詢延遲查詢結(jié)果導(dǎo)出SELECT INTO OUTFILE 語句可以將查詢結(jié)果導(dǎo)出到文件中。目前支持通過 Broker進(jìn)程, 通過 S3 協(xié)議, 或直接通過 HDFS 協(xié)議,導(dǎo)出到遠(yuǎn)端存儲(chǔ),如 HDFS,S3,BOS,COS (騰訊云)上。
-- 語法query_stmt -- 查詢語句INTO OUTFILE "file_path" --導(dǎo)出文件的路勁[format_as] -- 指定文件存儲(chǔ)的格式[properties] -- 一些配置文件
file_path:指向文件存儲(chǔ)的路徑以及文件前綴。如 hdfs://path/to/my_file_.最終的文件名將由 my_file_,文件序號(hào)以及文件格式后綴組成。其中文件序號(hào)由 0 開始,數(shù)量為文件被分割的數(shù)量
-- 如my_file_abcdefg_0.csv my_file_abcdefg_1.csv my_file_abcdegf_2.csv -- [format_as]:指定導(dǎo)出格式。默認(rèn)為 CSV-- [properties]:指定相關(guān)屬性。目前支持通過 Broker 進(jìn)程,hdfs協(xié)議等-- Broker 相關(guān)屬性需加前綴 broker.-- HDFS 相關(guān)屬性需加前綴 hdfs. 其中hdfs.fs.defaultFS 用于填寫 namenode地址和端口,屬于必填項(xiàng)。-- 如:("broker.prop_key" = "broker.prop_val", ...)("hdfs.fs.defaultFS" = "xxx", "hdfs.hdfs_user" = "xxx")-- 其他屬性:-- column_separator:列分隔符,僅對(duì) CSV 格式適用。默認(rèn)為 \t。 -- line_delimiter:行分隔符,僅對(duì) CSV 格式適用。默認(rèn)為 \n。 -- max_file_size:單個(gè)文件的最大大小。默認(rèn)為 1GB。取值范圍在 5MB 到 2GB 之間。超過這個(gè)大小的文件將會(huì)被切分。-- schema:PARQUET 文件 schema 信息。僅對(duì) PARQUET 格式適用。導(dǎo)出文件格式為 PARQUET 時(shí),必須指定 schema。
使用 broker 方式,將簡單查詢結(jié)果導(dǎo)出
select * from log_detail where id >2INTO OUTFILE "hdfs://doitedu01:8020/doris-out/broker_a_" FORMAT AS CSV PROPERTIES ( "broker.name" = "broker_name", "column_separator" = ",", "line_delimiter" = "\n", "max_file_size" = "100MB" );
使用 HDFS 方式導(dǎo)出
EXPLAIN SELECT * FROM log_detailINTO OUTFILE "hdfs://doris-out/hdfs_" FORMAT AS CSV PROPERTIES ( "fs.defaultFS" = "hdfs://doitedu01:8020", "hadoop.username" = "root","column_separator" = ",");
標(biāo)簽: