Streaming

classic Classic list List threaded Threaded
11 messages Options
Reply | Threaded
Open this post in threaded view
|

Streaming

aitozi
Hi, community

I am using flink to deal with some situation.

1. "distinct count" to calculate the uv/pv.
2.  calculate the topN of the past 1 hour or 1 day time.

Are these all realized by window? Or is there a best practice on doing this?

3. And when deal with the distinct, if there is no need to do the keyBy
previous, how does the window deal with this.

Thanks
Aitozi.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Streaming

zhangminglei
Aitozi

From my side, I do not think distinct is very easy to deal with. Even though together work with kafka support exactly-once.

For uv, we can use a bloomfilter to filter pv for geting uv in the end.

Window is usually used in an aggregate operation, so I think all should be realized by windows.

I am not familiar with this fields, so I still want to know what others response this question.

Cheers
Minglei



> 在 2018年6月27日,下午5:12,aitozi <[hidden email]> 写道:
>
> Hi, community
>
> I am using flink to deal with some situation.
>
> 1. "distinct count" to calculate the uv/pv.
> 2.  calculate the topN of the past 1 hour or 1 day time.
>
> Are these all realized by window? Or is there a best practice on doing this?
>
> 3. And when deal with the distinct, if there is no need to do the keyBy
> previous, how does the window deal with this.
>
> Thanks
> Aitozi.
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Reply | Threaded
Open this post in threaded view
|

Re: Streaming

zhangminglei
To aitozi. 

Cheers
Minglei

在 2018年6月27日,下午5:46,shimin yang <[hidden email]> 写道:

Aitozi

We are using hyperloglog to count daily uv, but it only provided an approximate value. I also tried the count distinct in flink table without window, but need to set the retention time.

However, the time resolution of this operator is 1 millisecond, so it ends up with too many timers in the java heap which might leads to OOM.

Cheers
Shimin

2018-06-27 17:34 GMT+08:00 zhangminglei <[hidden email]>:
Aitozi

From my side, I do not think distinct is very easy to deal with. Even though together work with kafka support exactly-once.

For uv, we can use a bloomfilter to filter pv for geting uv in the end.

Window is usually used in an aggregate operation, so I think all should be realized by windows.

I am not familiar with this fields, so I still want to know what others response this question.

Cheers
Minglei



> 在 2018年6月27日,下午5:12,aitozi <[hidden email]> 写道:
>
> Hi, community
>
> I am using flink to deal with some situation.
>
> 1. "distinct count" to calculate the uv/pv.
> 2.  calculate the topN of the past 1 hour or 1 day time.
>
> Are these all realized by window? Or is there a best practice on doing this?
>
> 3. And when deal with the distinct, if there is no need to do the keyBy
> previous, how does the window deal with this.
>
> Thanks
> Aitozi.
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/




Reply | Threaded
Open this post in threaded view
|

Re: Streaming

zhangminglei
In reply to this post by zhangminglei
Forward shiming mail to Aitozi.

Aitozi

We are using hyperloglog to count daily uv, but it only provided an approximate value. I also tried the count distinct in flink table without window, but need to set the retention time.

However, the time resolution of this operator is 1 millisecond, so it ends up with too many timers in the java heap which might leads to OOM.

Cheers
Shimin


> 在 2018年6月27日,下午5:34,zhangminglei <[hidden email]> 写道:
>
> Aitozi
>
> From my side, I do not think distinct is very easy to deal with. Even though together work with kafka support exactly-once.
>
> For uv, we can use a bloomfilter to filter pv for geting uv in the end.
>
> Window is usually used in an aggregate operation, so I think all should be realized by windows.
>
> I am not familiar with this fields, so I still want to know what others response this question.
>
> Cheers
> Minglei
>
>
>
>> 在 2018年6月27日,下午5:12,aitozi <[hidden email]> 写道:
>>
>> Hi, community
>>
>> I am using flink to deal with some situation.
>>
>> 1. "distinct count" to calculate the uv/pv.
>> 2.  calculate the topN of the past 1 hour or 1 day time.
>>
>> Are these all realized by window? Or is there a best practice on doing this?
>>
>> 3. And when deal with the distinct, if there is no need to do the keyBy
>> previous, how does the window deal with this.
>>
>> Thanks
>> Aitozi.
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Reply | Threaded
Open this post in threaded view
|

Re:Streaming

gerryzhou
In reply to this post by aitozi
Hi aitozi,

I think it can be implemented by window or non-window, but it can not be implemented without keyBy(). A general approach to implement this is as follow.

