Hi, everyone
These days, I am trying to implement runtime filter in flink1.10 with flink-sql-benchmark according to blink. I mainly change three part of flink code: add runtime filter rule; modify the code gen and bloomfilter; add some aggregatedaccumulator methods according to accumulator. Now, It seems runtime filter works in execution graph as follows: Source: HiveTableSource(i_item_sk, i_item_id, i_rec_start_date, i_rec_end_date, i_item_desc, i_current_price, i_wholesale_cost, i_brand_id, i_brand, i_class_id, i_class, i_category_id, i_category, i_manufact_id, i_manufact, i_size, i_formulation, i_color, i_units, i_container, i_manager_id, i_product_name) TablePath: tpcds_bin_orc_2.item, PartitionPruned: false, PartitionNums: null, ProjectedFields: [0, 10, 12] -> Calc(select=[i_item_sk], where=[((i_category = _UTF-16LE'Jewelry':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (i_class = _UTF-16LE'consignment':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND RUNTIME_FILTER_BUILDER_0(i_item_sk))]) and Source: HiveTableSource(d_date_sk, d_date_id, d_date, d_month_seq, d_week_seq, d_quarter_seq, d_year, d_dow, d_moy, d_dom, d_qoy, d_fy_year, d_fy_quarter_seq, d_fy_week_seq, d_day_name, d_quarter_name, d_holiday, d_weekend, d_following_holiday, d_first_dom, d_last_dom, d_same_day_ly, d_same_day_lq, d_current_day, d_current_week, d_current_month, d_current_quarter, d_current_year) TablePath: tpcds_bin_orc_2.date_dim, PartitionPruned: false, PartitionNums: null, ProjectedFields: [0, 3] -> Calc(select=[d_date_sk, d_month_seq], where=[RUNTIME_FILTER_2(d_date_sk)]) However,the number of records sent is the same as normal. Anyone who can give me some advices? Thanks |
Hi, Does runtime filter probe side wait for building runtime filter? Can you check the start time of build side and probe side? Best, Jingsong Lee
|
Thanks for replying Lee, I follow your method to debug the code and I find the build side only call addPreAggregatedAccumulator but not call commit method. Furthermore, I add a breakpoint at future.handleAsync in asyncGetBroadcastBloomFilter method. But when program stop at if(e==null && accumulator != null), it finish with result immediately. Any suggestion? JingsongLee <[hidden email]> 于 2020年3月2日周一 下午3:22写道:
|
In reply to this post by JingsongLee
I set sql.exec.runtime-filter.wait to true. HiveTableSource take much longer time but get same result. I think the reason is not commit preAggregateAccumulator. But I dont know why it happens? JingsongLee <[hidden email]> 于 2020年3月2日周一 下午3:22写道:
|
I finally got through the runtimefilter in 1.10, the reason why it didn't call commit method is in OperatorCodeGenerator. It should call endInput() method correctly in generateOneInputStreamOperator. A complete process of runtimefilter is: 1.Add insert and remove rule in batch rules. 2.Build side constructs bloomfilter and commit. 3. Jobmanager merge bloomfilter as a global one. 4. Probe side get global bloomfilter and filter data. Although runtimefilter is already achieved in blink, it doesn't have a independent commit. So it's a little hard to merge whole code once. I hope it helps if anyone try to do same thing. faaron zheng <[hidden email]> 于 2020年3月2日周一 下午7:52写道:
|
Great exploration. And thanks for your information. I believe you have a deep understanding of Flink's internal mechanism. Best, Jingsong Lee On Thu, Mar 5, 2020 at 12:09 PM faaron zheng <[hidden email]> wrote:
Best, Jingsong Lee |
Free forum by Nabble | Edit this page |