http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Flink-SQL-Count-Distinct-performance-optimization-tp32003p32035.html
hi,
Thanks for the reply. I am using the default FsStateBackend rather than RocksDB, with checkpointing off, so I cannot see any state info from the dashboard. I will research more details and see whether anything else can be optimized.
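If I later try RocksDB for comparison, I assume the switch would look roughly
like this (a sketch; the checkpoint URI is a placeholder, and it needs the
flink-statebackend-rocksdb dependency):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // working state lives in local RocksDB instances; checkpoints go to this URI
    env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));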
At 2020-01-08 19:07:08, "Benchao Li" <[hidden email]> wrote:
>hi sunfulin,
>
>As Kurt pointed out, if you use the RocksDB state backend, slow disk IO may
>be bounding your job.
>You can check the WindowOperator's latency metric to see how long it takes
>to process an element.
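>
>For example, latency tracking can be enabled roughly like this (a sketch; the
>1-second interval is only illustrative, and tracking adds some overhead):
>
>    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
>    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>    // emit latency markers so the per-operator latency metric is populated
>    env.getConfig().setLatencyTrackingInterval(1000L);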
>Hope this helps.
>
>sunfulin <[hidden email]> wrote on Wed, Jan 8, 2020 at 4:04 PM:
>
>> Ah, I have checked resource usage and GC from the Flink dashboard. It seems
>> the problem is not a CPU or memory issue: task heap memory usage is less
>> than 30%. Could you kindly tell me how I can see more metrics to help
>> locate the bottleneck?
>> Really appreciated.
>>
>> At 2020-01-08 15:59:17, "Kurt Young" <[hidden email]> wrote:
>>
>> Hi,
>>
>> Could you try to find out the bottleneck of your current job? This would
>> lead to different optimizations, such as whether it is CPU bound, or
>> whether your local state is too big and the job is stuck on too many
>> slow IOs.
>>
>> Best,
>> Kurt
>>
>>
>> On Wed, Jan 8, 2020 at 3:53 PM 贺小令 <[hidden email]> wrote:
>>
>>> hi sunfulin,
>>> you can try the blink planner (since 1.9+), which optimizes distinct
>>> aggregation. you can also try enabling
>>> *table.optimizer.distinct-agg.split.enabled* if the data is skewed.
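>>>
>>> a minimal sketch (assuming Flink 1.9+ with the blink planner dependency
>>> on the classpath):
>>>
>>>     import org.apache.flink.table.api.EnvironmentSettings;
>>>     import org.apache.flink.table.api.TableEnvironment;
>>>
>>>     EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>             .useBlinkPlanner()
>>>             .inStreamingMode()
>>>             .build();
>>>     TableEnvironment tEnv = TableEnvironment.create(settings);
>>>     // rewrite COUNT(DISTINCT ...) into a two-level aggregation to spread out skew
>>>     tEnv.getConfig().getConfiguration()
>>>             .setString("table.optimizer.distinct-agg.split.enabled", "true");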
>>>
>>> best,
>>> godfreyhe
>>>
>>> sunfulin <[hidden email]> wrote on Wed, Jan 8, 2020 at 3:39 PM:
>>>
>>>> Hi, community,
>>>> I'm using Apache Flink SQL to build some of my real-time streaming apps.
>>>> In one scenario I'm trying to count(distinct deviceId) over a roughly
>>>> 100GB data set in real time, and sink the aggregated results to an
>>>> Elasticsearch index. I hit a severe performance issue when running my
>>>> Flink job and want to get some help from the community.
>>>>
>>>>
>>>> Flink version: 1.8.2
>>>> Running on YARN with 4 slots per task manager. My Flink task
>>>> parallelism is set to 10, which equals my Kafka source partitions.
>>>> After running the job, I can observe high backpressure from the Flink
>>>> dashboard. Any suggestions and any kind of help are highly appreciated.
>>>>
>>>>
>>>> the running SQL is the following:
>>>>
>>>>
>>>> INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)
>>>> -- outer level: merge the per-bucket distinct counts
>>>> SELECT aggId, pageId, statkey AS ts, SUM(cnt) AS expoCnt, COUNT(cnt) AS clkCnt
>>>> FROM (
>>>>     -- inner level: distinct deviceId count within each of 1024 buckets
>>>>     SELECT
>>>>         aggId,
>>>>         pageId,
>>>>         statkey,
>>>>         COUNT(DISTINCT deviceId) AS cnt
>>>>     FROM (
>>>>         SELECT
>>>>             'ZL_005' AS aggId,
>>>>             'ZL_UV_PER_MINUTE' AS pageId,
>>>>             deviceId,
>>>>             ts2Date(recvTime) AS statkey
>>>>         FROM kafka_zl_etrack_event_stream
>>>>     )
>>>>     GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)
>>>> ) AS t1
>>>> GROUP BY aggId, pageId, statkey
>>>>
>>>> Best
>>>
>>>
>>
>>
>>
>
>
>--
>
>Benchao Li
>School of Electronics Engineering and Computer Science, Peking University
>Tel:+86-15650713730
>Email: [hidden email]; [hidden email]