{code}
process(Record records) {
    for (Record record : records) (
        if (!isFilter(record)) {
            agg(record); 
        }
    }
}
{code}

Where the isFilter() is to filter the duplicated records, and the agg() is the function to do aggregation, in your case that means the count().

In general, the isFilter() can be implemented base on the MapState<String, Integer> to store the previous records, so the isFilter() may look like.

{code}
boolean isFilter(Record record) {
    Integer oldVal = mapState.get(record);
    if (oldVal == null) {
        mapState.put(record, 1L);
        return false;
    } else {
        mapState.put(record, oldVal + 1L);
        return true;
    }
}
{code}

as you can see, we need to query the state frequently, one way with better performance is to the use BloomFilter to implement the isFilter() but with an approximate result(the accuracy is configurable), unfortunately it's not easy to use the bloom filter in flink, there are some works need to do to introduce it (https://issues.apache.org/jira/browse/FLINK-8601).

Best, Sihua
On 06/27/2018 17:12[hidden email] wrote:
Hi, community

I am using flink to deal with some situation.

1. "distinct count" to calculate the uv/pv.
2.  calculate the topN of the past 1 hour or 1 day time.

Are these all realized by window? Or is there a best practice on doing this?

3. And when deal with the distinct, if there is no need to do the keyBy
previous, how does the window deal with this.

Thanks
Aitozi.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Streaming

Rong Rong
Hi ,

Stream distinct accumulator is actually supported in SQL API [1]. The syntax is pretty much identical to the batch case. A simple example using the tumbling window will be.
SELECT COUNT(DISTINCT col) 
FROM t 
GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE)
I haven't added the support but I think it should be easy to support this in table API as well [2]. 

For TopN I think this could be implemented as a UDF[3], there's also an example in the test utility from the Flink repo [4] that might be a good example.

In terms of the aggregation strategy, I believe "window" is not necessarily needed if your data sink can support retraction / upsert, I think @Fabian or @Timo might have more context here.

Thanks,
Rong


On Wed, Jun 27, 2018 at 7:22 AM sihua zhou <[hidden email]> wrote:
Hi aitozi,

I think it can be implemented by window or non-window, but it can not be implemented without keyBy(). A general approach to implement this is as follow.

{code}
process(Record records) {
    for (Record record : records) (
        if (!isFilter(record)) {
            agg(record); 
        }
    }
}
{code}

Where the isFilter() is to filter the duplicated records, and the agg() is the function to do aggregation, in your case that means the count().

In general, the isFilter() can be implemented base on the MapState<String, Integer> to store the previous records, so the isFilter() may look like.

{code}
boolean isFilter(Record record) {
    Integer oldVal = mapState.get(record);
    if (oldVal == null) {
        mapState.put(record, 1L);
        return false;
    } else {
        mapState.put(record, oldVal + 1L);
        return true;
    }
}
{code}

as you can see, we need to query the state frequently, one way with better performance is to the use BloomFilter to implement the isFilter() but with an approximate result(the accuracy is configurable), unfortunately it's not easy to use the bloom filter in flink, there are some works need to do to introduce it (https://issues.apache.org/jira/browse/FLINK-8601).

Best, Sihua
On 06/27/2018 17:12[hidden email] wrote:
Hi, community

I am using flink to deal with some situation.

1. "distinct count" to calculate the uv/pv.
2.  calculate the topN of the past 1 hour or 1 day time.

Are these all realized by window? Or is there a best practice on doing this?

3. And when deal with the distinct, if there is no need to do the keyBy
previous, how does the window deal with this.

Thanks
Aitozi.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Streaming

Hequn Cheng
Hi aitozi,

1> CountDistinct
Currently (flink-1.5), CountDistinct is supported in SQL only under window as RongRong described. 
There are ways to implement non-window CountDistinct, for example: a) you can write a CountDistinct udaf using MapView or b) Use two groupBy to achieve it. The first groupBy distinct records and the second groupBy count different records. For the above two non-window approaches, the second one achieves a better performance.

As for the OOM problem, I guess you have set the minIdleStateRetentionTime and maxIdleStateRetentionTime to a same value which makes the operator registers a timer for each record. I opened a issue to track this problem[1]. It is better to set different value to these two parameters, for example set min to 0.5 day and max to 1 day.

2> TopN
Currently, TopN has not been supported in SQL/Table-api. The semantic of TopN is different from all the operators available now. For example, TopN is an update operator which outputs multi rows for each partition key. However, you can write a datastream job to implement TopN.

Thanks, Hequn


On Wed, Jun 27, 2018 at 10:55 PM, Rong Rong <[hidden email]> wrote:
Hi ,

