IterativeStream seems to ignore maxWaitTimeMillis

Juan Rodríguez Hortalá (Mon, 21 Nov 2016 at 07:58)
Hi,

I wrote a proof of concept for a Java version of mapWithState with time-based state eviction https://github.com/juanrh/flink-state-eviction/blob/a6bb0d4ca0908d2f4350209a4a41e381e99c76c5/src/main/java/com/github/juanrh/streaming/MapWithStateIterPoC.java. The idea is:

 - Convert an input KeyedStream with key K and value V into a KeyedStream of Either<V, K>, with the original values as Left.
 - Replace a ValueState<S> with a ValueState holding a POJO that, besides S, stores the timestamp of the last time the state was accessed.
 - Define an IterativeStream from the Either stream, and apply a transformation function that periodically sends "tombstone" events as Right events in the closeWith of the IterativeStream. When a tombstone is received, delete the state with clear() if the time since it was last accessed is greater than a configured time to live.
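The per-key eviction rule described in the list above can be illustrated outside Flink. This is a plain-Java sketch under stated assumptions, not the code from the linked PoC and not Flink's API: a HashMap stands in for keyed ValueState, `TimestampedState` plays the role of the POJO, and `Event` is a hypothetical stand-in for Either<V, K> in which a tombstone carries only a key. The running sum is an arbitrary example of user state.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java simulation of the per-key eviction idea; it uses no Flink classes.
// The HashMap stands in for keyed ValueState, and Event is a hypothetical stand-in
// for Either<V, K>: a "left" carries a value, a "tombstone" (Right) carries only a key.
public class EvictionSketch {

    // The POJO kept in state: the user state plus the time it was last accessed.
    static final class TimestampedState<S> {
        final S state;
        final long lastAccessMillis;

        TimestampedState(S state, long lastAccessMillis) {
            this.state = state;
            this.lastAccessMillis = lastAccessMillis;
        }
    }

    // Hypothetical event type: value == null marks a tombstone for the key.
    static final class Event<K, V> {
        final K key;
        final V value;

        private Event(K key, V value) {
            this.key = key;
            this.value = value;
        }

        static <K, V> Event<K, V> left(K key, V value) { return new Event<>(key, value); }
        static <K, V> Event<K, V> tombstone(K key) { return new Event<>(key, null); }
        boolean isTombstone() { return value == null; }
    }

    private final Map<String, TimestampedState<Integer>> store = new HashMap<>();
    private final long ttlMillis;

    EvictionSketch(long ttlMillis) { this.ttlMillis = ttlMillis; }

    // Handles one event at time nowMillis. For values it returns the updated state
    // (a running sum, chosen only for illustration); for tombstones it returns null.
    Integer process(Event<String, Integer> e, long nowMillis) {
        TimestampedState<Integer> current = store.get(e.key);
        if (e.isTombstone()) {
            // Evict only if the state has been idle for longer than the TTL.
            if (current != null && nowMillis - current.lastAccessMillis > ttlMillis) {
                store.remove(e.key); // plays the role of ValueState.clear()
            }
            return null;
        }
        int updated = (current == null ? 0 : current.state) + e.value;
        store.put(e.key, new TimestampedState<>(updated, nowMillis)); // refresh timestamp
        return updated;
    }

    boolean hasState(String key) { return store.containsKey(key); }

    public static void main(String[] args) {
        EvictionSketch s = new EvictionSketch(1_000);
        s.process(Event.left("a", 5), 0);
        s.process(Event.tombstone("a"), 500);   // idle for 500 ms: kept
        System.out.println(s.hasState("a"));    // true
        s.process(Event.tombstone("a"), 2_000); // idle for 2000 ms: evicted
        System.out.println(s.hasState("a"));    // false
    }
}
```

Note that a tombstone deliberately does not refresh the timestamp; only real values do, otherwise the periodic tombstones would keep every key alive forever.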

This seems to work so far, but there are some things that look weird to me:

 - The program never seems to stop, even though I have defined the IterativeStream with a timeout via DataStream.iterate(long) https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#iterate-long- . The timeout value seems to be ignored. I'm using a custom source function, but it seems that SourceFunction.cancel() is not being called.

 - I'm getting several messages "WARN MetricGroup: Name collision: Group already contains a Metric with the name 'numRecordsOut'. Metric will not be reported. (null)". What does that mean?

Thanks,

Juan

Re: IterativeStream seems to ignore maxWaitTimeMillis

Aljoscha Krettek (Mon, Nov 21, 2016 at 9:17 AM)
Might it be that your initial source never stops? A loop will only terminate if both the original source stops and the loop timeout is reached.
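The timeout half of the termination rule above can be pictured as a consumer polling a feedback queue with a bounded wait. The following is a simplified model written for illustration, not Flink's iteration-head implementation; `LoopTimeoutModel` and `drainUntilIdle` are hypothetical names. It only shows that the timeout counts idle time since the last feedback record, not total runtime.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Simplified model of an iteration head's feedback side; this is not Flink's code.
// The head keeps consuming feedback records and only stops after it has waited
// maxWaitTimeMillis without receiving anything.
public class LoopTimeoutModel {

    // Returns the number of feedback records consumed before the idle timeout hit.
    static int drainUntilIdle(BlockingQueue<String> feedback, long maxWaitTimeMillis)
            throws InterruptedException {
        int consumed = 0;
        while (true) {
            String record = feedback.poll(maxWaitTimeMillis, TimeUnit.MILLISECONDS);
            if (record == null) {
                return consumed; // idle for maxWaitTimeMillis: the loop can end
            }
            consumed++;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> feedback = new LinkedBlockingQueue<>();
        // Producer: three tombstones, 20 ms apart, then it stops.
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    feedback.put("tombstone-" + i);
                    Thread.sleep(20);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        // Every gap (about 20 ms) is shorter than the 500 ms timeout, so the head
        // only times out once the producer has stopped.
        System.out.println(drainUntilIdle(feedback, 500));
        producer.join();
    }
}
```

In this model, a producer that never stops emitting, for example a background thread that keeps sending tombstones into the loop, would keep the head busy indefinitely, no matter how small maxWaitTimeMillis is.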

Re: IterativeStream seems to ignore maxWaitTimeMillis

Juan Rodríguez Hortalá (Wed, 23 Nov 2016 at 05:08)
Thanks for your answer, Aljoscha,

The source does stop: when I comment out all the transformed streams and just print the input, the program completes. But this is a custom SourceFunction; could that be related? Maybe I should implement emitWatermark? I'm using ingestion time, so I assumed this wasn't needed.

Greetings,

Juan

Re: IterativeStream seems to ignore maxWaitTimeMillis

Aljoscha Krettek (Wed, Nov 23, 2016 at 2:19 AM)
Ah, I think cancel() won't be called on the source if it has already stopped. Could you try boiling it down to the very basics, i.e. just the source and an iteration, and check what happens?

Re: IterativeStream seems to ignore maxWaitTimeMillis

Juan Rodríguez Hortalá
Thanks a lot for your suggestion, Aljoscha; it helped me discover the problem: I was using an Executor inside a RichFunction and wasn't shutting it down. Now I call executor.shutdownNow() in RichFunction.close(), and the job stops once both the input and the loop are exhausted.
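The shape of the fix can be sketched in plain Java. `TombstoneTicker` is a hypothetical class that only mirrors the open/close lifecycle of a RichFunction; it does not extend any Flink type, and its `open(long)` signature is an assumption, not Flink's. It schedules periodic work on an executor and, following the fix described above, calls `shutdownNow()` in `close()`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a RichFunction that owns a background executor.
// open()/close() only mirror the lifecycle hooks; this is not a Flink class.
public class TombstoneTicker {
    private ScheduledExecutorService executor;
    final AtomicInteger ticks = new AtomicInteger();

    // Starts periodic work, e.g. emitting a tombstone every periodMillis.
    void open(long periodMillis) {
        executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(ticks::incrementAndGet, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    // The fix from the thread: stop the executor when the function is closed,
    // so no background thread keeps running after the input is exhausted.
    void close() throws InterruptedException {
        if (executor != null) {
            executor.shutdownNow();
            executor.awaitTermination(1, TimeUnit.SECONDS);
        }
    }

    boolean isStopped() { return executor != null && executor.isTerminated(); }

    public static void main(String[] args) throws InterruptedException {
        TombstoneTicker t = new TombstoneTicker();
        t.open(10);
        Thread.sleep(100);
        t.close();
        int afterClose = t.ticks.get();
        Thread.sleep(100);
        // No further ticks after close(), and the executor has terminated.
        System.out.println(t.isStopped() && t.ticks.get() == afterClose);
    }
}
```

Executors.newSingleThreadScheduledExecutor() creates non-daemon threads by default, so if close() were skipped, the JVM running this main would not exit. In the Flink job, an executor left running could also keep emitting tombstones into the loop, which would plausibly stop the iteration from ever reaching its timeout.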

Greetings,

Juan 


