簡介
在實(shí)時(shí)數(shù)據(jù)開發(fā)過程中,大家經(jīng)常會用 Flink SQL 或者 Flink DataStream API 來做數(shù)據(jù)加工。通常情況下選用2者都能加工出想要的數(shù)據(jù),但是總會有 Flink SQL 覆蓋不了的問題,但 SQL 的易用性又難以讓人釋懷。所以有些場景在使用 FLink SQL 開始就與需要額外注意,下面就介紹一種多表關(guān)聯(lián)時(shí)存在部分列更新(partial Update)場景,在 DataStream API 和 Flink SQL 開發(fā)時(shí)都容易忽視的情況而導(dǎo)致的問題。為了簡化問題描述,采用了Flink SQL 來闡述此類問題。
場景介紹
多表關(guān)聯(lián)時(shí)表 A 關(guān)聯(lián)表 B, 表 A 具有pk1, field1, field2, field3字段,表 B 具有 pk2, field4, field5, field6 字段,表 A 通過 pk1 關(guān)聯(lián)表B pk2。使用 Flink SQL 會如下實(shí)現(xiàn):
CREATE TABLE jdq_source( pk1 INT, field1 STIRNG, field2 STIRNG, field3 STIRNG, PRIMARY KEY(pk1) NOT ENFORCED ) WITH(...); CREATE TABLE sr_sink( pk1 INT, field1 STRING, field2 STRING, field3 STRING, field4 STRING, field5 STRING, field6 STRING, PRIMARY KEY(pk2) NOT ENFORCED ) WITH (...); INSERT INTO C SELECT A.pk1,A.field1,A.field2,A.field3,B.pk2,B.field4,B.field5,B.field6 FROM jdq_source A INNER JOIN sr_sink B ON A.pk1 = B.pk2;
上述實(shí)例中有明顯特征:使用了Join 關(guān)聯(lián), 且需要注意的是寫入的數(shù)據(jù)庫 sink 是 StarRocks。StarRocks 存在如下特性:當(dāng)表是主鍵表時(shí)是不支持部分列更新( Partial Update)的,實(shí)際上大部分時(shí)候大家都用的是主鍵表。
然后在一個(gè)SQL查詢數(shù)據(jù)的接口就遇到了如下問題:每次從接口查詢返回的結(jié)果都不穩(wěn)定,同樣的查詢條件不同時(shí)機(jī)返回的結(jié)果不一樣。SQL查詢語句如下:
select C.field1,C.field2,C.field3 FROM C group by field1,field2,field3;
為什么SQL查詢的結(jié)果會不一致呢?起初排查原因發(fā)現(xiàn) group by 返回結(jié)果有多條,而在SQL 中也沒有使用 order by 對數(shù)據(jù)進(jìn)行排序,所以導(dǎo)致了結(jié)果不穩(wěn)定。后又排查為什么會出現(xiàn)多條結(jié)果呢?于是懷疑 field1, field2, field3 有不符合預(yù)期的數(shù)據(jù)。如:
20240530, 2, 3
20240530, 2, null
20240531, 2, 4
其中第2條是多余的,不應(yīng)該出現(xiàn)。結(jié)果發(fā)現(xiàn)可能是如下原因?qū)е碌模哼@3個(gè)字段 filed1, field2, filed3 在StarRocks數(shù)據(jù)庫中會一直在變化,不停的寫入新值。導(dǎo)致 SQL 查詢時(shí)可以查到 field3 為 null 的數(shù)據(jù)。
為什么field3為不斷變化呢?究其原因是:StarRocks 主鍵表不支持部分列更新(Partial Update)。當(dāng)field3 為null時(shí),同樣會被寫入 StarRocks。我們在通過JDQ讀取表A field1, field2, field3 數(shù)據(jù)給表C寫入數(shù)據(jù)時(shí),當(dāng)JDQ 消息隊(duì)列中表A的記錄存在亂序場景且field3 字段可能為null時(shí),最終寫入StarRocks的field3 字段會出現(xiàn)時(shí)而為null,時(shí)而不為null。 所以SQL查詢接口中 group by的結(jié)果會出現(xiàn)不穩(wěn)定。
總結(jié)
為什么在開發(fā)的時(shí)候當(dāng)時(shí)沒有發(fā)現(xiàn) StarRocks 主鍵表這個(gè)問題呢?原因:1. 大家所關(guān)注的部分列更新,多數(shù)是關(guān)注insert into table_C(field1, field2, field3) 中不包含的字段field4,field5...等被更新為null,而當(dāng)前場景是會把 field3 為null的值也寫入SR數(shù)據(jù)庫中,這不是我們期望的結(jié)果。2.表A作為主表,通常不會出現(xiàn)開始field3有值后來又沒有值(null)的場景。出現(xiàn)這個(gè)現(xiàn)象大概率是因?yàn)樯嫌蜫DQ消息隊(duì)列中的數(shù)據(jù)亂序了,導(dǎo)致field3 為null的后出現(xiàn)了。而這種問題又比較難發(fā)現(xiàn)。
什么情況下會出現(xiàn)此類問題呢?寫入的數(shù)據(jù)庫不支持部分列更新場景時(shí)會出現(xiàn)。如StarRocks, Doris。因?yàn)镸ySQL, ES,ClickHouse的部分表引擎支持部分列更新,所以在MySQL, ES,ClickHouse中不會出現(xiàn)。
同理在 DataStream API 中如果表 A,表 B 關(guān)聯(lián)后的數(shù)據(jù)直接寫入StarRocks 的話,也會出現(xiàn)此類問題。
以上這個(gè)問題在 Flink SQL 中無法解決,在 Flink DataStream API 中可以模擬部分列更新來避免此類問題。具體方法:在DatStream 任務(wù)中增加一個(gè)MapState, 用來在新數(shù)據(jù)到來時(shí)從MapState拿出緩存的數(shù)據(jù),并和新到來的數(shù)據(jù)進(jìn)行合并,來實(shí)現(xiàn)部分列更新功能,最后再寫入 StarRocks。
雖然問題不是Flink SQL導(dǎo)致的,但是上面的問題可以通過Flink DataStream API來規(guī)避。
審核編輯 黃宇
-
SQL
+關(guān)注
關(guān)注
1文章
780瀏覽量
44801
發(fā)布評論請先 登錄
禾賽激光雷達(dá)成為理想汽車的安全新標(biāo)配
如何一眼定位SQL的代碼來源:一款SQL染色標(biāo)記的簡易MyBatis插件

DLPNIRNANOEVM固件如何編譯?
中軟國際談DeepSeek大模型帶來的影響
DAC7678怎樣才能完全輸出0電平?
Devart: dbForge Compare Bundle for SQL Server—比較SQL數(shù)據(jù)庫最簡單、最準(zhǔn)確的方法
dbForge Studio For SQL Server:用于有效開發(fā)的最佳SQL Server集成開發(fā)環(huán)境
ISO3082和MAX3485E通訊,轉(zhuǎn)出來的串口數(shù)據(jù)總出問題,為什么?怎么解決?
AFE4490電路板讀寫寄存器的值一直是0x00,為什么?怎么解決?
SQL與NoSQL的區(qū)別
基于圖遍歷的Flink任務(wù)畫布模式下零代碼開發(fā)實(shí)現(xiàn)方案

評論