Multiple operations on a WindowedStream

Kanak Biscuitwala
Hi,

I would like to write a job that does something like a word count and then emits only the 10 highest counts for each window. Logically, I would want to do something like:

stream.timeWindow(Time.of(1, TimeUnit.MINUTES), Time.of(5, TimeUnit.SECONDS)).sum(2).apply(getTopK(10))

However, the window context is lost after I do the sum aggregation. Is there a straightforward way to express this logic in Flink 1.0? One way I can think of is to have a complex function in apply() that has state, but I would like to know if there is something a little cleaner than that.

Thanks,
Kanak    

Re: Multiple operations on a WindowedStream

Balaji Rajagopalan
I had a similar use case and ended up writing the aggregation logic in the apply function; I could not find a better solution.
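
For readers weighing this single-function approach, here is a minimal plain-Java sketch of the logic such an apply() function would carry out for one window: count every word, then keep the k largest counts. The class and method names (WindowTopK, topK) are hypothetical, and the sketch deliberately uses no Flink classes, so wiring it into a real window function is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: counts the words of one window and keeps the k most frequent. */
final class WindowTopK {

    /** Counts occurrences of each word, then returns the k entries with the highest counts. */
    static List<Map.Entry<String, Integer>> topK(Iterable<String> windowContents, int k) {
        // Aggregation step: the part that sum() would otherwise do.
        Map<String, Integer> counts = new HashMap<>();
        for (String word : windowContents) {
            counts.merge(word, 1, Integer::sum);
        }

        // Selection step: sort by count (descending); ties are broken by word
        // so that the result is deterministic.
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
        entries.sort(
                Map.Entry.<String, Integer>comparingByValue().reversed()
                        .thenComparing(Map.Entry.<String, Integer>comparingByKey()));
        return new ArrayList<>(entries.subList(0, Math.min(Math.max(k, 0), entries.size())));
    }
}
```

The drawback is the one raised in the question: the function has to buffer and process all elements of the window itself instead of relying on an incremental built-in aggregation.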

On Fri, Apr 1, 2016 at 6:03 AM, Kanak Biscuitwala <[hidden email]> wrote:
> I would like to write something that does something like a word count, and then emits only the 10 highest counts for that window. [...]

Re: Multiple operations on a WindowedStream

Aljoscha Krettek
Hi,
if you are using ingestion-time (or event-time) as your stream time characteristic, i.e.:

env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) // or TimeCharacteristic.EventTime

you can apply several window transforms one after another, and they will operate on the same "time window" because they work on the element timestamps. You can then have one window that does the aggregation and a second one (which has to be global, i.e. non-keyed) that selects the top elements:

result = input
  .keyBy(<some key>)
  .timeWindow(Time.minutes(1), Time.seconds(5))
  .sum(2)
  .timeWindowAll(Time.seconds(5)) // notice how I put a non-sliding window here, because the above will output a new window every 5 seconds
  .apply(<my custom window function>)
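
To see why the second window lines up with the first, the arithmetic can be sketched in plain Java. The sketch assumes that a result emitted for a window [start, end) carries the timestamp end - 1, and that tumbling windows start at multiples of their size. Both are assumptions about Flink's behaviour stated here for illustration, and the names WindowAlignment, tumblingWindowStart and resultTimestamp are made up.

```java
/** Plain-Java arithmetic sketch of time-window alignment; not Flink code. */
final class WindowAlignment {

    /** Start of the tumbling window of the given size (ms) that contains a non-negative timestamp. */
    static long tumblingWindowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    /** Assumed timestamp of a result emitted for [start, end): the last millisecond inside it. */
    static long resultTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    public static void main(String[] args) {
        long size = 60_000L;  // 1 minute sliding window
        long slide = 5_000L;  // evaluated every 5 seconds

        // Two consecutive sliding windows: [0, 60000) and [5000, 65000).
        for (long start = 0; start <= slide; start += slide) {
            long end = start + size;
            long ts = resultTimestamp(end);
            long tumblingStart = tumblingWindowStart(ts, slide);
            System.out.printf("sliding [%d, %d) -> result ts %d -> tumbling [%d, %d)%n",
                    start, end, ts, tumblingStart, tumblingStart + slide);
        }
    }
}
```

Under these assumptions, the results of the sliding window [0, 60000) all carry timestamp 59999 and fall into the tumbling window [55000, 60000), while the next sliding window's results fall into [60000, 65000). Each 5-second global window therefore sees the sums of exactly one sliding window.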

I hope this helps.

Cheers,
Aljoscha

On Fri, 1 Apr 2016 at 10:35 Balaji Rajagopalan <[hidden email]> wrote:
> I had a similar use case and ended up writing the aggregation logic in the apply function [...]

RE: Multiple operations on a WindowedStream

Kanak Biscuitwala
This worked when I ran my test code locally, but nothing reaches my sink when I run it on YARN (previously, when I just echoed all sums to my sink, it worked).

Here's what my code looks like:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkKafkaConsumer09<MirrorMessageRequest> consumer = new FlinkKafkaConsumer09<>(
                INPUT_TOPIC, new KafkaMessageDeserializer(), properties);
        env.enableCheckpointing(5000);

        // this (or event time) is required in order to do the double-windowing below
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        DataStream<String> stream = env
                .addSource(consumer)
                .flatMap(new CountRequests())
                .keyBy(0, 1)
                .timeWindow(Time.of(1, TimeUnit.MINUTES), Time.of(5, TimeUnit.SECONDS))
                .sum(2)
                .timeWindowAll(Time.of(5, TimeUnit.SECONDS))
                .apply(new TopK(20))
                .map(new ToString<List<Tuple3<String, String, Integer>>>());
        stream.addSink(new FlinkKafkaProducer09<>(OUTPUT_TOPIC, new SimpleStringSchema(),
                properties));
        env.execute(TASK_NAME);

Note that CountRequests produces Tuple3<String, String, Integer>, TopK is an AllWindowFunction that produces List<Tuple3<String, String, Integer>>, and ToString is a MapFunction that is just a wrapper on Object#toString().
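
The selection step inside a function like TopK could look roughly like the following plain-Java sketch, which keeps a bounded min-heap over the already-summed elements of one window. It is not the TopK class from the job above: RequestCount is a hypothetical stand-in for Tuple3<String, String, Integer>, and TopKSelector and select are made-up names, so that the sketch compiles without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical stand-in for Tuple3<String, String, Integer>. */
final class RequestCount {
    final String first;
    final String second;
    final int count;

    RequestCount(String first, String second, int count) {
        this.first = first;
        this.second = second;
        this.count = count;
    }
}

final class TopKSelector {

    /** Returns the k elements with the largest counts, highest count first. */
    static List<RequestCount> select(Iterable<RequestCount> windowContents, int k) {
        List<RequestCount> result = new ArrayList<>();
        if (k <= 0) {
            return result;
        }
        // Min-heap ordered by count: the head is always the weakest current candidate.
        PriorityQueue<RequestCount> heap =
                new PriorityQueue<>(k, Comparator.comparingInt((RequestCount r) -> r.count));
        for (RequestCount r : windowContents) {
            if (heap.size() < k) {
                heap.add(r);
            } else if (r.count > heap.peek().count) {
                heap.poll();  // evict the current minimum
                heap.add(r);
            }
        }
        result.addAll(heap);
        result.sort(Comparator.comparingInt((RequestCount r) -> r.count).reversed());
        return result;
    }
}
```

The heap never holds more than k elements, so memory stays bounded by k even when a window contains many distinct keys.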

Anything obvious that I'm doing wrong?
________________________________

> From: [hidden email]
> Date: Fri, 1 Apr 2016 09:41:12 +0000
> Subject: Re: Multiple operations on a WindowedStream
> To: [hidden email]
>
> if you are using ingestion-time (or event-time) as your stream time characteristic [...]

Re: Multiple operations on a WindowedStream

Aljoscha Krettek
Hi,
the code looks alright. Did you check the Flink Dashboard to see whether any of the operators are sending elements?

Cheers,
Aljoscha

On Tue, 5 Apr 2016 at 21:00 Kanak Biscuitwala <[hidden email]> wrote:
> This worked when I ran my test code locally, but I'm seeing nothing reach my sink when I try to run this in YARN [...]

RE: Multiple operations on a WindowedStream

Kanak Biscuitwala
It turns out that the problem is deeper than I originally thought. The Flink dashboard reports that 0 records are being consumed, which is quite odd. Is there some issue with the 0.9 consumer on YARN?
________________________________
> From: [hidden email]
> Date: Thu, 7 Apr 2016 09:56:42 +0000
> Subject: Re: Multiple operations on a WindowedStream
> To: [hidden email]
>
> the code seems alright? Did you try looking at the Flink Dashboard to check out whether any of the operations are sending elements? [...]

Re: Multiple operations on a WindowedStream

Aljoscha Krettek
Hi,
the sources don't report records consumed. This is a bit confusing, but the records sent/records consumed statistics only cover the Flink-internal transfer of records between operators, so a Kafka source only ever shows sent records.
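
The counting rule can be pictured with a toy model in plain Java. This is only an illustration of the statement above, not how Flink implements its metrics; RecordCounters, Task and run are hypothetical names.

```java
import java.util.List;

/** Toy model: counters that only track transfers between tasks inside the job. */
final class RecordCounters {

    /** Counters for one task, as a dashboard might display them. */
    static final class Task {
        final String name;
        long received;
        long sent;

        Task(String name) {
            this.name = name;
        }
    }

    /** Pushes every external record through source -> sink and returns both tasks' counters. */
    static Task[] run(List<String> externalRecords) {
        Task source = new Task("source");
        Task sink = new Task("sink");
        for (String r : externalRecords) {
            // Reading r from the external system (e.g. Kafka) is not an internal
            // transfer, so source.received is never incremented.
            source.sent++;    // the source hands the record to the next task
            sink.received++;  // the downstream task receives it
            // The sink writes to an external system, so sink.sent stays 0.
        }
        return new Task[] {source, sink};
    }
}
```

In this model a source that has read three records shows 0 received and 3 sent, which matches the "0 records consumed" reading on a working source.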

To really see each operator in isolation you should disable chaining for these tests:

env.disableOperatorChaining()

Cheers,
Aljoscha

On Sat, 9 Apr 2016 at 05:12 Kanak Biscuitwala <[hidden email]> wrote:
> It turns out that the problem is deeper than I originally thought. The flink dashboard reports that 0 records are being consumed, which is quite odd. Is there some issue with the 0.9 consumer on YARN? [...]