Can it be that when you define the ‘right’ steam, you do not specify a timestamp extractor?
This is done the same way you do it for the ‘left’ stream.
Kostas
> On Jul 8, 2016, at 6:12 AM, David Olsen <[hidden email]> wrote:
>
> Changing TimeCharacteristic to EventTime the flink still throws that
> runtime exception error. Is
> `env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)` the
> correct way to set that feature?
>
> Thanks.
>
> java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no
> timestamp marker). Is the time characteristic set to 'ProcessingTime',
> or did you forget to call
> 'DataStream.assignTimestampsAndWatermarks(...)'?
> at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:63)
> at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:223)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:168)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
>
>
> On 06/07/2016, Kostas Kloudas <[hidden email]> wrote:
>> Hi David,
>>
>> You are using Tumbling event time windows, but you set the
>> timeCharacteristic to processing time.
>> If you want processing time, then you should use
>> TumblingProcessingTimeWindows and remove the timestampAssigner.
>> If you want event time, then you need to set the timeCharacteristic to
>> eventTime and leave the rest of your code as is.
>>
>> Let me know if this answered your question.
>>
>> Cheers,
>> Kostas
>>
>>> On Jul 6, 2016, at 3:43 PM, David Olsen <[hidden email]> wrote:
>>>
>>> I have two streams. One will produce a single record, and the other
>>> have a list of records. And I want to do left join. So for example,
>>>
>>> Stream A:
>>> record1
>>> record2
>>> ...
>>>
>>> Stream B:
>>> single-record
>>>
>>> After joined,
>>>
>>> record1, single-record
>>> record2, single-record
>>> ...
>>>
>>> However with the following streaming job, it throws an exception
>>> 'Record has Long.MIN_VALUE timestamp (= no timestamp marker) ...' even
>>> setStreamTimeCharacteristic is configured to ProcessingTime and
>>> assignTimestampsAndWatermarks is called.
>>>
>>> How can I fix this runtime exception?
>>>
>>> Thanks.
>>>
>>> object App {
>>> def main(args: Array[String]) {
>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>>> val left = env.fromElements(1, 2, 3, 4, 5).map(
>>> new MapFunction[Int, T2[Int, String]] {
>>> override def map(value: Int): T2[Int, String] =
>>> new T2[Int, String](value, "data 1")
>>> }
>>> ).assignTimestampsAndWatermarks(new MyTimestampExtractor)
>>>
>>> val right = env.fromElements(99).map(
>>> new MapFunction[Int, T2[Int, String]] {
>>> override def map(value: Int): T2[Int, String] =
>>> new T2[Int, String](value, "data 2")
>>> }
>>> )
>>> left.coGroup(right).
>>> where { t2 => t2.f0 }.
>>> equalTo{ t2=> t2.f0 }.
>>> window(TumblingEventTimeWindows.of(Time.seconds(1))).
>>> apply(new Join()).print
>>> env.execute
>>> }
>>> }
>>>
>>> class MyTimestampExtractor extends
>>> AssignerWithPeriodicWatermarks[T2[Int, String]] with Serializable {
>>> override def extractTimestamp(e: T2[Int, String],
>>> prevElementTimestamp: Long) =
>>> System.currentTimeMillis
>>>
>>> override def getCurrentWatermark(): Watermark =
>>> new Watermark(System.currentTimeMillis)
>>> }
>>>
>>> class Join extends CoGroupFunction[
>>> T2[Int, String], T2[Int, String], T2[Int, String]
>>> ] {
>>> val log = LoggerFactory.getLogger(classOf[Join])
>>> override def coGroup(left: java.lang.Iterable[T2[Int, String]],
>>> right: java.lang.Iterable[T2[Int, String]],
>>> out: Collector[T2[Int, String]]) {
>>> var seq = Seq.empty[T2[Int, String]]
>>> left.foreach { e => log.info(s"from left: $e"); seq ++= Seq(e) }
>>> right.foreach { e => log.info(s"from right: $e"); seq ++= Seq(e) }
>>> seq.foreach { e => out.collect(e) }
>>> }
>>>
>>> }
>>
>>
Free forum by Nabble | Edit this page |