Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?

David Olsen
I have two streams. One produces a single record and the other produces
a list of records, and I want to do a left join. For example,

Stream A:
record1
record2
...

Stream B:
single-record

After the join:

record1, single-record
record2, single-record
...

However, the following streaming job throws the exception
'Record has Long.MIN_VALUE timestamp (= no timestamp marker) ...' even though
setStreamTimeCharacteristic is set to ProcessingTime and
assignTimestampsAndWatermarks is called.

How can I fix this runtime exception?

Thanks.

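import org.apache.flink.api.common.functions.{CoGroupFunction, MapFunction}
// assumes T2 is Flink's Java Tuple2 (it has the f0/f1 fields used below)
import org.apache.flink.api.java.tuple.{Tuple2 => T2}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import org.slf4j.LoggerFactory
// lets Join call foreach on java.lang.Iterable (Scala 2.10/2.11)
import scala.collection.JavaConversions._

object App {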
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    val left = env.fromElements(1, 2, 3, 4, 5).map(
      new MapFunction[Int, T2[Int, String]] {
        override def map(value: Int): T2[Int, String] =
          new T2[Int, String](value, "data 1")
      }
    ).assignTimestampsAndWatermarks(new MyTimestampExtractor)

    val right = env.fromElements(99).map(
      new MapFunction[Int, T2[Int, String]] {
        override def map(value: Int): T2[Int, String] =
          new T2[Int, String](value, "data 2")
      }
    )
    left.coGroup(right).
         where { t2 => t2.f0 }.
         equalTo{ t2=> t2.f0 }.
         window(TumblingEventTimeWindows.of(Time.seconds(1))).
         apply(new Join()).print
    env.execute
  }
}

class MyTimestampExtractor extends
AssignerWithPeriodicWatermarks[T2[Int, String]] with Serializable {
  override def extractTimestamp(e: T2[Int, String],
                                prevElementTimestamp: Long) =
    System.currentTimeMillis

  override def getCurrentWatermark(): Watermark =
    new Watermark(System.currentTimeMillis)
}

class Join extends CoGroupFunction[
  T2[Int, String], T2[Int, String], T2[Int, String]
] {
  val log = LoggerFactory.getLogger(classOf[Join])
  override def coGroup(left: java.lang.Iterable[T2[Int, String]],
                       right: java.lang.Iterable[T2[Int, String]],
                       out: Collector[T2[Int, String]]) {
    var seq = Seq.empty[T2[Int, String]]
    left.foreach { e => log.info(s"from left: $e"); seq ++= Seq(e) }
    right.foreach { e => log.info(s"from right: $e"); seq ++= Seq(e) }
    seq.foreach { e => out.collect(e) }
  }

}
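The Join above just concatenates both sides. As a plain-Scala sketch (not Flink API code; the object and method names are hypothetical), the body of a coGroup callback that produces the left-join output described above could pair each left element with every right element of the same key and window, or with None when the right side is empty. Note that coGroup only brings together elements whose where/equalTo keys are equal and that fall into the same window; in the job above the left keys are 1 to 5 and the right key is 99, so the two sides would never share a group.

```scala
object LeftOuterCoGroupSketch {
  // Called once per (key, window) with all left and right elements for that key,
  // mirroring the shape of a coGroup callback. Every left element is emitted,
  // paired with each right element, or with None when the right side is empty.
  def leftOuterCoGroup[L, R](left: Iterable[L], right: Iterable[R]): List[(L, Option[R])] =
    if (right.isEmpty) left.toList.map(l => (l, Option.empty[R]))
    else for { l <- left.toList; r <- right.toList } yield (l, Some(r))

  def main(args: Array[String]): Unit = {
    // Same key and window on both sides: every left record gets the right record
    println(leftOuterCoGroup(Seq("record1", "record2"), Seq("single-record")))
    // Nothing on the right for this key/window: left records are kept with None
    println(leftOuterCoGroup(Seq("record3"), Seq.empty[String]))
  }
}
```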

Re: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?

Kostas Kloudas
Hi David,

You are using tumbling event-time windows (TumblingEventTimeWindows), but you set the time characteristic to processing time.
If you want processing time, use TumblingProcessingTimeWindows and remove the timestamp assigner.
If you want event time, set the time characteristic to TimeCharacteristic.EventTime and leave the rest of your code as is.
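To make the two options concrete, here is a simplified plain-Scala model (not Flink's source; all names are made up for illustration) of how a tumbling window assigner picks a window. The event-time variant uses the record's own timestamp and cannot proceed when it is the Long.MIN_VALUE "no timestamp" marker; the processing-time variant only looks at the operator's clock. The start = ts - ts % size arithmetic assumes non-negative timestamps and no window offset.

```scala
object WindowAssignmentSketch {
  // Flink marks "no timestamp" with Long.MIN_VALUE, as the error message says.
  val NoTimestamp: Long = Long.MinValue

  // Start of the tumbling window of length sizeMs that contains ts (for ts >= 0).
  def windowStart(ts: Long, sizeMs: Long): Long = ts - (ts % sizeMs)

  // Event time: the window depends on the record's timestamp, so a record
  // without one cannot be placed in any window.
  def assignEventTimeWindow(recordTs: Long, sizeMs: Long): (Long, Long) = {
    if (recordTs == NoTimestamp)
      throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker)")
    val start = windowStart(recordTs, sizeMs)
    (start, start + sizeMs) // [start, end), end exclusive
  }

  // Processing time: the record's timestamp is ignored; the operator's clock decides.
  def assignProcessingTimeWindow(nowMs: Long, sizeMs: Long): (Long, Long) = {
    val start = windowStart(nowMs, sizeMs)
    (start, start + sizeMs)
  }
}
```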

Let me know if this answered your question.

Cheers,
Kostas

On Jul 6, 2016, at 3:43 PM, David Olsen <[hidden email]> wrote:

Re: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?

David Olsen
After changing the TimeCharacteristic to EventTime, Flink still throws that
runtime exception. Is
`env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)` the
correct way to set it?

Thanks.

java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no
timestamp marker). Is the time characteristic set to 'ProcessingTime',
or did you forget to call
'DataStream.assignTimestampsAndWatermarks(...)'?
        at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:63)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:223)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:168)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)


On 06/07/2016, Kostas Kloudas <[hidden email]> wrote:

Re: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?

Kostas Kloudas
Could it be that when you define the ‘right’ stream, you do not specify a timestamp extractor?
It needs one too, added the same way as for the ‘left’ stream.
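A rough model of why the right stream matters: as far as I know, Flink implements a windowed coGroup by tagging both inputs and unioning them in front of a single window operator (which fits the OneInputStreamTask in the stack trace), so the event-time window assigner sees the right stream's records as well. The sketch below is plain Scala with a hypothetical Rec type, where None stands in for the missing-timestamp marker.

```scala
object CoGroupTimestampSketch {
  // Hypothetical record: ts = None models "Long.MIN_VALUE / no timestamp".
  final case class Rec(key: Int, value: String, ts: Option[Long])

  // Both inputs end up in one window operator, so the assigner sees
  // records from the left AND the right stream.
  def windowInput(left: Seq[Rec], right: Seq[Rec]): Seq[Rec] = left ++ right

  // Records that would make an event-time window assigner throw.
  def withoutTimestamp(records: Seq[Rec]): Seq[Rec] = records.filter(_.ts.isEmpty)

  // Assigning timestamps to a stream (as done for 'left') fixes it for that stream only.
  def assignTimestamps(records: Seq[Rec])(extract: Rec => Long): Seq[Rec] =
    records.map(r => r.copy(ts = Some(extract(r))))
}
```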

Kostas

> On Jul 8, 2016, at 6:12 AM, David Olsen <[hidden email]> wrote:

Re: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?

rmetzger0
One thing I would like to add is that your timestamp extractors are not really extracting the event time from your events. They are just returning the current system time, which effectively means you are falling back to processing time.
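A plain-Scala illustration of the difference, using a hypothetical Event type whose createdAtMs field carries the time the event actually happened: bucketing by that field gives the same 1-second windows on every run, whereas bucketing by System.currentTimeMillis (what MyTimestampExtractor returns) depends on when the job happens to run.

```scala
object TimestampSourceSketch {
  // Hypothetical event type; createdAtMs is part of the data itself.
  final case class Event(id: Int, payload: String, createdAtMs: Long)

  // Event time: read the timestamp from the event, so it is the same on every run.
  def eventTime(e: Event): Long = e.createdAtMs

  // What MyTimestampExtractor does: read the wall clock at extraction time.
  def wallClock(e: Event): Long = System.currentTimeMillis()

  // Group event ids by the start of their 1-second tumbling window.
  def bucket(events: Seq[Event], ts: Event => Long): Map[Long, Seq[Int]] =
    events
      .groupBy { e => val t = ts(e); t - (t % 1000L) }
      .map { case (start, es) => start -> es.map(_.id) }
}
```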

On Fri, Jul 8, 2016 at 10:32 AM, Kostas Kloudas <[hidden email]> wrote:

Re: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?

Kostas Kloudas
Yes, Robert is right!

Strictly speaking, though, the timestamp is assigned only once (by the extractor)
and not per operator, so it is closer to ingestion time, i.e. the time at which
a record enters the pipeline.

Setting the time characteristic to ingestion time
could also be an option, if this is what you want to do.
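A small plain-Scala model of ingestion time (names are illustrative, not Flink API): each record is stamped once with the clock at the source, and later operators reuse that stamp instead of reading their own clocks. In Flink 1.x this mode is selected with env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime), in which case the sources assign the timestamps and no custom extractor is needed.

```scala
object IngestionTimeSketch {
  final case class Stamped[A](value: A, ts: Long)

  // Ingestion time: stamp each record once, using the clock at the source.
  def ingest[A](values: Seq[A], clock: () => Long): Seq[Stamped[A]] =
    values.map(v => Stamped(v, clock()))

  // A downstream operator reuses the stored stamp rather than its own clock,
  // so every operator sees the same time for the same record.
  def downstreamTime[A](r: Stamped[A]): Long = r.ts
}
```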

Kostas

On Jul 8, 2016, at 11:56 AM, Robert Metzger <[hidden email]> wrote:

Re: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?

David Olsen
Changing to env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
and removing assignTimestampsAndWatermarks(new MyTimestampExtractor)
gets the code running now.

One more question. From the Javadoc [1], it seems a watermark tells
operators that no more elements with an older timestamp will arrive. So how
do I determine the value of the watermark, e.g. in
MyTimestampExtractor.getCurrentWatermark()? Is it normal to simply
keep track of the oldest timestamp? Are there any concrete
examples I can check? (I guess I am asking such a newbie question
because I do not know streaming concepts well yet.)

Many thanks for the kind replies.

[1]. https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/watermark/Watermark.html
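On the watermark question: a common heuristic, similar to the periodic-watermark example in Flink's event-time documentation, is to track the largest timestamp seen so far and subtract a fixed bound on how out of order events may arrive. Tracking the oldest timestamp instead would keep the watermark stuck at the first element. Below is a plain-Scala sketch of that bookkeeping; the class name and the 500 ms bound used in the usage are assumptions, and in Flink the two main methods would correspond to extractTimestamp and getCurrentWatermark of an AssignerWithPeriodicWatermarks.

```scala
final class BoundedOutOfOrdernessSketch(maxOutOfOrdernessMs: Long) {
  // Largest timestamp seen so far; starts just high enough that the first
  // watermark (max - bound) does not underflow below Long.MinValue.
  private var currentMaxTimestamp: Long = Long.MinValue + maxOutOfOrdernessMs

  // Per element: remember the newest timestamp and return the element's own.
  def extractTimestamp(eventTs: Long): Long = {
    currentMaxTimestamp = math.max(currentMaxTimestamp, eventTs)
    eventTs
  }

  // Periodically: claim that nothing at or before (max seen - bound) is still coming.
  def currentWatermark: Long = currentMaxTimestamp - maxOutOfOrdernessMs

  // An element at or below the current watermark arrives late.
  def isLate(eventTs: Long): Boolean = eventTs <= currentWatermark
}
```

Usage: with a 500 ms bound, seeing timestamps 1000, 3000 and 2000 moves the watermark to 2500, so a later element stamped 2400 would count as late.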

On 08/07/2016, Kostas Kloudas <[hidden email]> wrote:

> Yes Robert is right!
>
> Although it is set only once and not per-operator,
> so it looks closer to ingestion time, which is when an
> operator enters the pipeline.
>
> Setting the timeCharacteristic to ingestion time
> could also be an option, if this is what you want to do.
>
> Kostas
>
>> On Jul 8, 2016, at 11:56 AM, Robert Metzger <[hidden email]> wrote:
>>
>> One thing I would like to add is that your timestamp extractors are not
>> really extracting the event time from your events. They are just returning
>> the current system time, which effectively means you are falling back to
>> processing time.
>>
>> On Fri, Jul 8, 2016 at 10:32 AM, Kostas Kloudas
>> <[hidden email] <mailto:[hidden email]>> wrote:
>> Can it be that when you define the ‘right’ steam, you do not specify a
>> timestamp extractor?
>> This is done the same way you do it for the ‘left’ stream.
>>
>> Kostas
>>
>> > On Jul 8, 2016, at 6:12 AM, David Olsen <[hidden email]
>> > <mailto:[hidden email]>> wrote:
>> >
>> > Changing TimeCharacteristic to EventTime the flink still throws that
>> > runtime exception error. Is
>> > `env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)` the
>> > correct way to set that feature?
>> >
>> > Thanks.
>> >
>> > java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no
>> > timestamp marker). Is the time characteristic set to 'ProcessingTime',
>> > or did you forget to call
>> > 'DataStream.assignTimestampsAndWatermarks(...)'?
>> >       at
>> > org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:63)
>> >       at
>> > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:223)
>> >       at
>> > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:168)
>> >       at
>> > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>> >       at
>> > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>> >       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> >       at java.lang.Thread.run(Thread.java:745)
>> >
>> >
>> > On 06/07/2016, Kostas Kloudas <[hidden email]
>> > <mailto:[hidden email]>> wrote:
>> >> Hi David,
>> >>
>> >> You are using Tumbling event time windows, but you set the
>> >> timeCharacteristic to processing time.
>> >> If you want processing time, then you should use
>> >> TumblingProcessingTimeWindows and remove the timestampAssigner.
>> >> If you want event time, then you need to set the timeCharacteristic to
>> >> eventTime and leave the rest of your code as is.
>> >>
>> >> Let me know if this answered your question.
>> >>
>> >> Cheers,
>> >> Kostas
>> >>
>> >>> On Jul 6, 2016, at 3:43 PM, David Olsen <[hidden email]
>> >>> <mailto:[hidden email]>> wrote:
>> >>>
>> >>> I have two streams. One will produce a single record, and the other
>> >>> have a list of records. And I want to do left join. So for example,
>> >>>
>> >>> Stream A:
>> >>> record1
>> >>> record2
>> >>> ...
>> >>>
>> >>> Stream B:
>> >>> single-record
>> >>>
>> >>> After joined,
>> >>>
>> >>> record1, single-record
>> >>> record2, single-record
>> >>> ...
>> >>>
>> >>> However with the following streaming job, it throws an exception
>> >>> 'Record has Long.MIN_VALUE timestamp (= no timestamp marker) ...'
>> >>> even
>> >>> setStreamTimeCharacteristic is configured to ProcessingTime and
>> >>> assignTimestampsAndWatermarks is called.
>> >>>
>> >>> How can I fix this runtime exception?
>> >>>
>> >>> Thanks.
>> >>>
>> >>> object App {
>> >>>   def main(args: Array[String]) {
>> >>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>> >>>     env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>> >>>     val left = env.fromElements(1, 2, 3, 4, 5).map(
>> >>>       new MapFunction[Int, T2[Int, String]] {
>> >>>         override def map(value: Int): T2[Int, String] =
>> >>>           new T2[Int, String](value, "data 1")
>> >>>       }
>> >>>     ).assignTimestampsAndWatermarks(new MyTimestampExtractor)
>> >>>
>> >>>     val right = env.fromElements(99).map(
>> >>>       new MapFunction[Int, T2[Int, String]] {
>> >>>         override def map(value: Int): T2[Int, String] =
>> >>>           new T2[Int, String](value, "data 2")
>> >>>       }
>> >>>     )
>> >>>     left.coGroup(right).
>> >>>          where { t2 => t2.f0 }.
>> >>>          equalTo { t2 => t2.f0 }.
>> >>>          window(TumblingEventTimeWindows.of(Time.seconds(1))).
>> >>>          apply(new Join()).print
>> >>>     env.execute
>> >>>   }
>> >>> }
>> >>>
>> >>> class MyTimestampExtractor extends
>> >>>     AssignerWithPeriodicWatermarks[T2[Int, String]] with Serializable {
>> >>>   override def extractTimestamp(e: T2[Int, String],
>> >>>                                 prevElementTimestamp: Long) =
>> >>>     System.currentTimeMillis
>> >>>
>> >>>   override def getCurrentWatermark(): Watermark =
>> >>>     new Watermark(System.currentTimeMillis)
>> >>> }
>> >>>
>> >>> class Join extends CoGroupFunction[
>> >>>     T2[Int, String], T2[Int, String], T2[Int, String]] {
>> >>>   val log = LoggerFactory.getLogger(classOf[Join])
>> >>>   override def coGroup(left: java.lang.Iterable[T2[Int, String]],
>> >>>                        right: java.lang.Iterable[T2[Int, String]],
>> >>>                        out: Collector[T2[Int, String]]) {
>> >>>     var seq = Seq.empty[T2[Int, String]]
>> >>>     left.foreach { e => log.info(s"from left: $e"); seq ++= Seq(e) }
>> >>>     right.foreach { e => log.info(s"from right: $e"); seq ++= Seq(e) }
>> >>>     seq.foreach { e => out.collect(e) }
>> >>>   }
>> >>> }
>> >>
>> >>
>>
>>
>
>
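In the quoted job, only `left` passes through `assignTimestampsAndWatermarks(...)`. As far as the quoted code shows, the records of `right` reach the `TumblingEventTimeWindows` coGroup without any timestamp. The stack trace in this thread shows the exception being thrown in `TumblingEventTimeWindows.assignWindows`.

The sketch below is a simplified, Flink-free model of that step. It is not Flink's source code, and `TumblingWindowModel`, `TimeWindow`, `NoTimestamp` and `assignWindow` are hypothetical names. The point it illustrates is that an event-time tumbling window is computed from the record's own timestamp. A record that still carries the Long.MIN_VALUE "no timestamp" marker therefore cannot be placed in any window.

```scala
// Simplified model (NOT Flink's source) of assigning a record to a tumbling
// event-time window. All names here are hypothetical, for illustration only.
object TumblingWindowModel {
  // Half-open window [start, end), in milliseconds.
  final case class TimeWindow(start: Long, end: Long)

  // Long.MinValue stands for "this record has no timestamp".
  val NoTimestamp: Long = Long.MinValue

  def assignWindow(timestamp: Long, sizeMs: Long): TimeWindow = {
    if (timestamp == NoTimestamp)
      throw new RuntimeException(
        "Record has Long.MIN_VALUE timestamp (= no timestamp marker)")
    // Align the start to a multiple of the window size; floorMod keeps
    // the alignment correct for negative timestamps as well.
    val start = timestamp - java.lang.Math.floorMod(timestamp, sizeMs)
    TimeWindow(start, start + sizeMs)
  }

  def main(args: Array[String]): Unit = {
    // A record that went through a timestamp assigner (1.5 s after the epoch).
    println(assignWindow(1500L, 1000L))
    // A record from a stream that never went through an assigner.
    try println(assignWindow(NoTimestamp, 1000L))
    catch { case e: RuntimeException => println(s"failed: ${e.getMessage}") }
  }
}
```

With a one-second window, the stamped record lands in [1000, 2000), while the unstamped one fails in the same way as the job above. Either giving `right` its own timestamp assigner or switching to processing-time windows removes the unstamped case.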

Re: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?

Stephan Ewen
Hi David!

Have you had a look at the docs for Event Time and Watermark Generation?

There are some examples for some typical cases:





Greetings,
Stephan
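David asked how to choose the watermark value. A common choice is the "bounded out-of-orderness" pattern described in the Flink documentation on watermark generation: track the largest event timestamp seen so far and emit that value minus a fixed allowance for late events. Tracking the oldest timestamp would not work, because a running minimum never moves forward.

The sketch below models only that bookkeeping, in plain Scala, so that it runs without Flink. `BoundedOutOfOrdernessModel`, `onEvent` and the 2-second bound are assumptions for illustration. In a Flink 1.x job, the same logic would live in `extractTimestamp` and `getCurrentWatermark` of an `AssignerWithPeriodicWatermarks`, with the timestamp read from a field of the event rather than from `System.currentTimeMillis`.

```scala
// Hypothetical, Flink-free model of a periodic watermark generator that
// tolerates events arriving up to `maxOutOfOrdernessMs` out of order.
final class BoundedOutOfOrdernessModel(maxOutOfOrdernessMs: Long) {
  // Largest event timestamp seen so far. Starting at
  // Long.MinValue + bound keeps the first watermark from underflowing.
  private var maxTimestamp: Long = Long.MinValue + maxOutOfOrdernessMs

  // Plays the role of extractTimestamp(...): take the timestamp from the
  // event itself and remember the largest one seen.
  def onEvent(eventTimestamp: Long): Long = {
    maxTimestamp = math.max(maxTimestamp, eventTimestamp)
    eventTimestamp
  }

  // Plays the role of getCurrentWatermark(): assuming the bound holds,
  // no further events at or before this time are expected.
  def currentWatermark: Long = maxTimestamp - maxOutOfOrdernessMs
}

object WatermarkDemo {
  def main(args: Array[String]): Unit = {
    val gen = new BoundedOutOfOrdernessModel(maxOutOfOrdernessMs = 2000L)
    // Event timestamps (ms) arriving slightly out of order.
    Seq(10000L, 12000L, 11000L, 15000L, 14500L).foreach { ts =>
      gen.onEvent(ts)
      println(s"event=$ts watermark=${gen.currentWatermark}")
    }
  }
}
```

With a 2-second bound, the watermark is at 10 s after the events at 10 s and 12 s. The out-of-order event at 11 s therefore still arrives ahead of the watermark and is not treated as late. The bound trades latency (how far the watermark trails the newest event) against how much disorder is tolerated.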



On Sun, Jul 10, 2016 at 9:43 AM, David Olsen <[hidden email]> wrote:
Changing to env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
and removing assignTimestampsAndWatermarks(new MyTimestampExtractor)
got the code running.

One more question. I read the Javadoc [1], and it seems a watermark is a
marker telling operators that no more elements with a timestamp older
than or equal to the watermark will arrive. So how do I determine the
value of the watermark, e.g. in MyTimestampExtractor.getCurrentWatermark()?
Is it normal to simply keep track of the oldest timestamp? Are there any
concrete examples I can look at? (I guess I am asking such a newbie
question because I am not familiar with streaming concepts.)

Many thanks for your kind reply.

[1]. https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/watermark/Watermark.html

On 08/07/2016, Kostas Kloudas <[hidden email]> wrote:
> Yes, Robert is right!
>
> That said, since the timestamp is assigned only once rather than per
> operator, it is closer to ingestion time, i.e. the time at which an
> element enters the pipeline.
>
> Setting the time characteristic to IngestionTime
> could also be an option, if that is what you want.
>
> Kostas
>
>> On Jul 8, 2016, at 11:56 AM, Robert Metzger <[hidden email]> wrote:
>>
>> One thing I would like to add is that your timestamp extractors are not
>> really extracting the event time from your events. They are just returning
>> the current system time, which effectively means you are falling back to
>> processing time.
>>
>> On Fri, Jul 8, 2016 at 10:32 AM, Kostas Kloudas
>> <[hidden email]> wrote:
>> Could it be that when you define the ‘right’ stream, you do not specify a
>> timestamp extractor?
>> You can do this the same way you do it for the ‘left’ stream.
>>
>> Kostas
>>
>> > On Jul 8, 2016, at 6:12 AM, David Olsen <[hidden email]> wrote:
>> >
>> > After changing the TimeCharacteristic to EventTime, Flink still throws
>> > that runtime exception. Is
>> > `env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)` the
>> > correct way to set it?
>> >
>> > Thanks.
>> >
>> > java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no
>> > timestamp marker). Is the time characteristic set to 'ProcessingTime',
>> > or did you forget to call
>> > 'DataStream.assignTimestampsAndWatermarks(...)'?
>> >       at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:63)
>> >       at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:223)
>> >       at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:168)
>> >       at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>> >       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>> >       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> >       at java.lang.Thread.run(Thread.java:745)
>> >
>> >
>> > On 06/07/2016, Kostas Kloudas <[hidden email]> wrote:
>> >> Hi David,
>> >>
>> >> You are using tumbling event-time windows, but you set the
>> >> timeCharacteristic to processing time.
>> >> If you want processing time, then you should use
>> >> TumblingProcessingTimeWindows and remove the timestamp assigner.
>> >> If you want event time, then you need to set the timeCharacteristic to
>> >> EventTime and leave the rest of your code as is.
>> >>
>> >> Let me know if this answered your question.
>> >>
>> >> Cheers,
>> >> Kostas
>> >>