Flink 1.1 event-time windowing changes from 1.0.3

Adam Warski
Hello,

I have a very simple stream where I window data using event-time.
As a data source I’m using a CSV file, sorted by increasing timestamps.

Here’s the source:

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val lines = env.readTextFile(csvFileName)

lines
  .flatMap { l => parseLine(l) }
  .assignAscendingTimestamps(t => t.timestampSeconds * 1000L)
  .keyBy(t => t.key)
  .timeWindow(Time.minutes(30), Time.minutes(5))
  .fold(0)((c, _) => c+1)
  .addSink { c =>
    println(c)
  }

env.execute()
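As background for what the window operator does with these parameters: timeWindow(Time.minutes(30), Time.minutes(5)) creates sliding event-time windows, so every element is assigned to size / slide = 6 overlapping windows. Here is a plain-Scala sketch of that assignment (no Flink dependency; assignWindows is a made-up helper mirroring the logic of Flink's SlidingEventTimeWindows, ignoring window offsets):

```scala
// Map a timestamp to the sliding windows that contain it.
// Each window is a half-open interval [start, start + size), in milliseconds.
def assignWindows(ts: Long, size: Long, slide: Long): Seq[(Long, Long)] = {
  // Start of the latest window containing ts (assumes ts >= 0, zero offset).
  val lastStart = ts - (ts % slide)
  // Step backwards by `slide` while the window still covers ts.
  (lastStart to (ts - size + 1) by -slide).map(start => (start, start + size))
}

val minute = 60 * 1000L
// A timestamp at the 42-minute mark, with 30-minute windows sliding every
// 5 minutes, lands in 6 windows starting at minutes 40, 35, 30, 25, 20, 15:
assignWindows(42 * minute, 30 * minute, 5 * minute).foreach(println)
```

So each parsed line contributes to six window counts, and each of those counts is emitted only when a watermark passes that window's end.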

This worked fine in 1.0.3: the aggregate counts were printed to stdout.

However, after updating to 1.1, nothing happens. I can see the stages being initialized (switching from SCHEDULED to DEPLOYING to RUNNING), but then they immediately go to FINISHED without printing anything.

If I add a .map { x => println(x); x } after .assignAscendingTimestamps, I can see the data flowing. So the data *is* being read; somehow the windowing causes it to be lost?

Any ideas on where to look for possible causes?

Thanks!
Re: Flink 1.1 event-time windowing changes from 1.0.3

Aljoscha Krettek
Hi Adam,
sorry for the inconvenience. This is caused by the new file read operator, specifically by how it treats watermarks/timestamps. I opened an issue that describes the situation: https://issues.apache.org/jira/browse/FLINK-4329.

I think this should be fixed in the upcoming 1.1.1 bug-fix release.
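For anyone who wants to understand the symptom while waiting for the fix, the mechanism comes down to watermarks: an event-time window fires only once a watermark passes its end, and a bounded source is expected to emit a final Long.MaxValue watermark when it finishes, flushing all remaining windows. If the source never emits that final watermark, the windows simply never fire. A plain-Scala sketch of that firing rule (a simplified model for illustration, not Flink internals):

```scala
// Simplified model of event-time firing (not Flink code): a window fires
// only when some observed watermark has passed its end timestamp.
case class Window(start: Long, end: Long)

def fired(pending: Seq[Window], watermarks: Seq[Long]): Seq[Window] = {
  val highest = if (watermarks.isEmpty) Long.MinValue else watermarks.max
  pending.filter(_.end <= highest)
}

val minute = 60 * 1000L
val pending = Seq(Window(0, 30 * minute), Window(5 * minute, 35 * minute))

// A source that emits the final Long.MaxValue watermark on shutdown
// flushes every pending window:
println(fired(pending, Seq(Long.MaxValue)).size) // 2

// A source that finishes without it leaves every window unfired,
// so nothing ever reaches the sink:
println(fired(pending, Seq.empty).size) // 0
```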

Cheers,
Aljoscha

On Sat, 6 Aug 2016 at 12:33 Adam Warski <[hidden email]> wrote:
Re: Flink 1.1 event-time windowing changes from 1.0.3

Adam Warski
Thanks! I’ll be watching that issue, then.

Adam

On 08 Aug 2016, at 05:01, Aljoscha Krettek <[hidden email]> wrote:
