Question about runtime filter

Question about runtime filter

faaron zheng
Hi, everyone    

These days I am trying to implement a runtime filter in Flink 1.10 with flink-sql-benchmark, following Blink's approach. I mainly changed three parts of the Flink code: added a runtime filter rule, modified the code generation and the bloom filter, and added some aggregated-accumulator methods modeled on the existing accumulator. Now the runtime filter seems to show up in the execution graph as follows:
Source: HiveTableSource(i_item_sk, i_item_id, i_rec_start_date, i_rec_end_date, i_item_desc, i_current_price, i_wholesale_cost, i_brand_id, i_brand, i_class_id, i_class, i_category_id, i_category, i_manufact_id, i_manufact, i_size, i_formulation, i_color, i_units, i_container, i_manager_id, i_product_name) TablePath: tpcds_bin_orc_2.item, PartitionPruned: false, PartitionNums: null, ProjectedFields: [0, 10, 12] -> Calc(select=[i_item_sk], where=[((i_category = _UTF-16LE'Jewelry':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (i_class = _UTF-16LE'consignment':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND RUNTIME_FILTER_BUILDER_0(i_item_sk))])      

and

Source: HiveTableSource(d_date_sk, d_date_id, d_date, d_month_seq, d_week_seq, d_quarter_seq, d_year, d_dow, d_moy, d_dom, d_qoy, d_fy_year, d_fy_quarter_seq, d_fy_week_seq, d_day_name, d_quarter_name, d_holiday, d_weekend, d_following_holiday, d_first_dom, d_last_dom, d_same_day_ly, d_same_day_lq, d_current_day, d_current_week, d_current_month, d_current_quarter, d_current_year) TablePath: tpcds_bin_orc_2.date_dim, PartitionPruned: false, PartitionNums: null, ProjectedFields: [0, 3] -> Calc(select=[d_date_sk, d_month_seq], where=[RUNTIME_FILTER_2(d_date_sk)])  


However, the number of records sent is the same as without the filter. Can anyone give me some advice?
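For context, a minimal sketch of what the two generated functions are meant to do conceptually, assuming a toy bit-set bloom filter (the class and method names are illustrative, not the actual Flink/Blink classes): RUNTIME_FILTER_BUILDER_0 inserts the build-side join keys, and RUNTIME_FILTER_2 drops probe-side rows whose key cannot be in that set.

```java
import java.util.BitSet;

// Illustrative only: a toy bloom filter showing what the generated
// RUNTIME_FILTER_BUILDER_* / RUNTIME_FILTER_* functions conceptually do.
public class SimpleBloomFilter {
    private final BitSet bits;
    private final int size;

    public SimpleBloomFilter(int size) {
        this.size = size;
        this.bits = new BitSet(size);
    }

    // Build side: called once per build-side key (e.g. i_item_sk).
    public void add(long key) {
        bits.set(hash1(key));
        bits.set(hash2(key));
    }

    // Probe side: if this returns false, the row can be dropped early.
    public boolean mightContain(long key) {
        return bits.get(hash1(key)) && bits.get(hash2(key));
    }

    private int hash1(long key) {
        return Math.floorMod(Long.hashCode(key), size);
    }

    private int hash2(long key) {
        return Math.floorMod(Long.hashCode(key * 0x9E3779B97F4A7C15L), size);
    }
}
```

If the probe side never receives a populated filter, mightContain effectively returns true for every key, which would explain an unchanged record count.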



Thanks
Re: Question about runtime filter

JingsongLee
Hi,

Does the runtime filter's probe side wait for the runtime filter to be built?
Can you check the start times of the build side and the probe side?

Best,
Jingsong Lee

Re: Question about runtime filter

faaron zheng
Thanks for replying, Lee. I followed your method to debug the code and found that the build side only calls addPreAggregatedAccumulator and never calls the commit method. Furthermore, I added a breakpoint at future.handleAsync in the asyncGetBroadcastBloomFilter method, but when the program stops at if (e == null && accumulator != null), the future finishes with a result immediately. Any suggestions?
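A minimal sketch of the async lookup pattern being described, assuming the probe side asks for the aggregated bloom filter through a CompletableFuture. asyncGetBroadcastBloomFilter and the accumulator type belong to the patched build discussed here, not to a public Flink 1.10 API; the shape below only illustrates where a missing commit would surface as an empty result.

```java
import java.util.concurrent.CompletableFuture;

// Illustrative probe-side lookup: if the build side never commits the
// pre-aggregated accumulator, the future can complete with a null
// accumulator and the probe side falls back to passing every row through.
public class BloomFilterLookup {

    // Hypothetical filter handle; stands in for the aggregated bloom filter.
    public interface RuntimeFilter {
        boolean mightContain(long key);
    }

    public static CompletableFuture<RuntimeFilter> asyncGetBroadcastBloomFilter(
            CompletableFuture<RuntimeFilter> future) {
        return future.handleAsync((accumulator, e) -> {
            if (e == null && accumulator != null) {
                // Normal path: a committed, merged bloom filter arrived.
                return accumulator;
            }
            // Failure or missing commit: accept everything (no filtering),
            // which is why the record counts look identical to a normal run.
            return key -> true;
        });
    }
}
```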

Re: Question about runtime filter

faaron zheng
I set sql.exec.runtime-filter.wait to true. The HiveTableSource now takes much longer but produces the same result. I think the reason is that the pre-aggregated accumulator is never committed, but I don't know why that happens.
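For reference, a sketch of how such a Blink-style option would typically be set in a Flink 1.10 batch Table program. Note that sql.exec.runtime-filter.wait is a key from the patched build discussed in this thread, not an option shipped with vanilla Flink 1.10.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RuntimeFilterConfigExample {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Custom option from the patched build: make the probe side wait
        // until the build side has published the runtime filter.
        tEnv.getConfig().getConfiguration()
                .setBoolean("sql.exec.runtime-filter.wait", true);
    }
}
```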

Re: Question about runtime filter

faaron zheng
I finally got the runtime filter working in 1.10. The reason the commit method was never called was in OperatorCodeGenerator: generateOneInputStreamOperator has to call the endInput() method correctly. The complete runtime filter flow is:
1. Add the insert and remove rules to the batch rules.
2. The build side constructs the bloom filter and commits it.
3. The JobManager merges the bloom filters into a single global one.
4. The probe side fetches the global bloom filter and uses it to filter data.
Although the runtime filter is already implemented in Blink, it doesn't live in an independent commit, so it's hard to merge the whole thing in one go. I hope this helps anyone trying to do the same thing.
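As a rough illustration of the fix, here is a sketch of a build-side operator whose endInput() publishes the bloom filter once the bounded input is exhausted. BoundedOneInput, OneInputStreamOperator, and AbstractStreamOperator are real Flink 1.10 classes; BloomFilterAccumulator and its commit call are placeholders for the patched accumulator API mentioned in this thread, not classes in vanilla Flink.

```java
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Sketch of what the generated build-side operator should resemble: it fills
// a local bloom filter in processElement() and publishes it exactly once when
// the bounded input ends. If the code generator never wires up endInput(),
// the filter is never committed and the probe side has nothing to filter with.
public class RuntimeFilterBuilderOperator
        extends AbstractStreamOperator<Long>
        implements OneInputStreamOperator<Long, Long>, BoundedOneInput {

    // Placeholder for the patched accumulator type that the JobManager merges.
    private final BloomFilterAccumulator localFilter = new BloomFilterAccumulator();

    @Override
    public void processElement(StreamRecord<Long> element) throws Exception {
        // Step 2 (first half): insert every build-side key into the local filter.
        localFilter.add(element.getValue());
        output.collect(element);
    }

    @Override
    public void endInput() throws Exception {
        // Step 2 (second half): publish and commit the finished filter so the
        // JobManager can merge it into the global filter (step 3).
        localFilter.commit(getRuntimeContext());
    }

    // Minimal placeholder so the sketch is self-contained.
    public static class BloomFilterAccumulator {
        private final java.util.BitSet bits = new java.util.BitSet(1 << 20);

        void add(long key) {
            bits.set(Math.floorMod(Long.hashCode(key), 1 << 20));
        }

        void commit(Object runtimeContext) {
            // In the patched build this would call addPreAggregatedAccumulator(...)
            // followed by the commit method described earlier in the thread.
        }
    }
}
```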

Re: Question about runtime filter

Jingsong Li
Great exploration, and thanks for the information.
I believe you have a deep understanding of Flink's internal mechanisms.

Best,
Jingsong Lee
