Re: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?

Posted by Kostas Kloudas on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Record-has-Long-MIN-VALUE-timestamp-no-timestamp-marker-Is-the-time-characteristic-set-to-Processing-tp7843p7846.html

Hi David,

You are using tumbling event-time windows (TumblingEventTimeWindows), but you set the time characteristic to processing time.
If you want processing time, use TumblingProcessingTimeWindows and remove the timestamp assigner.
If you want event time, set the time characteristic to TimeCharacteristic.EventTime and leave the rest of your code as is.
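
The event-time option might look roughly like the sketch below. It is a minimal illustration and not the original job: it uses Scala tuples instead of the T2 alias, the names EventTimeCoGroup, WallClockTimestamps and PassThrough are made up for the example, and it assigns timestamps on both inputs on the assumption that every record reaching an event-time window needs one. For the processing-time option, the comments mark the lines that would change.

```scala
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// Hypothetical assigner: stamps each record with the wall-clock time,
// mirroring MyTimestampExtractor from the question.
class WallClockTimestamps extends AssignerWithPeriodicWatermarks[(Int, String)] {
  override def extractTimestamp(e: (Int, String), previousElementTimestamp: Long): Long =
    System.currentTimeMillis

  override def getCurrentWatermark(): Watermark =
    new Watermark(System.currentTimeMillis)
}

// Hypothetical co-group function: forwards every record of both groups unchanged.
class PassThrough
    extends CoGroupFunction[(Int, String), (Int, String), (Int, String)] {
  override def coGroup(left: java.lang.Iterable[(Int, String)],
                       right: java.lang.Iterable[(Int, String)],
                       out: Collector[(Int, String)]): Unit = {
    left.asScala.foreach(e => out.collect(e))
    right.asScala.foreach(e => out.collect(e))
  }
}

object EventTimeCoGroup {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // The time characteristic and the window assigner must agree.
    // Processing-time variant: TimeCharacteristic.ProcessingTime here ...
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // ... and no assignTimestampsAndWatermarks calls on either stream.
    val left = env.fromElements(1, 2, 3, 4, 5)
      .map(v => (v, "data 1"))
      .assignTimestampsAndWatermarks(new WallClockTimestamps)

    // Assumption: the right input also needs timestamps for an event-time window.
    val right = env.fromElements(99)
      .map(v => (v, "data 2"))
      .assignTimestampsAndWatermarks(new WallClockTimestamps)

    left.coGroup(right)
      .where(_._1)
      .equalTo(_._1)
      // Processing-time variant: TumblingProcessingTimeWindows.of(Time.seconds(1))
      .window(TumblingEventTimeWindows.of(Time.seconds(1)))
      .apply(new PassThrough)
      .print()

    env.execute("event-time coGroup sketch")
  }
}
```

With the processing-time variant and such a small bounded input, the job may finish before the one-second window fires, so an empty output would not necessarily mean the configuration is wrong.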

Let me know if this answered your question.

Cheers,
Kostas

On Jul 6, 2016, at 3:43 PM, David Olsen <[hidden email]> wrote:

I have two streams. One produces a single record, and the other
produces a list of records. I want to do a left join. For example:

Stream A:
record1
record2
...

Stream B:
single-record

After joining:

record1, single-record
record2, single-record
...

However, the following streaming job throws the exception
'Record has Long.MIN_VALUE timestamp (= no timestamp marker) ...' even
though setStreamTimeCharacteristic is set to ProcessingTime and
assignTimestampsAndWatermarks is called.

How can I fix this runtime exception?

Thanks.

object App {
 def main(args: Array[String]) {
   val env = StreamExecutionEnvironment.getExecutionEnvironment
   env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
   val left = env.fromElements(1, 2, 3, 4, 5).map(
     new MapFunction[Int, T2[Int, String]] {
       override def map(value: Int): T2[Int, String] =
         new T2[Int, String](value, "data 1")
     }
   ).assignTimestampsAndWatermarks(new MyTimestampExtractor)

   val right = env.fromElements(99).map(
     new MapFunction[Int, T2[Int, String]] {
       override def map(value: Int): T2[Int, String] =
         new T2[Int, String](value, "data 2")
     }
   )
   left.coGroup(right).
        where { t2 => t2.f0 }.
        equalTo{ t2=> t2.f0 }.
        window(TumblingEventTimeWindows.of(Time.seconds(1))).
        apply(new Join()).print
   env.execute
 }
}

class MyTimestampExtractor extends
AssignerWithPeriodicWatermarks[T2[Int, String]] with Serializable {
 override def extractTimestamp(e: T2[Int, String],
                               prevElementTimestamp: Long) =
   System.currentTimeMillis

 override def getCurrentWatermark(): Watermark =
   new Watermark(System.currentTimeMillis)
}

class Join extends CoGroupFunction[
 T2[Int, String], T2[Int, String], T2[Int, String]
] {
 val log = LoggerFactory.getLogger(classOf[Join])
 override def coGroup(left: java.lang.Iterable[T2[Int, String]],
                      right: java.lang.Iterable[T2[Int, String]],
                      out: Collector[T2[Int, String]]) {
   var seq = Seq.empty[T2[Int, String]]
   left.foreach { e => log.info(s"from left: $e"); seq ++= Seq(e) }
   right.foreach { e => log.info(s"from right: $e"); seq ++= Seq(e) }
   seq.foreach { e => out.collect(e) }
 }

}
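
For the left join described in the question, the body of coGroup would need to pair records rather than forward them one by one. The following is a minimal sketch of that pairing in plain Scala, independent of Flink; leftJoin and LeftJoinSketch are hypothetical names introduced only for illustration.

```scala
object LeftJoinSketch {
  // Pairs each left record with every right record of the same group.
  // A left record with no partner is kept and paired with None.
  def leftJoin[A, B](left: Iterable[A], right: Iterable[B]): Seq[(A, Option[B])] = {
    val partners: Seq[Option[B]] =
      if (right.isEmpty) Seq(None) else right.toSeq.map(Some(_))
    for (l <- left.toSeq; r <- partners) yield (l, r)
  }

  def main(args: Array[String]): Unit = {
    val joined = leftJoin(Seq("record1", "record2"), Seq("single-record"))
    joined.foreach(println)
    // prints:
    // (record1,Some(single-record))
    // (record2,Some(single-record))
  }
}
```

Inside a CoGroupFunction the two java.lang.Iterable arguments could be converted with asScala before being passed to such a helper. Note that where and equalTo group both streams by f0, and the sample keys 1 to 5 on the left never equal the key 99 on the right, so with the data in the question no left record would ever share a group with the single right record.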