AsyncCollector Does not release the thread (1.2.1)

Steve Robert
Hi guys,

The AsyncCollector.collect(Throwable) method does not seem to release the thread.
This can be a problem when calling an external API:
if the call fails with a timeout error, there is no data to collect.

For example:

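CompletableFuture.supplyAsync(() -> asyncCallTask(input))
    .thenAccept((Collection<Tuple3<String, streamDTO, Integer>> result) -> {
        // Success: count the emitted tuples and hand the result to the collector.
        this.tupleEmited.getAndIncrement();
        asyncCollector.collect(result);
    })
    .exceptionally((ex) -> {
        // Failure (e.g. a timeout): report the error; this is the call that seems to hang.
        asyncCollector.collect(ex);
        return null;
    });
} // closes the enclosing method (not shown)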
It is possible to create an empty Collection and collect it to force the thread to be released, but this workaround seems strange to me.
Thanks for your help.
 

--
Steve Robert
Software Engineer
Qualys, Inc. – Continuous Security

Re: AsyncCollector Does not release the thread (1.2.1)

Aljoscha Krettek
Hi,

As far as I know, calling collect(Throwable) should also complete the promise that would otherwise be fulfilled by successfully collecting a result. If it does not, you might have found a bug. What makes you think that the thread is not being released? Is your queue filling up so that no more elements are processed?

Regarding your other question: yes, you can collect an empty Collection to signal that there was no result.
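
To make the empty-Collection approach concrete, here is a minimal sketch in plain Java. It does not use Flink: ResultCollector is a hypothetical stand-in that only mirrors the two collect(...) overloads of AsyncCollector, and RecordingCollector, callAsync and the simulated timeout are illustration-only names. On failure it collects an empty list instead of the exception.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;

public class EmptyResultSketch {

    /** Hypothetical stand-in for AsyncCollector; only the two collect overloads are modelled. */
    public interface ResultCollector<T> {
        void collect(Collection<T> result);
        void collect(Throwable error);
    }

    /** Illustration-only collector that records whatever it is given. */
    public static class RecordingCollector<T> implements ResultCollector<T> {
        public final List<Collection<T>> results = new CopyOnWriteArrayList<>();
        public final List<Throwable> errors = new CopyOnWriteArrayList<>();

        @Override
        public void collect(Collection<T> result) {
            results.add(result);
        }

        @Override
        public void collect(Throwable error) {
            errors.add(error);
        }
    }

    /** Runs the external call asynchronously; a failure is reported as "no result". */
    public static <T> CompletableFuture<Void> callAsync(
            Supplier<Collection<T>> externalCall, ResultCollector<T> collector) {
        return CompletableFuture.supplyAsync(externalCall)
                .thenAccept(result -> collector.collect(result))
                .exceptionally(ex -> {
                    // Workaround: complete the element with an empty result
                    // instead of calling collect(Throwable).
                    collector.collect(Collections.<T>emptyList());
                    return null;
                });
    }

    public static void main(String[] args) {
        RecordingCollector<String> collector = new RecordingCollector<>();
        // Simulated external call that always fails, as a timeout would.
        callAsync(() -> { throw new IllegalStateException("simulated timeout"); }, collector).join();
        System.out.println("results: " + collector.results + ", errors: " + collector.errors);
    }
}
```

Note the trade-off: with this pattern the failure is swallowed and the element simply produces no output, whereas collect(Throwable) is meant to surface the error.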

Best,
Aljoscha


Re: AsyncCollector Does not release the thread (1.2.1)

Steve Robert
Hi Aljoscha,

Thank you for your reply.
Yes, the queue fills up and no more elements are processed (relative to the limit defined in the "orderedWait" function call).
In addition, if I run the test on a local cluster, I can see that the job never ends because the AsyncFunction stays blocked, as if the "collect" method had never been called.
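
The symptom can be illustrated with a toy model in plain Java. This is only a sketch under a simplifying assumption, not Flink's actual implementation: each in-flight element holds one slot of a fixed-capacity buffer until it is completed, and the class and method names are hypothetical.

```java
import java.util.concurrent.Semaphore;

public class CapacitySketch {

    /**
     * Feeds elements into a buffer with the given capacity and returns how many
     * were admitted before the buffer was full. completes[i] says whether
     * element i is ever completed, i.e. whether its slot is given back.
     */
    public static int admitted(int capacity, boolean[] completes) {
        Semaphore slots = new Semaphore(capacity);
        int admitted = 0;
        for (boolean completed : completes) {
            if (!slots.tryAcquire()) {
                break; // buffer full: a real operator would block here
            }
            admitted++;
            if (completed) {
                slots.release(); // result or error delivered, slot is free again
            }
        }
        return admitted;
    }

    public static void main(String[] args) {
        // Elements 1 and 3 (zero-based) are never completed, like a failed call
        // whose error never reaches the buffer.
        boolean[] someNeverComplete = {true, false, true, false, true, true};
        System.out.println(admitted(2, someNeverComplete));
    }
}
```

With a capacity of 2, the two elements that are never completed occupy both slots, so only the first four of the six elements are admitted and everything after them waits forever.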
Best,
Steve


Re: AsyncCollector Does not release the thread (1.2.1)

Aljoscha Krettek
Ok, thanks for letting us know. I’ll investigate.

Re: AsyncCollector Does not release the thread (1.2.1)

Aljoscha Krettek
Hi Steve,

I’m assuming you are using Flink 1.2.x? If so, I’m afraid you have rediscovered this issue: https://issues.apache.org/jira/browse/FLINK-6435. It was fixed in Flink 1.3.0. Is it possible for you to update to that version, or do you think it’s important that we backport the fix to the Flink 1.2.x line?

Best,
Aljoscha
