Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
Posted by David Olsen on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Record-has-Long-MIN-VALUE-timestamp-no-timestamp-marker-Is-the-time-characteristic-set-to-Processing-tp7843.html
I have two streams. One will produce a single record, and the other
have a list of records. And I want to do left join. So for example,
Stream A:
record1
record2
...
Stream B:
single-record
After joined,
record1, single-record
record2, single-record
...
However with the following streaming job, it throws an exception
'Record has Long.MIN_VALUE timestamp (= no timestamp marker) ...' even
setStreamTimeCharacteristic is configured to ProcessingTime and
assignTimestampsAndWatermarks is called.
How can I fix this runtime exception?
Thanks.
object App {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val left = env.fromElements(1, 2, 3, 4, 5).map(
new MapFunction[Int, T2[Int, String]] {
override def map(value: Int): T2[Int, String] =
new T2[Int, String](value, "data 1")
}
).assignTimestampsAndWatermarks(new MyTimestampExtractor)
val right = env.fromElements(99).map(
new MapFunction[Int, T2[Int, String]] {
override def map(value: Int): T2[Int, String] =
new T2[Int, String](value, "data 2")
}
)
left.coGroup(right).
where { t2 => t2.f0 }.
equalTo{ t2=> t2.f0 }.
window(TumblingEventTimeWindows.of(Time.seconds(1))).
apply(new Join()).print
env.execute
}
}
class MyTimestampExtractor extends
AssignerWithPeriodicWatermarks[T2[Int, String]] with Serializable {
override def extractTimestamp(e: T2[Int, String],
prevElementTimestamp: Long) =
System.currentTimeMillis
override def getCurrentWatermark(): Watermark =
new Watermark(System.currentTimeMillis)
}
class Join extends CoGroupFunction[
T2[Int, String], T2[Int, String], T2[Int, String]
] {
val log = LoggerFactory.getLogger(classOf[Join])
override def coGroup(left: java.lang.Iterable[T2[Int, String]],
right: java.lang.Iterable[T2[Int, String]],
out: Collector[T2[Int, String]]) {
var seq = Seq.empty[T2[Int, String]]
left.foreach { e => log.info(s"from left: $e"); seq ++= Seq(e) }
right.foreach { e => log.info(s"from right: $e"); seq ++= Seq(e) }
seq.foreach { e => out.collect(e) }
}
}