Hi everyone,
I am using a simple window computation on a stream with event time. The code looks like this:

streamData.readTextFile(...)
  .map(...)
  .assignAscendingTimestamps(_.timestamp)
  .keyBy(_.id)
  .timeWindow(Time.seconds(3600), Time.seconds(3600))
  .apply(new MyWindowFunction)
  .map(...)

By monitoring the memory usage and the Flink web dashboard, I noticed that Flink applies the window function only after the entire stream finishes (thus storing all aggregations in memory) and only then continues to the map transformation. What I would expect is that window results are emitted to the map transformation as soon as each window is ready. Can anyone explain this behavior?
Hi Sonex,

When using readTextFile(...) with event time, only one watermark with the value Long.MAX_VALUE is sent, at the end of the stream, which explains why the windows are held back until the whole file is processed. In order to have periodic watermarks, you need to process the file continuously, as follows:

TextInputFormat inputFormat = new TextInputFormat(new Path("file/to/read.txt"));

Hope this helps.

Best,
Yassine

2017-03-23 9:47 GMT+01:00 Sonex <[hidden email]>:
Thank you for your response Yassine,
I forgot to mention that I use the Scala API. In Scala the equivalent code is:

val inputFormat = new TextInputFormat(new Path("file/to/read.txt"))
env.readFile(inputFormat, "file/to/read.txt", FileProcessingMode.PROCESS_CONTINUOUSLY, 10000L)

Am I correct? But I have noticed a weird behavior now. Sometimes it never starts to process the elements of the file, and sometimes it stops in the middle of the file without processing the rest of it. Why does that happen?
Hi Sonex,

The behaviour you described seems weird to me and should not happen. I'm unfortunately unable to identify an apparent cause; maybe someone on the mailing list can shed some light on that.

Best,
Yassine

2017-03-23 13:16 GMT+01:00 Sonex <[hidden email]>:
Hi Sonex,

I assume the elements in your file have a timestamp associated with them which is parsed in the first map function, right? Now my question would be: what is the range of these timestamp values? In your program you've defined a time window of 1 hour. If the timestamps all lie within a single 1-hour window, then you won't see the windows trigger before the complete file has been read.

Cheers,
Till

On Fri, Mar 24, 2017 at 11:23 AM, Yassine MARZOUGUI <[hidden email]> wrote:
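Till's reasoning can be sketched with a small, self-contained model (Python rather than Scala, purely to illustrate the window arithmetic; `window_for` and `fires` are hypothetical helpers, not Flink APIs): a tumbling event-time window of 1 hour only fires once a watermark passes its end, and with readTextFile the only watermark (Long.MAX_VALUE) arrives at end of input.

```python
# Toy model of event-time tumbling windows. A window [start, end)
# fires only once the watermark reaches end - 1.

WINDOW_SIZE_MS = 3600 * 1000  # 1 hour, as in the job above

def window_for(timestamp_ms):
    """Tumbling window [start, end) that an event timestamp falls into."""
    start = timestamp_ms - (timestamp_ms % WINDOW_SIZE_MS)
    return (start, start + WINDOW_SIZE_MS)

def fires(window, watermark_ms):
    """A window fires once the watermark reaches its end (exclusive)."""
    return watermark_ms >= window[1] - 1

# All timestamps within the first hour map to one window [0, 3600000):
events = [5_000, 120_000, 3_400_000]
print({window_for(t) for t in events})  # {(0, 3600000)}

# A watermark inside the hour does not fire the window ...
print(fires((0, WINDOW_SIZE_MS), 3_000_000))  # False
# ... only a watermark at/after the window end does. With readTextFile,
# such a watermark arrives only at the very end of the input.
print(fires((0, WINDOW_SIZE_MS), 3_599_999))  # True
```

So if every timestamp in the file falls inside the same hour, nothing can fire before the end-of-input watermark, matching the behavior described above.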
Hello Till,
Yes, elements do have a timestamp associated with them which is parsed in the first map function. And indeed, if all timestamps lie within 1 hour, the triggering will happen only after the complete file has been read. I had the wrong window size and sliding step for a dataset I was testing (I tested on different datasets). Now that I have solved the non-triggering from the beginning, one problem remains in all my tests. Assume that I have a dataset of 50 hours of events/elements. The triggering happens as expected, but when it reaches the 6th hour it stops and does not continue (all parallel operators remain idle).

Thanks,
Sonex
The degree of parallelism in the experiments I mentioned is 8. If I decrease the parallelism, it emits more results. If I set the parallelism to 1, then it emits results from the entire dataset (i.e., it behaves as expected). What could be the reason for this?
Do you mean that window results with timestamps larger than the 6th hour won't be output? Can it be the case that some of the keys simply don't have elements arriving after the 6th hour? It's really hard to tell with no knowledge of your input data set. Maybe you can compile a small example code and data set to reproduce your problem. That would help me debug the issue.

Cheers,
Till

On Mon, Mar 27, 2017 at 11:51 AM, Sonex <[hidden email]> wrote:
I have prepared a small dummy dataset (data.txt) as follows:
Hello|5
Hi|15
WordsWithoutMeaning|25
AnotherWord|34
HelloWorld|46
HelloPlanet|67
HelloFlinkUsers|89
HelloProgrammers|98
DummyPhrase|105
AnotherDummy|123

And below is the code:

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

val parallelism = 8
// sliding step (ms)
val slidingStep = 30
// window size (ms)
val windowSize = 30

// start the streaming environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.disableSysoutLogging
// set the degree of parallelism
env.setParallelism(parallelism)
// set the time characteristic
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val inputFormat = new TextInputFormat(new Path("data.txt"))
env.readFile(inputFormat, "data.txt", FileProcessingMode.PROCESS_CONTINUOUSLY, 10000L)
  .map{ element =>
    val partsOfElement = element.split("[|]")
    (partsOfElement.head, partsOfElement.last.toLong)
  }
  .assignAscendingTimestamps(_._2)
  .keyBy(_._1)
  .timeWindow(Time.milliseconds(windowSize), Time.milliseconds(slidingStep))
  .apply(new Test)

env.execute

And the test class is the following:

class Test extends WindowFunction[(String,Long),String,String,TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
    println(s"$key -- ${window.getEnd}")
    out.collect(input.head._1)
  }
}

Each window result is simply the first element from the iterable, and when a window is processed it prints the key with the end time of the window. If we set the parallelism to 8 as above, it does nothing. If we decrease the parallelism to 4, it only emits results from the first window. You can run the above code and test it yourself.
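A plausible explanation for the parallelism dependence (not confirmed in this thread) is Flink's watermark propagation rule: a downstream operator's current watermark is the minimum of the watermarks received from all of its input channels. With a tiny file and source parallelism 8, some source subtasks may never receive data, so their watermarks never advance past the initial Long.MIN_VALUE and no window downstream can ever fire. A toy Python model of that rule (the function name is illustrative, not a Flink API):

```python
# Toy simulation of Flink's rule that a downstream operator's
# watermark is the MINIMUM over all of its input channels.

LONG_MIN = -(2**63)  # initial watermark of a channel that never advances

def downstream_watermark(channel_watermarks):
    """Watermark seen downstream: the minimum across input channels."""
    return min(channel_watermarks)

# With parallelism 8 and a tiny file, suppose only 2 source subtasks
# actually receive data and advance their watermarks:
channels = [123, 105] + [LONG_MIN] * 6
print(downstream_watermark(channels))  # stuck at Long.MIN_VALUE
# -> no window whose end lies above this value can ever fire.

# With parallelism 1, the single channel carries the real progress:
print(downstream_watermark([123]))  # 123
```

This would also explain why lowering the parallelism emits more results: fewer source subtasks means fewer channels that can sit idle and hold the minimum back.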