Stream distinct accumulator is actually supported in SQL API [1]. The syntax is pretty much identical to the batch case. A simple example using the tumbling window will be.
SELECT COUNT(DISTINCT col) 
FROM t 
GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE)
I haven't added the support but I think it should be easy to support this in table API as well [2]. 

For TopN I think this could be implemented as a UDF[3], there's also an example in the test utility from the Flink repo [4] that might be a good example.

In terms of the aggregation strategy, I believe "window" is not necessarily needed if your data sink can support retraction / upsert, I think @Fabian or @Timo might have more context here.

Thanks,
Rong


On Wed, Jun 27, 2018 at 7:22 AM sihua zhou <[hidden email]> wrote:
Hi aitozi,

I think it can be implemented by window or non-window, but it can not be implemented without keyBy(). A general approach to implement this is as follow.

{code}
process(Record records) {
    for (Record record : records) (
        if (!isFilter(record)) {
            agg(record); 
        }
    }
}
{code}

Where the isFilter() is to filter the duplicated records, and the agg() is the function to do aggregation, in your case that means the count().

In general, the isFilter() can be implemented base on the MapState<String, Integer> to store the previous records, so the isFilter() may look like.

{code}
boolean isFilter(Record record) {
    Integer oldVal = mapState.get(record);
    if (oldVal == null) {
        mapState.put(record, 1L);
        return false;
    } else {
        mapState.put(record, oldVal + 1L);
        return true;
    }
}
{code}

as you can see, we need to query the state frequently, one way with better performance is to the use BloomFilter to implement the isFilter() but with an approximate result(the accuracy is configurable), unfortunately it's not easy to use the bloom filter in flink, there are some works need to do to introduce it (https://issues.apache.org/jira/browse/FLINK-8601).

Best, Sihua
On 06/27/2018 17:12[hidden email] wrote:
Hi, community

I am using flink to deal with some situation.

1. "distinct count" to calculate the uv/pv.
2.  calculate the topN of the past 1 hour or 1 day time.

Are these all realized by window? Or is there a best practice on doing this?

3. And when deal with the distinct, if there is no need to do the keyBy
previous, how does the window deal with this.

Thanks
Aitozi.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply | Threaded
Open this post in threaded view
|

Re: Streaming

Hequn Cheng
For the above two non-window approaches, the second one achieves a better performance. => For the above two non-window approaches, the second one achieves a better performance  in most cases especially when there are many same rows.

On Thu, Jun 28, 2018 at 12:25 AM, Hequn Cheng <[hidden email]> wrote:
Hi aitozi,

1> CountDistinct
Currently (flink-1.5), CountDistinct is supported in SQL only under window as RongRong described. 
There are ways to implement non-window CountDistinct, for example: a) you can write a CountDistinct udaf using MapView or b) Use two groupBy to achieve it. The first groupBy distinct records and the second groupBy count different records. For the above two non-window approaches, the second one achieves a better performance.

As for the OOM problem, I guess you have set the minIdleStateRetentionTime and maxIdleStateRetentionTime to a same value which makes the operator registers a timer for each record. I opened a issue to track this problem[1]. It is better to set different value to these two parameters, for example set min to 0.5 day and max to 1 day.

2> TopN
Currently, TopN has not been supported in SQL/Table-api. The semantic of TopN is different from all the operators available now. For example, TopN is an update operator which outputs multi rows for each partition key. However, you can write a datastream job to implement TopN.

Thanks, Hequn


On Wed, Jun 27, 2018 at 10:55 PM, Rong Rong <[hidden email]> wrote:
Hi ,

Stream distinct accumulator is actually supported in SQL API [1]. The syntax is pretty much identical to the batch case. A simple example using the tumbling window will be.
SELECT COUNT(DISTINCT col) 
FROM t 
GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE)
I haven't added the support but I think it should be easy to support this in table API as well [2]. 

For TopN I think this could be implemented as a UDF[3], there's also an example in the test utility from the Flink repo [4] that might be a good example.

In terms of the aggregation strategy, I believe "window" is not necessarily needed if your data sink can support retraction / upsert, I think @Fabian or @Timo might have more context here.

Thanks,
Rong


On Wed, Jun 27, 2018 at 7:22 AM sihua zhou <[hidden email]> wrote:
Hi aitozi,

I think it can be implemented by window or non-window, but it can not be implemented without keyBy(). A general approach to implement this is as follow.

{code}
process(Record records) {
    for (Record record : records) (
        if (!isFilter(record)) {
            agg(record); 
        }
    }
}
{code}

Where the isFilter() is to filter the duplicated records, and the agg() is the function to do aggregation, in your case that means the count().

