Re: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?

Posted by Kostas Kloudas on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Record-has-Long-MIN-VALUE-timestamp-no-timestamp-marker-Is-the-time-characteristic-set-to-Processing-tp7843p7871.html

Could it be that when you define the ‘right’ stream, you do not specify a timestamp extractor?
You need to call assignTimestampsAndWatermarks(...) on it, the same way you do for the ‘left’ stream.
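
For illustration, a minimal sketch of that fix might look like the following. It assumes the Flink 1.x Scala API used in this thread, swaps the T2 tuples for plain Scala tuples to keep the example short, and introduces two hypothetical helper names (WallClockTimestamps and EmitAll) that are not from the original job.

```scala
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// Hypothetical helper: same idea as MyTimestampExtractor in the quoted job,
// made generic so one class can serve both inputs.
class WallClockTimestamps[T] extends AssignerWithPeriodicWatermarks[T] {
  override def extractTimestamp(element: T, previousElementTimestamp: Long): Long =
    System.currentTimeMillis

  override def getCurrentWatermark(): Watermark =
    new Watermark(System.currentTimeMillis)
}

// Hypothetical helper: forwards every record of both inputs, left side first.
class EmitAll extends CoGroupFunction[(Int, String), (Int, String), (Int, String)] {
  override def coGroup(left: java.lang.Iterable[(Int, String)],
                       right: java.lang.Iterable[(Int, String)],
                       out: Collector[(Int, String)]): Unit = {
    left.asScala.foreach(e => out.collect(e))
    right.asScala.foreach(e => out.collect(e))
  }
}

object App {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Event-time windows need the event-time characteristic.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val left = env.fromElements(1, 2, 3, 4, 5)
      .map(v => (v, "data 1"))
      .assignTimestampsAndWatermarks(new WallClockTimestamps[(Int, String)])

    // The missing piece: the 'right' input gets timestamps as well.
    val right = env.fromElements(99)
      .map(v => (v, "data 2"))
      .assignTimestampsAndWatermarks(new WallClockTimestamps[(Int, String)])

    left.coGroup(right)
      .where(_._1)
      .equalTo(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(1)))
      .apply(new EmitAll)
      .print()

    env.execute()
  }
}
```

With timestamps on both inputs, TumblingEventTimeWindows no longer sees records that carry the Long.MIN_VALUE marker. If processing time is what you actually want, the alternative is to switch to TumblingProcessingTimeWindows and drop the timestamp assigners altogether.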

Kostas

> On Jul 8, 2016, at 6:12 AM, David Olsen <[hidden email]> wrote:
>
> After changing the TimeCharacteristic to EventTime, Flink still throws
> the runtime exception below. Is
> `env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)` the
> correct way to set that feature?
>
> Thanks.
>
> java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no
> timestamp marker). Is the time characteristic set to 'ProcessingTime',
> or did you forget to call
> 'DataStream.assignTimestampsAndWatermarks(...)'?
> at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:63)
> at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:223)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:168)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
>
>
> On 06/07/2016, Kostas Kloudas <[hidden email]> wrote:
>> Hi David,
>>
>> You are using Tumbling event time windows, but you set the
>> timeCharacteristic to processing time.
>> If you want processing time, then you should use
>> TumblingProcessingTimeWindows and remove the timestampAssigner.
>> If you want event time, then you need to set the timeCharacteristic to
>> eventTime and leave the rest of your code as is.
>>
>> Let me know if this answered your question.
>>
>> Cheers,
>> Kostas
>>
>>> On Jul 6, 2016, at 3:43 PM, David Olsen <[hidden email]> wrote:
>>>
>>> I have two streams. One produces a single record, and the other
>>> produces a list of records. I want to do a left join. For example,
>>>
>>> Stream A:
>>> record1
>>> record2
>>> ...
>>>
>>> Stream B:
>>> single-record
>>>
>>> After joined,
>>>
>>> record1, single-record
>>> record2, single-record
>>> ...
>>>
>>> However, the following streaming job throws the exception
>>> 'Record has Long.MIN_VALUE timestamp (= no timestamp marker) ...' even
>>> though setStreamTimeCharacteristic is set to ProcessingTime and
>>> assignTimestampsAndWatermarks is called.
>>>
>>> How can I fix this runtime exception?
>>>
>>> Thanks.
>>>
>>> object App {
>>> def main(args: Array[String]) {
>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>   env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>>>   val left = env.fromElements(1, 2, 3, 4, 5).map(
>>>     new MapFunction[Int, T2[Int, String]] {
>>>       override def map(value: Int): T2[Int, String] =
>>>         new T2[Int, String](value, "data 1")
>>>     }
>>>   ).assignTimestampsAndWatermarks(new MyTimestampExtractor)
>>>
>>>   val right = env.fromElements(99).map(
>>>     new MapFunction[Int, T2[Int, String]] {
>>>       override def map(value: Int): T2[Int, String] =
>>>         new T2[Int, String](value, "data 2")
>>>     }
>>>   )
>>>   left.coGroup(right).
>>>        where { t2 => t2.f0 }.
>>>        equalTo{ t2=> t2.f0 }.
>>>        window(TumblingEventTimeWindows.of(Time.seconds(1))).
>>>        apply(new Join()).print
>>>   env.execute
>>> }
>>> }
>>>
>>> class MyTimestampExtractor extends
>>> AssignerWithPeriodicWatermarks[T2[Int, String]] with Serializable {
>>> override def extractTimestamp(e: T2[Int, String],
>>>                               prevElementTimestamp: Long) =
>>>   System.currentTimeMillis
>>>
>>> override def getCurrentWatermark(): Watermark =
>>>   new Watermark(System.currentTimeMillis)
>>> }
>>>
>>> class Join extends CoGroupFunction[
>>> T2[Int, String], T2[Int, String], T2[Int, String]
>>> ] {
>>> val log = LoggerFactory.getLogger(classOf[Join])
>>> override def coGroup(left: java.lang.Iterable[T2[Int, String]],
>>>                      right: java.lang.Iterable[T2[Int, String]],
>>>                      out: Collector[T2[Int, String]]) {
>>>   var seq = Seq.empty[T2[Int, String]]
>>>   left.foreach { e => log.info(s"from left: $e"); seq ++= Seq(e) }
>>>   right.foreach { e => log.info(s"from right: $e"); seq ++= Seq(e) }
>>>   seq.foreach { e => out.collect(e) }
>>> }
>>>
>>> }