Non-blocking operation in Apache Flink

Maatary Okouya

I'm looking for a way to avoid thread starvation in my tasks by returning a future, but I don't see how that is possible.

Hence I would like to know how Flink handles the case where a job has to perform network calls (I use Akka HTTP or spray) or any other I/O operation and then use the result.

In Spark I see asynchronous actions and so on, but I don't see any equivalent in Apache Flink. How does it work? Is it supported, or do network calls and other I/O operations have to be synchronous?

Any help, pointers, or reading material would be appreciated.


Re: Non blocking operation in Apache flink

Aljoscha Krettek
Hi,
there is no functionality for making asynchronous calls in user functions in Flink.

The asynchronous action feature in Spark is also not meant for such things; it is targeted at programs that need to pull all data to the application master. In Flink this is not necessary because you can specify a whole plan of operations before executing them.

Cheers,
Aljoscha

(In reply to Maatary Okouya's message of Tue, 24 May 2016 at 20:43.)

Re: Non blocking operation in Apache flink

Maatary Okouya
Thank you for your answer.

Maybe I should have mentioned that I am just starting out with both frameworks and am choosing between them by evaluating their capabilities. I know Akka Streams better.

So my question is simple. Let's say that:

1-/ I have a stream of events that simply report that some item has changed somewhere in a database.

2-/ For each of those events, I need to query the DB to get the new version of the item.

3-/ I apply some transformation.

4-/ I connect to another DB and write the result.

My question is as follows:

How am I supposed to make the calls to both DBs, in and out? Should those calls be synchronous?

I come from Scala and Akka, where we typically avoid blocking calls and use futures all the way for this kind of situation. Akka Streams, for instance, allows that fine-grained level of control for stream processing. This avoids thread starvation: while the I/O operation is in progress, the thread can be used for something else.

So I believe this can somehow be reproduced with both frameworks.

Can you please explain how this is supposed to be handled in Flink?
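
To make the four steps above concrete, here is a minimal sketch of the non-blocking style being asked about, written in plain Java with `CompletableFuture`. It is not Flink code and does not show how Flink handles this. The class `ChangePipeline`, its methods, and the in-memory maps standing in for the two databases are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-in for the change-event pipeline described above. */
final class ChangePipeline {
    // In-memory maps stand in for the two databases (assumption for this sketch).
    private final Map<String, String> sourceDb = new ConcurrentHashMap<>();
    private final Map<String, String> sinkDb = new ConcurrentHashMap<>();

    // A small pool dedicated to I/O; daemon threads so the JVM can exit freely.
    private final ExecutorService ioPool = Executors.newFixedThreadPool(4, r -> {
        Thread t = new Thread(r, "io-worker");
        t.setDaemon(true);
        return t;
    });

    ChangePipeline(Map<String, String> initialItems) {
        sourceDb.putAll(initialItems);
    }

    // Step 2: look up the new version of the changed item.
    CompletableFuture<String> fetchItem(String itemId) {
        return CompletableFuture.supplyAsync(() -> sourceDb.getOrDefault(itemId, ""), ioPool);
    }

    // Step 3: a pure transformation, here simply upper-casing the value.
    static String transform(String item) {
        return item.toUpperCase();
    }

    // Step 4: write the result to the second database.
    CompletableFuture<Void> writeResult(String itemId, String value) {
        return CompletableFuture.runAsync(() -> sinkDb.put(itemId, value), ioPool);
    }

    // Steps 2 to 4 chained together; the caller gets a future back immediately.
    CompletableFuture<Void> handleChange(String itemId) {
        return fetchItem(itemId)
                .thenApply(ChangePipeline::transform)
                .thenCompose(value -> writeResult(itemId, value));
    }

    String written(String itemId) {
        return sinkDb.get(itemId);
    }

    void shutdown() {
        ioPool.shutdown();
    }

    public static void main(String[] args) {
        ChangePipeline pipeline = new ChangePipeline(Map.of("item-1", "new version"));
        // join() is used only so this demo can observe the result before exiting.
        pipeline.handleChange("item-1").join();
        System.out.println(pipeline.written("item-1")); // prints "NEW VERSION"
        pipeline.shutdown();
    }
}
```

The point of the chaining is that no thread sits idle waiting for either database: `handleChange` returns at once, and each stage runs only when the previous one has completed.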


(In reply to Aljoscha Krettek's message of Wed, May 25, 2016 at 5:17 AM.)


Re: Non blocking operation in Apache flink

Aljoscha Krettek
I see what you mean now. The Akka Streams API is very interesting in how it allows async calls.

For Flink, I think you could implement it as a custom source that listens to the change stream, starts futures to get data from the database, and emits an element when each future completes. I quickly sketched such an approach:


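// ChangelogConnection, Change, DB and the callback type Future are placeholders
// for your own changelog and database clients; they are not part of Flink
// (and Future here is not java.util.concurrent.Future).
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

public static class MyDBSource implements ParallelSourceFunction<String> {
    private static final long serialVersionUID = 1L;

    // volatile because cancel() is called from a different thread than run()
    private volatile boolean running = true;

    @Override
    public void run(final SourceContext<String> ctx) throws Exception {
        ChangelogConnection log = new ChangelogConnection();
        DB db = new DB();

        final Object checkpointLock = ctx.getCheckpointLock();

        while (running) {
            // try to fetch the next changelog item
            Change change = log.getNextChange();

            // start the asynchronous lookup; complete() is called when the data arrives
            db.fetch(change, new Future() {
                public void complete(String data) {
                    // emit under the checkpoint lock, since this runs outside the source thread
                    synchronized (checkpointLock) {
                        ctx.collect(data);
                    }
                }
            });
        }
    }

    @Override
    public void cancel() {
        // makes the loop in run() exit
        running = false;
    }
}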
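
One thing the sketch leaves open is how many lookups may be pending at once: the loop starts a new fetch for every change without waiting for earlier ones. A possible refinement, shown here in plain Java without any Flink dependency, is to cap the number of in-flight requests with a `Semaphore` and to wait for them to finish before shutting down. The class `BoundedAsyncEmitter` and its methods are hypothetical names chosen for this illustration, and `emitLock` merely plays the role that the checkpoint lock plays in the source above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical helper that limits how many asynchronous lookups run at once. */
final class BoundedAsyncEmitter {
    private final int maxInFlight;
    private final Semaphore permits;
    private final Object emitLock = new Object(); // stands in for the checkpoint lock

    BoundedAsyncEmitter(int maxInFlight) {
        this.maxInFlight = maxInFlight;
        this.permits = new Semaphore(maxInFlight);
    }

    /** Starts a lookup; blocks only while maxInFlight lookups are already pending. */
    void submit(String key,
                Function<String, CompletableFuture<String>> lookup,
                Consumer<String> emit) {
        permits.acquireUninterruptibly();
        CompletableFuture<String> pending;
        try {
            pending = lookup.apply(key);
        } catch (RuntimeException e) {
            permits.release(); // do not leak the permit if the lookup fails to start
            throw e;
        }
        pending.whenComplete((value, error) -> {
            try {
                // Failed lookups are silently dropped here; a real job would log or retry.
                if (error == null) {
                    synchronized (emitLock) {
                        emit.accept(value);
                    }
                }
            } finally {
                permits.release();
            }
        });
    }

    /** Waits until every pending lookup has completed, e.g. before shutting down. */
    void drain() {
        permits.acquireUninterruptibly(maxInFlight);
        permits.release(maxInFlight);
    }

    public static void main(String[] args) {
        BoundedAsyncEmitter emitter = new BoundedAsyncEmitter(2);
        List<String> out = new ArrayList<>();
        for (String id : List.of("a", "b", "c")) {
            // The lambda simulates an asynchronous database fetch.
            emitter.submit(id, k -> CompletableFuture.supplyAsync(() -> "row-" + k), out::add);
        }
        emitter.drain();
        System.out.println(out.size()); // prints 3; the order of elements may vary
    }
}
```

With a limit in place, a slow database slows down the reading of the changelog instead of letting pending requests pile up in memory.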

I hope that helps.

-Aljoscha

(In reply to Maatary Okouya's message of Wed, 25 May 2016 at 12:21.)


Re: Non blocking operation in Apache flink

Maatary Okouya
Thank you,

I will study that. It is a bit more raw, I would say. The thing is, my source is Kafka, so I will have to see how to combine all of that in the most elegant way possible. I will get back to you on this after I have scratched my head enough.

Best,

Daniel

(In reply to Aljoscha Krettek's message of Wed, May 25, 2016 at 11:02 AM.)