Do I need to set assignTimestampsAndWatermarks if I set my time characteristic to IngestionTime?


kant kodali

Hi All,

Do I need to set assignTimestampsAndWatermarks if I set my time characteristic to IngestionTime?

Say I set the time characteristic of my stream execution environment to ingestion time as follows:

streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

Do I need to call datastream.assignTimestampsAndWatermarks(AscendingTimestampExtractor)?

I thought datastream.assignTimestampsAndWatermarks was mandatory only if the time characteristic is event time. Is that not the case? Did this behavior change in Flink 1.10? I ask because I see libraries that don't call datastream.assignTimestampsAndWatermarks when the time characteristic is ingestion time, but do call it for event time. If it is required, how can I set an AscendingTimestampExtractor in a distributed environment? Is there any way to add a monotonically increasing long (as AscendingTimestampExtractor expects) without any distributed locks?

Thanks!


Re: Do I need to set assignTimestampsAndWatermarks if I set my time characteristic to IngestionTime?

Arvid Heise-3
Hi Kant,

According to the documentation [1], you don't need to set a watermark assigner:

Compared to event time, ingestion time programs cannot handle any out-of-order events or late data, but the programs don’t have to specify how to generate watermarks.

Internally, ingestion time is treated much like event time, but with automatic timestamp assignment and automatic watermark generation.


So it's not necessary to assign either timestamps or watermarks yourself, and the default behavior seems to be exactly what you want. If that doesn't work for you, could you please rephrase your last question or describe your use case? I didn't quite follow it.
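As a rough mental model of what the quoted documentation describes, the plain-Java sketch below mimics an ingestion-time source. Each record is stamped with the current clock reading, and the watermark is derived from that same clock, so the user supplies no extractor. This is a simplified illustration, not Flink's implementation. The class name IngestionTimeSource, the injectable LongSupplier clock, and the "watermark = now - 1" rule are all assumptions made for the example.

```java
import java.util.function.LongSupplier;

// Simplified, hypothetical model of an ingestion-time source; NOT Flink's
// implementation, only an illustration of the idea.
final class IngestionTimeSource {

    // A record together with the timestamp attached at the source.
    static final class Record {
        final String value;
        final long timestamp;

        Record(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final LongSupplier clock;          // stands in for the wall clock
    private long currentWatermark = Long.MIN_VALUE;

    IngestionTimeSource(LongSupplier clock) {
        this.clock = clock;
    }

    // Automatic timestamp assignment: the record simply gets "now".
    Record collect(String value) {
        return new Record(value, clock.getAsLong());
    }

    // Automatic watermark generation from the same clock. If the clock does not
    // go backwards, every record with timestamp <= now - 1 has already been
    // emitted. Math.max keeps the watermark from ever moving backwards.
    long emitWatermark() {
        currentWatermark = Math.max(currentWatermark, clock.getAsLong() - 1);
        return currentWatermark;
    }

    public static void main(String[] args) {
        long[] fakeNow = {1_000L};             // controllable test clock
        IngestionTimeSource source = new IngestionTimeSource(() -> fakeNow[0]);
        Record a = source.collect("a");
        fakeNow[0] = 1_005L;
        Record b = source.collect("b");
        System.out.println(a.timestamp + " " + b.timestamp + " " + source.emitWatermark());
    }
}
```

Running main prints the two source-assigned timestamps (1000 and 1005) followed by the derived watermark (1004). No user code ever touches timestamps, which matches the quoted documentation.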


On Tue, Mar 10, 2020 at 5:01 AM kant kodali <[hidden email]> wrote:
> Do I need to set assignTimestampsAndWatermarks if I set my time characteristic to IngestionTime?


Re: Do I need to set assignTimestampsAndWatermarks if I set my time characteristic to IngestionTime?

Arvid Heise-3
Hi Kant,

I just saw that you asked the same question on SO [1]. In the future, could you please cross-reference such posts, so that we don't waste resources answering the same question twice?


On Tue, Mar 10, 2020 at 9:33 AM Arvid Heise <[hidden email]> wrote:
> So it's neither possible to assign timestamps nor watermark, but it seems as if the default behavior is exactly as you want it to be.


Re: Do I need to set assignTimestampsAndWatermarks if I set my time characteristic to IngestionTime?

kant kodali
Hi Arvid,

If ingestion-time programs cannot handle late data, then why do they generate watermarks? Isn't the whole point of watermarks to handle late data?

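My last question was more about this library <https://github.com/vasia/gelly-streaming>. I run several algorithms using SimpleEdgeStream.aggregate(<algorithm>).print(), and I run into the following error whenever I invoke this constructor <https://github.com/vasia/gelly-streaming/blob/master/src/main/java/org/apache/flink/graph/streaming/SimpleEdgeStream.java#L69>. But it works if I change it to this one <https://github.com/vasia/gelly-streaming/blob/master/src/main/java/org/apache/flink/graph/streaming/SimpleEdgeStream.java#L86>, so I am not exactly sure what is happening there.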

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 0c61d6ef0483c3068076a988bc252a74)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 0c61d6ef0483c3068076a988bc252a74)
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
    at Test.main(Test.java:86)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
    ... 8 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 0c61d6ef0483c3068076a988bc252a74)
    at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
    at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
    ... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
    at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:295)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)


On Tue, Mar 10, 2020 at 1:40 AM Arvid Heise <[hidden email]> wrote:
> Could you, in the future, please cross-reference these posts, so that we don't waste resources on answering?


Re: Do I need to set assignTimestampsAndWatermarks if I set my time characteristic to IngestionTime?

Aljoscha Krettek
On 10.03.20 10:13, kant kodali wrote:

> If ingestion time programs cannot handle late data then why would it
> generate watermarks? Isn't the whole point of watermarks is to handle the
> late data?

Watermarks are not only used for handling late data. Watermarks are the
mechanism that is used to advance time throughout the streaming topology,
starting from the sources. Among other things, they are used to detect late
data.

When setting the characteristic to "ingestion time" you are essentially
instating a watermark extractor that extracts the current processing
time at the sources as event time.
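Aljoscha's point that watermarks carry time through the topology also bears on the original question about a monotonically increasing value without distributed locks. Each parallel source instance only advances its own watermark, and the Flink documentation describes the ascending-timestamps case in terms of the timestamps seen by a given source task. A downstream operator with several inputs then takes the minimum of the watermarks it has received. The plain-Java sketch below models that minimum rule. The class name and channel indexing are illustrative assumptions, not Flink API.

```java
import java.util.Arrays;

// Illustrative model (not Flink code) of how an operator with several input
// channels derives its own event-time clock from the watermarks it receives.
final class MinWatermarkTracker {
    private final long[] channelWatermarks;           // last watermark seen per input
    private long operatorWatermark = Long.MIN_VALUE;  // what this operator forwards

    MinWatermarkTracker(int numInputs) {
        channelWatermarks = new long[numInputs];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark from one input. Each input only moves forward on its
    // own; no coordination between the upstream instances is needed.
    // Returns the operator's (possibly advanced) watermark.
    long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        operatorWatermark = Math.max(operatorWatermark, min);
        return operatorWatermark;
    }

    public static void main(String[] args) {
        MinWatermarkTracker op = new MinWatermarkTracker(2);
        System.out.println(op.onWatermark(0, 100)); // input 1 still at Long.MIN_VALUE
        System.out.println(op.onWatermark(1, 80));  // min(100, 80) = 80
        System.out.println(op.onWatermark(1, 120)); // min(100, 120) = 100
    }
}
```

The operator's clock only advances once every input has advanced, so the slowest parallel instance determines event time downstream. No global counter or lock is involved.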

> My last question was more about this library
> <https://github.com/vasia/gelly-streaming> I run several algorithms using
> SimpleEdgeStream.aggregrate(<algoirthm>).print() and I am running into the
> following error whenever I invoke the following constructor
> <https://github.com/vasia/gelly-streaming/blob/master/src/main/java/org/apache/flink/graph/streaming/SimpleEdgeStream.java#L69>.
> But it works if I change it to this
> <https://github.com/vasia/gelly-streaming/blob/master/src/main/java/org/apache/flink/graph/streaming/SimpleEdgeStream.java#L86>
> so I am not exactly sure what is happening there.

I don't know what is going on here. Could it be that the library
internally sets the characteristic to event time, thereby overriding
your ingestion-time setting? In that case you would indeed be missing a
watermark extractor. I'm cc'ing Vasia, the author of that library.

-Aljoscha
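The stack trace earlier in the thread ends in TumblingEventTimeWindows.assignWindows, which rejects a record whose timestamp is Long.MIN_VALUE, Flink's marker for "no timestamp". That is consistent with Aljoscha's guess: an event-time window receives records that never had a timestamp attached. The plain-Java sketch below mimics that check and a plain tumbling-window start computation (round the timestamp down to a multiple of the window size). It is a simplified model with hypothetical names. It throws IllegalStateException where Flink throws a RuntimeException.

```java
// Hypothetical, simplified model of event-time tumbling-window assignment.
final class TumblingWindowModel {

    // Flink uses Long.MIN_VALUE on a record to mean "no timestamp attached".
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    // Returns the start of the tumbling window of length sizeMillis that
    // contains the timestamp, or rejects records that carry no timestamp.
    static long windowStart(long timestamp, long sizeMillis) {
        if (timestamp == NO_TIMESTAMP) {
            throw new IllegalStateException(
                "Record has Long.MIN_VALUE timestamp (= no timestamp marker).");
        }
        // Round down to a multiple of the window size (floorMod handles negatives).
        return timestamp - Math.floorMod(timestamp, sizeMillis);
    }

    public static void main(String[] args) {
        System.out.println(windowStart(12_345L, 5_000L)); // 10000
        try {
            windowStart(NO_TIMESTAMP, 5_000L);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In a real job, the corresponding remedies from this thread are to make sure ingestion time is actually in effect (so the sources attach timestamps), or, if the time characteristic ends up as event time, to call assignTimestampsAndWatermarks before the window.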

Re: Do I need to set assignTimestampsAndWatermarks if I set my time characteristic to IngestionTime?

David Anderson-2
In reply to this post by kant kodali
Watermarks are a tool for handling out-of-orderness when working with event-time timestamps. They provide a mechanism for managing the tradeoff between latency and completeness, letting you control how long to wait for any out-of-orderness to resolve itself. Note that, in the way Flink uses these terms, out-of-orderness is not the same as lateness: your watermarking will accommodate a certain amount of out-of-orderness, and out-of-order events that arrive within this timeframe are not considered late. Only events that are excessively out-of-order (i.e., with timestamps behind the current watermark) are late.
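To make the distinction between out-of-orderness and lateness concrete, here is a plain-Java sketch of a bounded-out-of-orderness watermark. The watermark trails the largest timestamp seen so far by a fixed bound. An out-of-order event within that bound is not late, while one at or behind the current watermark is. The bound of 5, the per-event watermark update, and the class name are illustrative assumptions. Flink's own watermark generators differ in details, for example in when watermarks are emitted.

```java
// Illustrative model of a bounded-out-of-orderness watermark (not Flink code).
final class OutOfOrdernessModel {
    private final long maxOutOfOrderness;
    private long maxTimestampSeen = Long.MIN_VALUE;

    OutOfOrdernessModel(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Current watermark: the highest timestamp seen so far minus the bound.
    long currentWatermark() {
        return maxTimestampSeen == Long.MIN_VALUE
            ? Long.MIN_VALUE
            : maxTimestampSeen - maxOutOfOrderness;
    }

    // Classifies an arriving event against the current watermark, then lets the
    // event advance the watermark. Returns true if the event is late.
    boolean onEvent(long timestamp) {
        boolean late = timestamp <= currentWatermark(); // at or behind the watermark
        maxTimestampSeen = Math.max(maxTimestampSeen, timestamp);
        return late;
    }

    public static void main(String[] args) {
        OutOfOrdernessModel m = new OutOfOrdernessModel(5);
        long[] arrivals = {10, 14, 12, 20, 13};
        for (long ts : arrivals) {
            System.out.println(ts + (m.onEvent(ts) ? " late" : " on time")
                + ", watermark now " + m.currentWatermark());
        }
    }
}
```

Here 12 arrives out of order but while the watermark is only 9, so it is not late. 13 arrives after the watermark has reached 15, so it is late.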

I would say that the documentation you quoted is a bit misleading, since with ingestion time processing there can be no late events.

Most of the Flink runtime only makes a distinction between processing time and event time. For example, there are processing time timers (triggered by the system clock) and event time timers (triggered by watermarks), but there's no such thing as an ingestion time timer. Ingestion time is a hybrid between the two that assigns timestamps and watermarks based on processing time, and then the rest of the pipeline behaves as though you were doing event time processing.
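The split between processing-time timers (fired by the system clock) and event-time timers (fired by watermarks) can be sketched as a small event-time timer queue. A timer fires only when a watermark reaches its timestamp. With ingestion time, those watermarks happen to be derived from processing time at the source, but the downstream timer logic is the same as for event time. This is a simplified model with hypothetical names, not Flink's timer service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Simplified model of an event-time timer service: timers are registered for a
// timestamp and fire only when an incoming watermark reaches that timestamp.
// Nothing here reads the wall clock.
final class EventTimeTimers {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();

    void registerTimer(long timestamp) {
        timers.add(timestamp);
    }

    // Advances event time to the given watermark and returns the timers that
    // fire, in timestamp order (a timer fires once watermark >= its timestamp).
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            fired.add(timers.poll());
        }
        return fired;
    }

    public static void main(String[] args) {
        EventTimeTimers service = new EventTimeTimers();
        service.registerTimer(100);
        service.registerTimer(250);
        service.registerTimer(200);
        System.out.println(service.advanceWatermark(150)); // [100]
        System.out.println(service.advanceWatermark(250)); // [200, 250]
    }
}
```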

This means that when working with ingestion time you lose most of the benefits of event time processing, such as deterministic, reproducible behavior. But using ingestion time does make it possible to use certain parts of the APIs that are described as "event time only", such as interval joins.

I don't know enough about gelly-streaming to speculate about what's going on there.

David



On Tue, Mar 10, 2020 at 10:14 AM kant kodali <[hidden email]> wrote:
> If ingestion time programs cannot handle late data then why would it generate watermarks? Isn't the whole point of watermarks is to handle the late data?