DeserializationSchema isEndOfStream usage?

9 messages

DeserializationSchema isEndOfStream usage?

David Kim
Hello all,

I saw that DeserializationSchema exposes an "isEndOfStream()" method.

Can isEndOfStream be utilized to somehow terminate a streaming Flink job?

I was under the impression that if we return "true" we can control when a stream closes. The use case I had in mind was controlling when unit/integration tests terminate a Flink job: a test/spec knows how many items it expects to consume, so the schema could flip isEndOfStream to return true once that count has been reached.
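A minimal sketch of what I had in mind (package and exact method signatures may differ across Flink versions, and each parallel consumer gets its own schema instance, so the count here is per subtask):

import java.util.concurrent.atomic.AtomicLong
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.streaming.util.serialization.DeserializationSchema

// Signals end-of-stream once this instance has deserialized `expectedCount` records.
class BoundedStringSchema(expectedCount: Long) extends DeserializationSchema[String] {
  private val seen = new AtomicLong(0L)

  override def deserialize(message: Array[Byte]): String = {
    seen.incrementAndGet()
    new String(message, "UTF-8")
  }

  // Returning true should tell the consumer to stop reading.
  override def isEndOfStream(nextElement: String): Boolean = seen.get() >= expectedCount

  override def getProducedType: TypeInformation[String] = BasicTypeInfo.STRING_TYPE_INFO
}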

Am I misunderstanding the intention of isEndOfStream?

I also set a breakpoint on isEndOfStream and saw that it was never hit when passing a DeserializationSchema implementation to "FlinkKafkaConsumer082".

Currently testing on 1.0-SNAPSHOT.

Cheers!
David

Re: DeserializationSchema isEndOfStream usage?

Robert Metzger
Hi David,

In theory isEndOfStream() is absolutely the right way to go for stopping data sources in Flink.
That it's not working as expected is a bug. I have a pending pull request for adding a Kafka 0.9 connector, which fixes this issue as well (for all supported Kafka versions).

Sorry for the inconvenience. If you want, you can check out the branch of the PR and build Flink yourself to get the fix.
I hope I can merge the connector to master this week; the fix will then be available in 1.0-SNAPSHOT as well.

Regards,
Robert



Sent from my iPhone


Re: DeserializationSchema isEndOfStream usage?

David Kim
Thanks Robert! I'll be keeping tabs on the PR.

Cheers,
David


Re: DeserializationSchema isEndOfStream usage?

rmetzger0
I've now merged the pull request. DeserializationSchema.isEndOfStream() should now be evaluated correctly by the Kafka 0.9 and 0.8 connectors.

Please let me know if you run into any issues with the updated code; I'll fix them ASAP.
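For completeness, a rough sketch of wiring a terminating schema (like the counting sketch earlier in this thread) into the 0.8 consumer -- topic name and Kafka properties are illustrative:

import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")  // illustrative
props.setProperty("zookeeper.connect", "localhost:2181")  // illustrative
props.setProperty("group.id", "end-of-stream-test")       // illustrative

val env = StreamExecutionEnvironment.getExecutionEnvironment
// The source should finish once the schema reports end-of-stream,
// letting env.execute() return and the test complete.
env
  .addSource(new FlinkKafkaConsumer08[String]("test-topic", new BoundedStringSchema(100), props))
  .print()
env.execute()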


Re: DeserializationSchema isEndOfStream usage?

David Kim
Hi Robert!

Thanks for reaching out. I ran into an issue and wasn't sure if it was due to a misconfiguration on my end or if it is a real bug. I have one DataStream and I'm sinking it to two different Kafka sinks. When the job starts, I run into this error:

org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:659)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:605)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:605)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.UnsupportedOperationException: The accumulator 'producer-record-retry-rate' already exists and cannot be added.
    at org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext.addAccumulator(AbstractRuntimeUDFContext.java:121)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.open(FlinkKafkaProducerBase.java:204)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:89)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:305)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:227)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
    at java.lang.Thread.run(Thread.java:745)


The particular accumulator the exception complains about changes, meaning it's not always 'producer-record-retry-rate' -- most likely due to the non-deterministic ordering of the collection. Any guidance appreciated!

I'm using 1.0-SNAPSHOT and my two sinks are FlinkKafkaProducer08.

The Flink code looks something like this:

val stream: DataStream[Foo] = ...
val kafkaA = new FlinkKafkaProducer08[Foo]...
val kafkaB = new FlinkKafkaProducer08[Foo]...

stream
  .addSink(kafkaA)

stream
  .addSink(kafkaB)

Thanks,
David


Re: DeserializationSchema isEndOfStream usage?

rmetzger0
Hi David,

thank you for reporting the issue. I'll look into it. In the meantime, you can set "flink.disable-metrics" to "true" in the properties to disable the metrics.
I'll probably have to introduce something like a client id to differentiate between the producers.
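For example, roughly like this (topic names and the serialization schema are placeholders; the constructor shown is the (topic, schema, properties) variant):

import java.util.Properties

val producerProps = new Properties()
producerProps.setProperty("bootstrap.servers", "localhost:9092")  // illustrative
// Disables registration of the Kafka producer metrics (the accumulators in the stack trace above).
producerProps.setProperty("flink.disable-metrics", "true")

val kafkaA = new FlinkKafkaProducer08[Foo]("topic-a", schema, producerProps)
val kafkaB = new FlinkKafkaProducer08[Foo]("topic-b", schema, producerProps)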

Robert


Re: DeserializationSchema isEndOfStream usage?

David Kim
Hi Robert,

Thanks for the workaround. Unfortunately I think I found a bug in the code that controls the metrics logic.

Should Boolean.getBoolean be Boolean.valueOf instead?
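(For reference, the difference: Boolean.getBoolean(s) looks up a JVM system property named s, whereas Boolean.valueOf(s) parses the string itself.)

val parsed  = java.lang.Boolean.valueOf("true")    // true: parses the given string
val fromSys = java.lang.Boolean.getBoolean("true") // false: unless a system property literally named "true" is set to "true"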


Thanks!
David


Re: DeserializationSchema isEndOfStream usage?

rmetzger0
Hi David,

you are right. I'll fix the issue in this pull request: https://github.com/apache/flink/pull/1541

I guess everything in your topology runs with a parallelism of 1? Running it with a parallelism higher than 1 will also work around the issue (because then the two Sinks are not executed in one Task).
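Roughly (where env is the StreamExecutionEnvironment and the value 2 is just an example):

// Either raise the job-wide parallelism ...
env.setParallelism(2)

// ... or, presumably, give only the sinks a higher parallelism so they are not chained into the same task:
stream.addSink(kafkaA).setParallelism(2)
stream.addSink(kafkaB).setParallelism(2)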


Re: DeserializationSchema isEndOfStream usage?

David Kim
Hi Robert,

Awesome, thanks for the fast turnaround!

Cheers,
David
