A couple of questions for Stateful Functions

danp
Hi,

Nice to see the progress of Stateful Functions.

I have a few questions that I hope you can reply to.

My first question is regarding the newly implemented
StatefulFunctionDataStreamBuilder.
Is there anything to pay attention to if one first unions a couple of streams
and performs a sort via a keyBy and a KeyedProcessFunction before
dispatching the messages via RoutableMessage?
In this sorting I'm using a MapState, and I guess my question is whether one can
freely mix Flink core with SF's code with regard to performance,
fault tolerance, and checkpointing?

I'm unable to use Protobuf, so POJOs are going to be processed. Is there a
large performance impact of using POJOs instead of protos in SF?

Also, I wonder if there's going to be a severe performance penalty if I had a
function that was called very often, let's say 1000 times a second, and held a
PersistentAppendingBuffer with those objects appended for each message. Then,
when the 1001st message comes or a time trigger kicks in, I would write
everything to disk and delete the state. Would the appending buffer be
de-/serialized for each function invocation?

If yes, is there any workaround for this so the data is just held in RAM?

Thanks,

Regards
Dan



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: A couple of questions for Stateful Functions

Till Rohrmann
Hi Dan,

thanks for reaching out to the Flink community. I'm pulling in Gordon and Igal who will be able to answer your questions.

Cheers,
Till

On Wed, Sep 2, 2020 at 8:22 AM danp <[hidden email]> wrote:
> [...]

Re: A couple of questions for Stateful Functions

Igal Shilman
Hi Dan, let me try to answer your questions:
 
> I guess my question is whether one can
> freely mix Flink core with SF's code with regard to performance,
> fault tolerance, and checkpointing?

The main limitation at the moment is that SF currently supports processing-time semantics only; event time is not supported, as it is difficult to reason about completion in the presence of loops.
Other than that, with respect to fault tolerance and checkpointing, StateFun is built on top of the DataStream API, so the same guarantees apply to SF as well.


> I'm unable to use Protobuf, so POJOs are going to be processed. Is there a
> large performance impact of using POJOs instead of protos in SF?

The only supported message data type for SF is Protocol Buffers, and I would highly recommend using it.
One option is to transform your POJO into a Protobuf message just before entering SF, and convert it back to your original POJO when exiting SF.
If you absolutely have to use something else, you can fall back to Kryo[1] or provide your own[2], but then schema evolution of your messages and state is no longer guaranteed, and you
lose the ability to communicate with remote functions.

> Would the appendingBuffer be
> de-/serialized for each function invocation?

The appending buffer supports efficient appends (the buffer is *not* deserialized on every function invocation).
In fact, it is backed by Flink's ListState[3].
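
To make the "append without deserializing" idea concrete, here is a minimal sketch in plain Java. It is not StateFun or Flink code: `AppendLog` and its methods are hypothetical names invented for illustration, and it assumes string elements. It only mimics the general idea that an append serializes the new element alone, while the full read is paid once at flush time. How Flink's ListState actually stores its contents depends on the configured state backend.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical append-only buffer; illustrative only, not a StateFun or Flink class. */
final class AppendLog {
    private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    private int count;

    /** Serializes only the new element and appends its bytes; earlier entries are not read. */
    void append(String element) {
        try {
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(element);
            out.flush();
            count++;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Number of elements appended since the last clear. */
    int count() {
        return count;
    }

    /** Deserializes every element in append order; this cost is paid only when flushing. */
    List<String> readAll() {
        try {
            DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            List<String> result = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                result.add(in.readUTF());
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Drops all buffered elements, as one would after writing them out. */
    void clear() {
        bytes.reset();
        count = 0;
    }
}
```

In the scenario from the question, a function would call `append` for each message, and only when the count threshold or the time trigger is reached would it call `readAll`, write the result out, and then `clear`.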


Thanks,
Igal

On Wed, Sep 2, 2020 at 10:51 AM Till Rohrmann <[hidden email]> wrote:
> [...]

Re: A couple of questions for Stateful Functions

danp
Thanks for your quick reply.

/Dan

On Wed, Sep 2, 2020 at 12:24, Igal Shilman <[hidden email]> wrote:
> [...]