http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Record-has-Long-MIN-VALUE-timestamp-no-timestamp-marker-Is-the-time-characteristic-set-to-Processing-tp7843p7869.html
runtime exception error. Is
timestamp marker). Is the time characteristic set to 'ProcessingTime',
at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:63)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:223)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:168)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> Hi David,
>
> You are using Tumbling event time windows, but you set the
> timeCharacteristic to processing time.
> If you want processing time, then you should use
> TumblingProcessingTimeWindows and remove the timestampAssigner.
> If you want event time, then you need to set the timeCharacteristic to
> eventTime and leave the rest of your code as is.
>
> Let me know if this answered your question.
>
> Cheers,
> Kostas
>
>> On Jul 6, 2016, at 3:43 PM, David Olsen <
[hidden email]> wrote:
>>
>> I have two streams. One will produce a single record, and the other
>> have a list of records. And I want to do left join. So for example,
>>
>> Stream A:
>> record1
>> record2
>> ...
>>
>> Stream B:
>> single-record
>>
>> After joined,
>>
>> record1, single-record
>> record2, single-record
>> ...
>>
>> However with the following streaming job, it throws an exception
>> 'Record has Long.MIN_VALUE timestamp (= no timestamp marker) ...' even
>> setStreamTimeCharacteristic is configured to ProcessingTime and
>> assignTimestampsAndWatermarks is called.
>>
>> How can I fix this runtime exception?
>>
>> Thanks.
>>
>> object App {
>> def main(args: Array[String]) {
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>> val left = env.fromElements(1, 2, 3, 4, 5).map(
>> new MapFunction[Int, T2[Int, String]] {
>> override def map(value: Int): T2[Int, String] =
>> new T2[Int, String](value, "data 1")
>> }
>> ).assignTimestampsAndWatermarks(new MyTimestampExtractor)
>>
>> val right = env.fromElements(99).map(
>> new MapFunction[Int, T2[Int, String]] {
>> override def map(value: Int): T2[Int, String] =
>> new T2[Int, String](value, "data 2")
>> }
>> )
>> left.coGroup(right).
>> where { t2 => t2.f0 }.
>> equalTo{ t2=> t2.f0 }.
>> window(TumblingEventTimeWindows.of(Time.seconds(1))).
>> apply(new Join()).print
>> env.execute
>> }
>> }
>>
>> class MyTimestampExtractor extends
>> AssignerWithPeriodicWatermarks[T2[Int, String]] with Serializable {
>> override def extractTimestamp(e: T2[Int, String],
>> prevElementTimestamp: Long) =
>> System.currentTimeMillis
>>
>> override def getCurrentWatermark(): Watermark =
>> new Watermark(System.currentTimeMillis)
>> }
>>
>> class Join extends CoGroupFunction[
>> T2[Int, String], T2[Int, String], T2[Int, String]
>> ] {
>> val log = LoggerFactory.getLogger(classOf[Join])
>> override def coGroup(left: java.lang.Iterable[T2[Int, String]],
>> right: java.lang.Iterable[T2[Int, String]],
>> out: Collector[T2[Int, String]]) {
>> var seq = Seq.empty[T2[Int, String]]
>> left.foreach { e => log.info(s"from left: $e"); seq ++= Seq(e) }
>> right.foreach { e => log.info(s"from right: $e"); seq ++= Seq(e) }
>> seq.foreach { e => out.collect(e) }
>> }
>>
>> }
>
>