Re: Re: Re: Flink SQL Count Distinct performance optimization

Posted by sunfulin on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Flink-SQL-Count-Distinct-performance-optimization-tp32003p32035.html

Hi,
Thanks for the reply. I am using the default FsStateBackend rather than RocksDB, with checkpointing off, so I really cannot see any state info from the dashboard. I will dig into the details further and see whether anything can be optimized.
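For reference, a minimal sketch of what I plan to try next: turning checkpointing on with an explicit FsStateBackend so that state sizes show up in the dashboard. The checkpoint path and interval below are placeholders, not values from my actual job:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Placeholder checkpoint directory; any file://, hdfs:// or s3:// URI works.
        env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"));
        // Checkpoint every 60s; completed checkpoint sizes then appear in the web UI.
        env.enableCheckpointing(60_000);
        // ... build and execute the actual job here ...
    }
}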

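Also noting down, for later reference, roughly what godfreyhe's suggestion quoted below would look like. This is a sketch I have not run, and it assumes an upgrade to Flink 1.9+ (I am on 1.8.2), since the blink planner and this option are not available earlier:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class DistinctAggSplitSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The distinct-agg split optimization is implemented in the blink planner only.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
        // Automatically rewrites a skewed COUNT(DISTINCT ...) into a two-level
        // aggregation, much like the manual MOD(hashCode(deviceId), 1024) split
        // in my SQL below.
        tEnv.getConfig().getConfiguration()
                .setString("table.optimizer.distinct-agg.split.enabled", "true");
    }
}
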
At 2020-01-08 19:07:08, "Benchao Li" <[hidden email]> wrote:
>hi sunfulin,
>
>As Kurt pointed out, if you use the RocksDB state backend, slow disk IO may
>be bounding your job. You can check WindowOperator's latency metric to see
>how long it takes to process an element.
>Hope this helps.
>
>sunfulin <[hidden email]> wrote on Wed, Jan 8, 2020 at 4:04 PM:
>
>> Ah, I had checked resource usage and GC from the flink dashboard. It seems
>> the reason is not a CPU or memory issue; task heap memory usage is less
>> than 30%. Could you kindly tell me which other metrics could help target
>> the bottleneck?
>> Really appreciated.
>>
>> At 2020-01-08 15:59:17, "Kurt Young" <[hidden email]> wrote:
>>
>> Hi,
>>
>> Could you try to find out what the bottleneck of your current job is? This
>> would lead to different optimizations, such as whether it is CPU bound, or
>> whether your local state is too big and the job is stuck on too many slow
>> IOs.
>>
>> Best,
>> Kurt
>>
>> On Wed, Jan 8, 2020 at 3:53 PM 贺小令 <[hidden email]> wrote:
>>
>>> hi sunfulin,
>>> you can try the blink planner (since 1.9+), which optimizes distinct
>>> aggregation. You can also try enabling
>>> *table.optimizer.distinct-agg.split.enabled* if the data is skewed.
>>>
>>> best,
>>> godfreyhe
>>>
>>> sunfulin <[hidden email]> wrote on Wed, Jan 8, 2020 at 3:39 PM:
>>>
>>>> Hi, community,
>>>> I'm using Apache Flink SQL to build some of my realtime streaming apps.
>>>> In one scenario I'm trying to count(distinct deviceID) over an
>>>> approximately 100GB data set in realtime and sink the aggregated results
>>>> to an ElasticSearch index. I hit a severe performance issue when running
>>>> my flink job and hope to get some help from the community.
>>>>
>>>> Flink version: 1.8.2, running on yarn with 4 yarn slots per task manager.
>>>> My flink task parallelism is set to 10, which is equal to the number of
>>>> my kafka source partitions. After running the job, I can observe high
>>>> backpressure from the flink dashboard. Any suggestions and any kind of
>>>> help are highly appreciated.
>>>>
>>>> The running SQL is the following:
>>>>
>>>> INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)
>>>> SELECT aggId, pageId, statkey AS ts, SUM(cnt) AS expoCnt, COUNT(cnt) AS clkCnt
>>>> FROM (
>>>>   SELECT
>>>>     aggId,
>>>>     pageId,
>>>>     statkey,
>>>>     COUNT(DISTINCT deviceId) AS cnt
>>>>   FROM (
>>>>     SELECT
>>>>       'ZL_005' AS aggId,
>>>>       'ZL_UV_PER_MINUTE' AS pageId,
>>>>       deviceId,
>>>>       ts2Date(recvTime) AS statkey
>>>>     FROM kafka_zl_etrack_event_stream
>>>>   )
>>>>   GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)
>>>> ) AS t1
>>>> GROUP BY aggId, pageId, statkey
>>>>
>>>> Best
>
>--
>Benchao Li
>School of Electronics Engineering and Computer Science, Peking University
>Tel: +86-15650713730
>Email: [hidden email]; [hidden email]
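A footnote on the WindowOperator latency metric Benchao mentions above: latency tracking is off by default and has to be enabled before those histograms appear. A sketch, with an arbitrary 5-second marker interval:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LatencyTrackingSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Emit latency markers every 5s so per-operator latency histograms
        // (including WindowOperator's) are reported via the metrics system.
        env.getConfig().setLatencyTrackingInterval(5_000);
    }
}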