In general, the isFilter() can be implemented base on the MapState<String, Integer> to store the previous records, so the isFilter() may look like.

{code}
boolean isFilter(Record record) {
    Integer oldVal = mapState.get(record);
    if (oldVal == null) {
        mapState.put(record, 1L);
        return false;
    } else {
        mapState.put(record, oldVal + 1L);
        return true;
    }
}
{code}

as you can see, we need to query the state frequently, one way with better performance is to the use BloomFilter to implement the isFilter() but with an approximate result(the accuracy is configurable), unfortunately it's not easy to use the bloom filter in flink, there are some works need to do to introduce it (https://issues.apache.org/jira/browse/FLINK-8601).

Best, Sihua
On 06/27/2018 17:12[hidden email] wrote:
Hi, community

I am using flink to deal with some situation.

1. "distinct count" to calculate the uv/pv.
2.  calculate the topN of the past 1 hour or 1 day time.

Are these all realized by window? Or is there a best practice on doing this?

3. And when deal with the distinct, if there is no need to do the keyBy
previous, how does the window deal with this.

Thanks
Aitozi.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Reply | Threaded
Open this post in threaded view
|

Re: Streaming

zhangminglei
In reply to this post by gerryzhou
Hi, Sihua & Aitozi

I would like add more here, As @Sihua said, we need to query the state frequently. Assume if you use redis to store these states, it will consume a lot of your redis resources. So, you can use a bloomfilter before access to redis. 

If a pv is told to exist by bloomfilter, then you do a query to check whether it really exist or not. Otherwise, Add 1 directly for uv. Then you can get a precise number of UV and at the same time it also reduces the pressure on redis.

Cheers
Minglei

在 2018年6月27日,下午10:21,sihua zhou <[hidden email]> 写道:

Hi aitozi,

I think it can be implemented by window or non-window, but it can not be implemented without keyBy(). A general approach to implement this is as follow.

{code}
process(Record records) {
    for (Record record : records) (
        if (!isFilter(record)) {
            agg(record); 
        }
    }
}
{code}

Where the isFilter() is to filter the duplicated records, and the agg() is the function to do aggregation, in your case that means the count().

In general, the isFilter() can be implemented base on the MapState<String, Integer> to store the previous records, so the isFilter() may look like.

{code}
boolean isFilter(Record record) {
    Integer oldVal = mapState.get(record);
    if (oldVal == null) {
        mapState.put(record, 1L);
        return false;
    } else {
        mapState.put(record, oldVal + 1L);
        return true;
    }
}
{code}

as you can see, we need to query the state frequently, one way with better performance is to the use BloomFilter to implement the isFilter() but with an approximate result(the accuracy is configurable), unfortunately it's not easy to use the bloom filter in flink, there are some works need to do to introduce it (https://issues.apache.org/jira/browse/FLINK-8601).

Best, Sihua
On 06/27/2018 17:12[hidden email] wrote: 
Hi, community

I am using flink to deal with some situation.

1. "distinct count" to calculate the uv/pv.
2.  calculate the topN of the past 1 hour or 1 day time.

Are these all realized by window? Or is there a best practice on doing this?

3. And when deal with the distinct, if there is no need to do the keyBy
previous, how does the window deal with this.

Thanks 
Aitozi.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply | Threaded
Open this post in threaded view
|

Re: Streaming

aitozi
In reply to this post by aitozi
Hi, all

Thanks for your reply.

1. Can i ask how does the SQL like below transform to a low-level datastream
job?


2. If i implement a distinct in datastream job, and there is no keyBy needed
advance , and we just calculate the global distinct count, Does i just can
used the AllWindowedStream or the SQL above?

Thanks,
Aitozi





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Streaming

Hequn Cheng
Hi aitozi,

1> how will sql translated into a datastream job?
The Table API and SQL leverage Apache Calcite for parsing, validation, and query optimization. After optimization, the logical plan of the job will be translated into a datastream job. The logical plan contains many different logical operators(source, sink, group-by, join...) and different operators have different datstream implementations.

2> global count distinct
Yes, you can use the AllWindowedStream or the SQL above. I think SQL may be more convenience and worth a try. :-)

On Thu, Jun 28, 2018 at 2:47 PM, aitozi <[hidden email]> wrote:
Hi, all

Thanks for your reply.

1. Can i ask how does the SQL like below transform to a low-level datastream
job?


2. If i implement a distinct in datastream job, and there is no keyBy needed
advance , and we just calculate the global distinct count, Does i just can
used the AllWindowedStream or the SQL above?

Thanks,
Aitozi