Windows emit results at the end of the stream

Windows emit results at the end of the stream

Sonex
Hi everyone,

I am using a simple window computation on a stream with event time. The code looks like this:

streamData.readTextFile(...)
    .map(...)
    .assignAscendingTimestamps(_.timestamp)
    .keyBy(_.id)
    .timeWindow(Time.seconds(3600),Time.seconds(3600))
    .apply(new MyWindowFunction)
    .map(...)

By monitoring the memory usage and the Flink web dashboard, I noticed that Flink does not emit any window results until the entire stream has finished (thus keeping all aggregations in memory) and only then continues to the map transformation. What I would expect is that window results are emitted to the map transformation as soon as each window is ready.

Can anyone explain this behavior?

Re: Windows emit results at the end of the stream

Yassine MARZOUGUI
Hi Sonex,

When using readTextFile(...) with event time, only one watermark with the value Long.MAX_VALUE is sent at the end of the stream, which explains why the windows are kept until the whole file has been processed. In order to have periodic watermarks, you need to process the file continuously, as follows:
TextInputFormat inputFormat = new TextInputFormat(new Path("file/to/read.txt"));
env.readFile(inputFormat, "file/to/read.txt", FileProcessingMode.PROCESS_CONTINUOUSLY, 10000L, TypeInformation.of(String.class)) // re-scan the path every 10000 ms
   .map(...)
Hope this helps.

Best,
Yassine

Re: Windows emit results at the end of the stream

Sonex
Thank you for your response Yassine,

I forgot to mention that I use the Scala API. In Scala the equivalent code is:

val inputFormat = new TextInputFormat(new Path("file/to/read.txt"))
env.readFile(inputFormat,"file/to/read.txt", FileProcessingMode.PROCESS_CONTINUOUSLY,10000L)

Am I correct?

But now I noticed a weird behavior. Sometimes it never starts to process the elements of the file, and sometimes it stops in the middle of the file without processing the rest of it. Why does that happen?

Re: Windows emit results at the end of the stream

Yassine MARZOUGUI
Hi Sonex,

I don't know Scala as well as I know Java, but I guess it is correct if no error is raised.
The behaviour you describe seems weird to me and should not happen. I'm unfortunately unable to identify an apparent cause; maybe someone on the mailing list can shed some light on this.

Best,
Yassine

Re: Windows emit results at the end of the stream

Till Rohrmann
Hi Sonex,

I assume the elements in your file have an associated timestamp which is parsed in the first map function, right? Now my question would be: what is the range of this timestamp value? In your program you've defined a time window of 1 hour. If the timestamps all lie within a single 1-hour window, then you won't see the windows trigger before the complete file has been read.
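
To make this concrete, here is a minimal, self-contained sketch (made-up timestamps in milliseconds, and a reduce as a stand-in for your MyWindowFunction). The window [00:00, 01:00) can only fire once the watermark, which assignAscendingTimestamps derives from the largest timestamp seen so far, has passed 01:00, i.e. only after an element such as the 01:10 one below has been read:

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

env.fromElements(
    ("a", 10L * 60 * 1000), // 00:10 -> falls into window [00:00, 01:00)
    ("a", 50L * 60 * 1000), // 00:50 -> same window, watermark still below 01:00
    ("a", 70L * 60 * 1000)  // 01:10 -> watermark passes 01:00, the first window can fire
  )
  .assignAscendingTimestamps(_._2)
  .keyBy(_._1)
  .timeWindow(Time.seconds(3600), Time.seconds(3600))
  .reduce((a, _) => a) // stand-in for MyWindowFunction, just to produce a window result
  .print()

env.execute()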

Cheers,
Till

Re: Windows emit results at the end of the stream

Sonex
Hello Till,

Yes, the elements do have an associated timestamp, which is parsed in the first map function.

Indeed, if all timestamps lie within 1 hour, the triggering will only happen after the complete file has been read. I had the wrong window size and sliding step for one of the datasets I was testing (I tested on several datasets).

Now that I have solved the initial non-triggering, one problem remains in all my tests. Assume that I have a dataset of 50 hours of events/elements. The triggering happens as expected, but when it reaches the 6th hour it stops and does not continue (all parallel operators remain idle).

Thanx,
Sonex

Re: Windows emit results at the end of the stream

Sonex
In reply to this post by Till Rohrmann
The degree of parallelism in the experiments I mentioned is 8. If I decrease the parallelism, it emits more results. If I set the parallelism to 1, it emits results for the entire dataset (i.e., it behaves as expected). What could be the reason for this?

Re: Windows emit results at the end of the stream

Till Rohrmann
Do you mean that window results with timestamps later than the 6th hour are not output? Could it be that some of the keys simply don't have any elements arriving after the 6th hour? It's really hard to tell without knowing your input data set. Maybe you can put together a small example program and data set that reproduce your problem; that would help me debug the issue.
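
As a quick sanity check, something like the following plain-Scala sketch could print the largest timestamp per key and show whether some keys simply stop receiving elements early (the file name and the key|timestamp layout here are only assumptions for illustration):

import scala.io.Source

// Largest timestamp seen for each key in the input file.
val maxTimestampPerKey = Source.fromFile("data.txt").getLines()
  .map(_.split('|'))
  .collect { case Array(key, ts) => key -> ts.toLong }
  .toSeq
  .groupBy { case (key, _) => key }
  .map { case (key, pairs) => key -> pairs.map(_._2).max }

maxTimestampPerKey.foreach { case (key, ts) => println(s"$key -> $ts") }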

Cheers,
Till

Re: Windows emit results at the end of the stream

Sonex
I have prepared a small dummy dataset (data.txt) as follows:

Hello|5
Hi|15
WordsWithoutMeaning|25
AnotherWord|34
HelloWorld|46
HelloPlanet|67
HelloFlinkUsers|89
HelloProgrammers|98
DummyPhrase|105
AnotherDummy|123

And below is the code:

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

val parallelism = 8
// sliding step (ms)
val slidingStep = 30
// window size (ms)
val windowSize = 30

// start the streaming environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.disableSysoutLogging
// set the degree of parallelism
env.setParallelism(parallelism)
// set the time characteristic
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val inputFormat = new TextInputFormat(new Path("data.txt"))
env.readFile(inputFormat, "data.txt", FileProcessingMode.PROCESS_CONTINUOUSLY, 10000L)
  .map { element =>
    val partsOfElement = element.split("[|]")
    (partsOfElement.head, partsOfElement.last.toLong)
  }
  .assignAscendingTimestamps(_._2)
  .keyBy(_._1)
  .timeWindow(Time.milliseconds(windowSize), Time.milliseconds(slidingStep))
  .apply(new Test)

env.execute()


And the test class is the following:

class Test extends WindowFunction[(String, Long), String, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
    println(s"$key -- ${window.getEnd}")
    out.collect(input.head._1)
  }
}


Each window result is simply the first element of the iterable, and when a window is processed it prints the key together with the end time of the window. With the parallelism set to 8 as above, it emits nothing. If we decrease the parallelism to 4, it only emits results for the first window. You can run the above code and test it yourself.
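
If it helps to narrow this down, a small pass-through map like the sketch below (purely illustrative, not part of the job above) prints which parallel subtask handles each element, so one can check whether all 8 parallel instances actually receive data:

import org.apache.flink.api.common.functions.RichMapFunction

// Pass-through map that logs the parallel subtask handling each element.
class SubtaskLogger[T] extends RichMapFunction[T, T] {
  override def map(value: T): T = {
    println(s"subtask ${getRuntimeContext.getIndexOfThisSubtask}: $value")
    value
  }
}

// Usage: insert it right after the source, e.g.
// env.readFile(inputFormat, "data.txt", FileProcessingMode.PROCESS_CONTINUOUSLY, 10000L)
//   .map(new SubtaskLogger[String])
//   ...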