您現在的位置是:首頁 > 垂釣

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

  • 由 大資料羊說 發表于 垂釣
  • 2022-04-24
簡介max’ = ‘11’)資料匯表:CREATE TABLE sink_table (order_number BIGINT,priceDECIMAL(32,2)) WITH (‘connector’ =

distinct語句轉換成什麼

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

感謝您的小愛心

(關注 + 點贊 + 再看)

,對博主的肯定,會督促博主持續的輸出更多的優質實戰內容!!!

1。序篇-本文結構

前面的章節鋪墊了那麼多,終於在本節走入一條 query 了。

針對 datastream api 大家都比較熟悉了,還是那句話,在 datastream 中,你寫的程式碼邏輯是什麼樣的,它最終的執行方式就是什麼樣的。

但是對於 flink sql 的執行過程,大家還是不熟悉的。

因此本文透過以下章節使用 ETL,group agg(sum,count等)簡單聚合類 query 帶大家走進一條 flink sql query 邏輯的世界。幫大家至少能夠熟悉在 flink sql 程式執行時知道 flink 程式在幹什麼。

背景篇-大家不瞭解 flink sql 什麼?

目標篇-本文能幫助大家瞭解 flink sql 什麼?

實戰篇-簡單的 query 案例和執行原理

總結與展望篇

先說說結論:

場景問題

:flink sql 很適合簡單 ETL,以及基本全部場景下的聚合類指標。

語法問題

:flink sql 語法其實是和其他 sql 語法基本一致的。基本不會產生語法問題阻礙使用 flink sql。

執行問題

:檢視 flink sql 任務時的一些技巧:

去 flink webui 看看這個任務目前在做什麼。包括運算元名稱都會給直接展示給我們目前哪個運算元在幹啥事情,在處理啥邏輯。

如果你想知道你的 flink 任務執行了什麼程式碼,就去看看 sql 最後轉換成的 transformation 裡面具體要執行哪些操作。flink sql 生成的程式碼也在裡面。

如果你不確定線上任務執行原理,可以直接在本地嘗試執行。

2。背景篇-大家不瞭解 flink sql 什麼?

首先從大家用 flink sql 的一個初衷和狀態出發,想一下大家在開始上手 flink sql 時,是什麼樣的一個想法?

博主大概整理了下,在初步上手 flink sql,一般從入手到踩坑整個過程中,一般都會有以下幾種問題或者想法:

場景問題

:首先 flink sql 是用來提效的,那相比 datastream,哪些場景很適合 flink sql 去做?

語法問題

:我寫 sql 時 flink sql 語法會不會和其他 sql 語法有不同?

執行問題

:我寫了一條 sql,執行起來了,但是對我來說是黑盒的,我怎麼知道這個任務正在執行什麼操作?有沒有什麼好辦法幫我去理解 flink sql 的執行機制?

理解誤區

:在理解 flink sql 的運算機制上有哪些誤區?

:flink sql 一般都有啥坑?提前瞭解幫我們避免踩坑。

就是上面這些想法,會讓很多想在公司內部引入 flink sql 的同學望而卻步。

3。目標篇-本文能幫助大家瞭解 flink sql 什麼?

來看看本文的目標:

場景問題:幫大家理解哪些場景是非常適合 flink sql 的

語法問題:幫大家簡單熟悉 flink sql 的語法

執行問題:使用一條簡單的 query sql 看看其執行起來的過程,其執行的機制

理解誤區:運算機制上的常見誤區

坑:看看 sql 一般會有啥坑

由於一篇文章不能覆蓋所有概念,本文主要介紹一些最簡單的 ETL,聚合場景,主要集中於前三點。

後兩點在後續系列文章中會按照場景詳細展開。

4。實戰篇-簡單的 query 案例和執行原理

4。1。場景問題:有哪些場景適合 flink sql?

不裝了,我坦白了,flink sql 其實很適合乾的活就是 dwd 清洗,dws 聚合。

此處主要針對實時數倉的場景來說。flink sql 能幹 dwd 清洗,dws 聚合,基本上實時數倉的大多數場景都能給覆蓋了。

flink sql 牛逼!!!

但是!!!

經過博主使用 flink sql 經驗來看,並不是所有的 dwd,dws 聚合場景都適合 flink sql(截止發文階段來說)!!!

其實這些目前不適合 flink sql 的場景總結下來就是在處理上比 datastream 還是會有一定的損失。

先總結下使用場景:

1. dwd

:簡單的清洗、複雜的清洗、維度的擴充、各種 udf 的使用

2. dws

:各類聚合

然後分適合的場景和不適合的場景來說,因為只這一篇不能覆蓋所有的內容,所以本文此處先大致給個結論,之後會結合具體的場景詳細描述。

適合的場景:

簡單的 dwd 清洗場景

全場景的 dws 聚合場景

目前不太適合的場景:

複雜的 dwd 清洗場景:舉例比如使用了很多 udf 清洗,尤其是使用很多的 json 類解析清洗

關聯維度場景:舉例比如 datastream 中經常會有攢一批資料批次訪問外部介面的場景,flink sql 目前對於這種場景雖然有 localcache、非同步訪問能力,但是依然還是一條一條訪問外部快取,這樣相比批次訪問還是會有效能差距。

4。2。語法\執行問題

其實總結來說,對於接觸過 sql 的同學來說,除了 flink sql 中視窗聚合類的寫法來說,其他的 sql 語法都是相同的,很容易理解。

本節會針對具體的案例進行詳細介紹。

4。2。1。ETL

最簡單的 ETL 型別任務。

SELECT select_list FROM table_expression [ WHERE boolean_expression ]

1.場景:簡單的 dwd 清洗過濾場景

原始碼公眾號後臺回覆

不會連最適合 flink sql 的 ETL 和 group agg 場景都沒見過吧

獲取。

資料來源表:

CREATE TABLE source_table ( order_number BIGINT, price DECIMAL(32,2)) WITH ( ‘connector’ = ‘datagen’, ‘rows-per-second’ = ‘10’, ‘fields。order_number。min’ = ‘10’, ‘fields。order_number。max’ = ‘11’)

資料匯表:

CREATE TABLE sink_table ( order_number BIGINT, price DECIMAL(32,2)) WITH ( ‘connector’ = ‘print’)

ETL 邏輯:

insert into sink_tableselect * from source_tablewhere order_number = 10

2.執行:可以看到,其實在 flink sql 任務中,其會把對應的處理邏輯給寫到運算元名稱上面。

Notes - 觀察 flink sql 技巧 1:

這個其實就是我們觀察 flink sql 任務的第一個技巧。如果你想知道你的 flink 任務在幹啥,第一反應是去 flink webui 看看這個任務目前在做什麼。包括運算元名稱都會給直接展示給我們目前哪個運算元在幹啥事情,在處理啥邏輯

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

3.結果

+I[10, 337546916355686018150362513408。00]+I[10, 734895198061906189720381030400。00]+I[10, 496632591763800912960818249728。00]+I[10, 495090465926828588045441171456。00]+I[10, 167305033642317182838130081792。00]+I[10, 409466913112794578407573684224。00]+I[10, 894352160414515330502514180096。00]+I[10, 680063350384451712068576346112。00]+I[10, 50807402446574997641386524672。00]+I[10, 646597093362022945955245981696。00]+I[10, 233317961584082024331537809408。00]。。。

4.原理:

先看一下一個 flink sql 任務的入口執行邏輯。

首先看看建表語句的執行和 query 語句執行的邏輯有什麼不同。

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

可以發現執行到

executeInternal

時會針對具體的

operation

來執行不同的操作。

執行建表操作就是具體的

CreateTableOperation

時,會將表的資訊儲存到

catalogManager

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

執行 query 操作就是具體的

ModifyOperation

時,會將對應的邏輯轉換成對應的

Transformation

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

Transformation

中就包含了執行的整體邏輯以及對應要執行的 sql 程式碼內容。

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

接下來我們詳細看下對應的 transform 中包含了什麼內容。

首先是最外層

LegacySinkTransformation

,即 sink 運算元,如圖就是 print sink function。比較好理解。

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

然後是中間層

OneInputTransformation

,即 sql 中過濾和轉換操作(

select * from source_table where order_number = 10

),如圖是程式碼生成的具體過濾和轉換邏輯。

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

生成的程式碼就在

GeneratedOperator

中的

code

欄位。我們將對應的

code

複製到一個新的資料夾中。

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

這個運算元是直接繼承了

OneInputStreamOperator

進行直接執行邏輯,跳過了 datastream 那一層。

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

我們來看看最重要的

processElement

邏輯,具體欄位解釋和執行邏輯如圖所示。

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

Notes - 觀察 flink sql 技巧 2:

這個其實就是我們觀察 flink sql 任務的第二個技巧。如果你想知道你的 flink 任務執行了什麼程式碼,就去看看 sql 最後轉換成的 transformation 裡面具體要執行哪些操作。

4。2。2。去重場景

1.場景:最簡單的去重場景

原始碼公眾號後臺回覆

不會連最適合 flink sql 的 ETL 和 group agg 場景都沒見過吧

獲取。

資料來源:

CREATE TABLE source_table ( string_field STRING) WITH ( ‘connector’ = ‘datagen’, ‘rows-per-second’ = ‘10’, ‘fields。string_field。length’ = ‘3’)

資料匯:

CREATE TABLE sink_table ( string_field STRING) WITH ( ‘connector’ = ‘print’)

資料處理:

insert into sink_tableselect distinct string_fieldfrom source_table

2.執行:可以看到,其實在 flink sql 任務中,其會把對應的處理邏輯給寫到運算元名稱上面。

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

3.上面這個案例的結果:

+I[cd3]+I[8fc]+I[b0c]+I[1d8]+I[e28]+I[c5f]+I[e7d]+I[dfa]+I[1fe]。。。

4.原理:

此處我們只關注和上面不同的邏輯。

第一個就是

PartitionTransform

中的

KeyGroupStreamPartitioner

,就是對應的分割槽邏輯。來看看生成程式碼的邏輯。

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

其中做 shuffle 邏輯時,是按照

string_field

作為 key 進行 shuffle。

第二個就是

OneInputTransformation

中的

KeyedProcessOperator

,就是對應的去重邏輯。

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

可以看到生成的 function 中只有這三段程式碼是業務邏輯程式碼,但是其中的 RowData 初始化大小都是 0。那麼到底是哪裡做的去重邏輯呢?

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

我們跟一下處理邏輯會發現。去重邏輯主要集中在

GroupAggFunction#processElement

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

4。2。3。group 聚合場景

4。2。3。1。簡單聚合場景

1.場景:最簡單的聚合場景

原始碼公眾號後臺回覆

不會連最適合 flink sql 的 ETL 和 group agg 場景都沒見過吧

獲取。

count,sum,avg,max,min 等:

資料來源:

CREATE TABLE source_table ( order_id STRING, price BIGINT) WITH ( ‘connector’ = ‘datagen’, ‘rows-per-second’ = ‘10’, ‘fields。order_id。length’ = ‘1’, ‘fields。price。min’ = ‘1’, ‘fields。price。max’ = ‘1000000’)

資料匯:

CREATE TABLE sink_table ( order_id STRING, count_result BIGINT, sum_result BIGINT, avg_result DOUBLE, min_result BIGINT, max_result BIGINT) WITH ( ‘connector’ = ‘print’)

資料處理邏輯:

insert into sink_tableselect order_id, count(*) as count_result, sum(price) as sum_result, avg(price) as avg_result, min(price) as min_result, max(price) as max_resultfrom source_tablegroup by order_id

2.執行:

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

3.上面這個案例的結果:

+I[1, 1, 415300, 415300。0, 415300, 415300]+I[d, 1, 416878, 416878。0, 416878, 416878]+I[0, 1, 120837, 120837。0, 120837, 120837]+I[c, 1, 337749, 337749。0, 337749, 337749]+I[7, 1, 387053, 387053。0, 387053, 387053]+I[8, 1, 387042, 387042。0, 387042, 387042]+I[2, 1, 546317, 546317。0, 546317, 546317]+I[e, 1, 22131, 22131。0, 22131, 22131]+I[9, 1, 651731, 651731。0, 651731, 651731]-U[0, 1, 120837, 120837。0, 120837, 120837]+U[0, 2, 566664, 283332。0, 120837, 445827]+I[b, 1, 748659, 748659。0, 748659, 748659]-U[7, 1, 387053, 387053。0, 387053, 387053]+U[7, 2, 1058056, 529028。0, 387053, 671003]

4.原理:

來瞅一眼 transformation。

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

還是和之前的邏輯一樣,跟一下 GroupAggFunction 的邏輯。如下圖,有五個執行步驟執行計算。

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

再看最終生成的 function 程式碼邏輯。

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

首先看看 count 怎麼算的。

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

sum 怎麼算的。

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

4。2。3。2。去重聚合場景

1.場景:去重聚合場景

資料來源:

CREATE TABLE source_table ( dim STRING, user_id BIGINT) WITH ( ‘connector’ = ‘datagen’, ‘rows-per-second’ = ‘10’, ‘fields。dim。length’ = ‘1’, ‘fields。user_id。min’ = ‘1’, ‘fields。user_id。max’ = ‘1000000’)

資料匯:

CREATE TABLE sink_table ( dim STRING, uv BIGINT) WITH ( ‘connector’ = ‘print’)

資料處理:

insert into sink_tableselect dim, count(distinct user_id) as uvfrom source_tablegroup by dim

2.執行:

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

3.上面這個案例的結果:

+U[9, 3097]-U[a, 3054]+U[a, 3055]-U[8, 3030]+U[8, 3031]-U[4, 3137]+U[4, 3138]-U[6, 3139]+U[6, 3140]-U[0, 3082]+U[0, 3083]

4.原理:

此處只看和之前的案例不一樣的地方。

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

4。2。3。3。語法糖

1.grouping sets

多維計算。相當於語法糖,使用者可以根據自己的場景去指定自己想要的維度組合。

資料匯:

CREATE TABLE sink_table ( supplier_id STRING, product_id STRING, total BIGINT) WITH ( ‘connector’ = ‘print’)

資料處理邏輯:

insert into sink_tableSELECT supplier_id, product_id, COUNT(*) AS totalFROM (VALUES (‘supplier1’, ‘product1’, 4), (‘supplier1’, ‘product2’, 3), (‘supplier2’, ‘product3’, 3), (‘supplier2’, ‘product4’, 4))AS Products(supplier_id, product_id, rating)GROUP BY GROUPING SETS ((supplier_id, product_id), (supplier_id), ())

其結果等同於:

insert into sink_tableSELECT supplier_id, product_id, COUNT(*) AS totalFROM (VALUES (‘supplier1’, ‘product1’, 4), (‘supplier1’, ‘product2’, 3), (‘supplier2’, ‘product3’, 3), (‘supplier2’, ‘product4’, 4))AS Products(supplier_id, product_id, rating)GROUP BY supplier_id, product_idUNION ALLSELECT supplier_id, cast(null as string) as product_id, COUNT(*) AS totalFROM (VALUES (‘supplier1’, ‘product1’, 4), (‘supplier1’, ‘product2’, 3), (‘supplier2’, ‘product3’, 3), (‘supplier2’, ‘product4’, 4))AS Products(supplier_id, product_id, rating)GROUP BY supplier_idUNION ALLSELECT cast(null as string) AS supplier_id, cast(null as string) AS product_id, COUNT(*) AS totalFROM (VALUES (‘supplier1’, ‘product1’, 4), (‘supplier1’, ‘product2’, 3), (‘supplier2’, ‘product3’, 3), (‘supplier2’, ‘product4’, 4))AS Products(supplier_id, product_id, rating)

結果如下:

+I[supplier1, product1, 1]+I[supplier1, null, 1]+I[null, null, 1]+I[supplier1, product2, 1]-U[supplier1, null, 1]+U[supplier1, null, 2]-U[null, null, 1]+U[null, null, 2]+I[supplier2, product3, 1]+I[supplier2, null, 1]-U[null, null, 2]+U[null, null, 3]+I[supplier2, product4, 1]-U[supplier2, null, 1]+U[supplier2, null, 2]-U[null, null, 3]+U[null, null, 4]

grouping sets 能幫助我們在多維場景下,減少很多冗餘程式碼。關於 grouping sets 原理後面的系列文章會介紹。

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

2.rollup

rollup 是上卷計算的一種簡化寫法。比如可以把

GROUPING SETS ((supplier_id, product_id), (supplier_id), ())

簡化為

ROLLUP (supplier_id, product_id)

資料匯:

CREATE TABLE sink_table ( supplier_id STRING, product_id STRING, total BIGINT) WITH ( ‘connector’ = ‘print’)

資料處理邏輯:

SELECT supplier_id, rating, COUNT(*)FROM (VALUES (‘supplier1’, ‘product1’, 4), (‘supplier1’, ‘product2’, 3), (‘supplier2’, ‘product3’, 3), (‘supplier2’, ‘product4’, 4))AS Products(supplier_id, product_id, rating)GROUP BY ROLLUP (supplier_id, product_id)

其結果等同於:

SELECT supplier_id, rating, product_id, COUNT(*)FROM (VALUES (‘supplier1’, ‘product1’, 4), (‘supplier1’, ‘product2’, 3), (‘supplier2’, ‘product3’, 3), (‘supplier2’, ‘product4’, 4))AS Products(supplier_id, product_id, rating)GROUP BY GROUPING SET ( ( supplier_id, product_id ), ( supplier_id ), ( ))

結果如下:

+I[supplier1, product1, 1]+I[supplier1, null, 1]+I[null, null, 1]+I[supplier1, product2, 1]-U[supplier1, null, 1]+U[supplier1, null, 2]-U[null, null, 1]+U[null, null, 2]+I[supplier2, product3, 1]+I[supplier2, null, 1]-U[null, null, 2]+U[null, null, 3]+I[supplier2, product4, 1]-U[supplier2, null, 1]+U[supplier2, null, 2]-U[null, null, 3]+U[null, null, 4]

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

5.CUBE 計算

原始碼公眾號後臺回覆

不會連最適合 flink sql 的 ETL 和 group agg 場景都沒見過吧

獲取。

cube 相當於是一種覆蓋了所有維度組合聚合計算。比如 group by a, b, c。其會將 a, b, c 三個維度的所有維度組合進行 group by。

資料匯:

CREATE TABLE sink_table ( supplier_id STRING, product_id STRING, total BIGINT) WITH ( ‘connector’ = ‘print’)

資料處理邏輯:

SELECT supplier_id, rating, product_id, COUNT(*)FROM (VALUES (‘supplier1’, ‘product1’, 4), (‘supplier1’, ‘product2’, 3), (‘supplier2’, ‘product3’, 3), (‘supplier2’, ‘product4’, 4))AS Products(supplier_id, product_id, rating)GROUP BY CUBE (supplier_id, product_id)

它等同於

SELECT supplier_id, rating, product_id, COUNT(*)FROM (VALUES (‘supplier1’, ‘product1’, 4), (‘supplier1’, ‘product2’, 3), (‘supplier2’, ‘product3’, 3), (‘supplier2’, ‘product4’, 4))AS Products(supplier_id, product_id, rating)GROUP BY GROUPING SET ( ( supplier_id, product_id ), ( supplier_id ), ( product_id ), ( ))

結果如下:

+I[supplier1, product1, 1]+I[supplier1, null, 1]+I[null, product1, 1]+I[null, null, 1]+I[supplier1, product2, 1]-U[supplier1, null, 1]+U[supplier1, null, 2]+I[null, product2, 1]-U[null, null, 1]+U[null, null, 2]+I[supplier2, product3, 1]+I[supplier2, null, 1]+I[null, product3, 1]-U[null, null, 2]+U[null, null, 3]+I[supplier2, product4, 1]-U[supplier2, null, 1]+U[supplier2, null, 2]+I[null, product4, 1]-U[null, null, 3]+U[null, null, 4]

flink sql 知其所以然(七):ETL 和 group agg 場景都沒見過吧

5。總結與展望篇

本文主要介紹了 ETL,group agg 聚合類指標的一些常見場景案例以及其底層執行原理。我們可以發現 flink sql 的語法其實和 hive sql,mysql 啥的語法都是基本一致的。所以上手 flink sql 時,語法基本不會成為我們的障礙。

而且也介紹了在檢視

flink sql 任務時的一些技巧

去 flink webui 看看這個任務目前在做什麼。包括運算元名稱都會給直接展示給我們目前哪個運算元在幹啥事情,在處理啥邏輯。

如果你想知道你的 flink 任務執行了什麼程式碼,就去看看 sql 最後轉換成的 transformation 裡面具體要執行哪些操作。

後續文章會繼續介紹 flink sql 視窗聚合,一些理解誤區,和坑之類的案例。

希望大家能持續關注。支援博主。喜歡的請

關注 + 點贊 + 再看

往期推薦

flink sql 知其所以然(六)| flink sql 約會 calcite(看這篇就夠了)

flink sql 知其所以然(五)| 自定義 protobuf format

flink sql 知其所以然(四)| sql api 型別系統

flink sql 知其所以然(三)| 自定義 redis 資料匯表(附原始碼)

flink sql 知其所以然(二)| 自定義 redis 資料維表(附原始碼)

flink sql 知其所以然(一)| source\sink 原理

更多 Flink 實時大資料分析相關技術博文,影片。後臺回覆 “flink” 或 “flink sql” 獲取。

點個贊+在看,感謝您的肯定

Top