Hi,
I would like to write something that does something like a word count, and then emits only the 10 highest counts for that window. Logically, I would want to do something like:

stream.timeWindow(Time.of(1, TimeUnit.MINUTES), Time.of(5, TimeUnit.SECONDS)).sum(2).apply(getTopK(10))

However, the window context is lost after I do the sum aggregation. Is there a straightforward way to express this logic in Flink 1.0? One way I can think of is to have a complex function in apply() that has state, but I would like to know if there is something a little cleaner than that.

Thanks,
Kanak
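As background on the window shape above: here is a minimal, self-contained sketch in plain Java (no Flink dependency; the timestamps and the helper name are illustrative, not Flink API) of how a 1-minute window sliding every 5 seconds assigns one element to several windows, mirroring the arithmetic a sliding window assigner uses:

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowDemo {
    // An element with timestamp ts belongs to every window [start, start + size)
    // whose start is a multiple of the slide and satisfies start <= ts < start + size.
    static List<Long> windowStartsFor(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Floor ts to the nearest slide boundary (negative-safe modulo).
        long lastStart = ts - ((ts % slide) + slide) % slide;
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 60_000L;  // 1 minute, in milliseconds
        long slide = 5_000L;  // 5 seconds
        List<Long> starts = windowStartsFor(62_000L, size, slide);
        // A 1-minute window sliding every 5 seconds places each element in 12 windows.
        System.out.println(starts.size());  // 12
        System.out.println(starts.get(0));  // 60000 (latest window containing the element)
    }
}
```

This is why such a pipeline fires a window result every 5 seconds: each slide boundary closes one of the overlapping windows.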
I had a similar use case and ended up writing the aggregation logic in the apply function; I could not find a better solution.
Hi,

if you are using ingestion time (or event time) as your stream time characteristic, i.e.:

env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) // or TimeCharacteristic.EventTime

you can apply several window transforms one after another and they will apply to the same "time window", because they work on the element timestamps. What you can then do is have a window that does the aggregation and then another one (that has to be global) to select the top elements:

result = input
    .keyBy(<some key>)
    .timeWindow(Time.minutes(1), Time.seconds(5))
    .sum(2)
    .timeWindowAll(Time.seconds(5)) // notice the non-sliding window here, because the above will output a new window every 5 seconds
    .apply(<my custom window function>)

I hope this helps.

Cheers,
Aljoscha
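The <my custom window function> slot above is where the top-k selection would live. A minimal sketch of just that selection logic in plain Java, with the Flink AllWindowFunction wrapper and the Tuple3 type omitted so it runs standalone (the Counted record and all names here are illustrative, not part of the thread's code):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class TopKDemo {
    // One (word, count) record, standing in for a Tuple3<String, String, Integer>.
    record Counted(String word, int count) {}

    // Keep the k largest counts using a size-bounded min-heap: O(n log k).
    static List<Counted> topK(Iterable<Counted> input, int k) {
        PriorityQueue<Counted> heap =
                new PriorityQueue<>(Comparator.comparingInt(Counted::count));
        for (Counted c : input) {
            heap.offer(c);
            if (heap.size() > k) {
                heap.poll(); // evict the current minimum
            }
        }
        List<Counted> result = new ArrayList<>(heap);
        result.sort(Comparator.comparingInt(Counted::count).reversed());
        return result;
    }

    public static void main(String[] args) {
        List<Counted> counts = List.of(
                new Counted("flink", 42), new Counted("kafka", 17),
                new Counted("yarn", 5), new Counted("window", 23));
        System.out.println(topK(counts, 2));
        // [Counted[word=flink, count=42], Counted[word=window, count=23]]
    }
}
```

Inside an actual AllWindowFunction, the loop body would iterate the window's elements and emit the sorted result to the collector.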
This worked when I ran my test code locally, but I'm seeing nothing reach my sink when I try to run this in YARN (previously, when I just echoed all sums to my sink, it would work).
Here's what my code looks like:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FlinkKafkaConsumer09<MirrorMessageRequest> consumer = new FlinkKafkaConsumer09<>(
        INPUT_TOPIC,
        new KafkaMessageDeserializer(),
        properties);

env.enableCheckpointing(5000);

// this (or event time) is required in order to do the double-windowing below
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

DataStream<String> stream = env
        .addSource(consumer)
        .flatMap(new CountRequests())
        .keyBy(0, 1)
        .timeWindow(Time.of(1, TimeUnit.MINUTES), Time.of(5, TimeUnit.SECONDS))
        .sum(2)
        .timeWindowAll(Time.of(5, TimeUnit.SECONDS))
        .apply(new TopK(20))
        .map(new ToString<List<Tuple3<String, String, Integer>>>());

stream.addSink(new FlinkKafkaProducer09<>(OUTPUT_TOPIC, new SimpleStringSchema(), properties));

env.execute(TASK_NAME);

Note that CountRequests produces Tuple3<String, String, Integer>, TopK is an AllWindowFunction that produces List<Tuple3<String, String, Integer>>, and ToString is a MapFunction that is just a wrapper around Object#toString().

Anything obvious that I'm doing wrong?
Hi,

the code seems alright. Did you try looking at the Flink Dashboard to check whether any of the operations are sending elements?

Cheers,
Aljoscha
It turns out that the problem is deeper than I originally thought. The Flink dashboard reports that 0 records are being consumed, which is quite odd. Is there some issue with the 0.9 consumer on YARN?
From: [hidden email]
Date: Thu, 7 Apr 2016 09:56:42 +0000
Subject: Re: Multiple operations on a WindowedStream
To: [hidden email]
Hi,

the sources don't report records consumed. This is a bit confusing, but the records sent/records consumed statistics only cover Flink-internal sending of records, so a Kafka source will only show sent records. To really see each operator in isolation you should disable chaining for these tests:

env.disableOperatorChaining()

Cheers,
Aljoscha