Can serialization be disabled between chains?

Can serialization be disabled between chains?

Dmitry Golubets
Hi,

Let's say we have multiple subtask chains, all of them executing in the same task manager slot (i.e. in the same JVM).
What's the point of serializing data between them?
Can it be disabled?

The reason I want to keep separate chains is that some subtasks should execute in parallel with each other.
Say I have tasks: A -> B.
After task A has pushed some data to task B, I want A to continue processing without waiting for B to finish.
What I'm describing is the behavior of Akka Streams with fusion disabled.

Best regards,
Dmitry

Re: Can serialization be disabled between chains?

Ufuk Celebi
Hey Dmitry,

this is not possible, if I'm understanding you correctly.

A task chain is executed by a single task thread, so processing cannot
continue before the record "leaves" the thread, which only happens when
the next task thread or the network stack consumes it.

Hand-over between chained tasks happens without serialization; only
data exchanged between different task threads is serialized.

Depending on your use case, the newly introduced async I/O feature
might be worth a look (it will be part of the upcoming 1.2 release):
https://github.com/apache/flink/pull/2629
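The hand-over model described above can be sketched in plain Java (hypothetical names, not Flink's actual internals): chained operators run as nested calls on one thread, while crossing a chain boundary means serializing the record into a buffer that a different thread consumes.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Function;

public class ChainSketch {
    public static void main(String[] args) throws Exception {
        // Chained: A and B run as nested calls on the SAME thread.
        // A cannot move to the next record until B has returned.
        Function<String, String> a = s -> s + "-A";
        Function<String, String> b = s -> s + "-B";
        System.out.println(b.apply(a.apply("rec")));

        // Chain boundary: records are serialized into a buffer and
        // consumed by a DIFFERENT thread; A continues immediately.
        BlockingQueue<byte[]> buffer = new ArrayBlockingQueue<>(16);
        Thread taskB = new Thread(() -> {
            try {
                byte[] bytes = buffer.take(); // "deserialize" on the consumer side
                System.out.println(new String(bytes, StandardCharsets.UTF_8) + "-B");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        taskB.start();
        buffer.put(a.apply("rec").getBytes(StandardCharsets.UTF_8)); // "serialize"
        // task A would be free to process its next record here, before B finishes
        taskB.join();
    }
}
```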

Re: Can serialization be disabled between chains?

Dmitry Golubets
Hi Ufuk,

Do you know the reason for serializing data between different threads?

Also, thanks for the link!

Best regards,
Dmitry

On Mon, Jan 16, 2017 at 1:07 PM, Ufuk Celebi <[hidden email]> wrote:


Re: Can serialization be disabled between chains?

Fabian Hueske
One of the reasons is to ensure that data cannot be modified after it has left a thread.
A function that emits the same object several times (to reduce object creation and GC pressure) could accidentally modify already-emitted records if they were put into a queue as plain object references.
Moreover, it is easier to control memory consumption if data is serialized into a fixed number of buffers instead of being kept as objects on the JVM heap.

Best, Fabian
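The reuse hazard Fabian describes can be reproduced in a few lines of plain Java (a sketch, not Flink code): when the producer reuses one mutable record, both queue entries alias the same, last-written state, whereas serialized snapshots keep each emitted value.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class ReuseHazard {
    static class Record { long value; } // mutable, reused to avoid allocation

    public static void main(String[] args) {
        // Hand over by reference: both entries alias the SAME object.
        Queue<Record> byReference = new ArrayDeque<>();
        Record reused = new Record();
        reused.value = 1;
        byReference.add(reused);
        reused.value = 2;          // "emit" the next value by mutating in place
        byReference.add(reused);
        System.out.println(byReference.poll().value); // first record corrupted
        System.out.println(byReference.poll().value);

        // Hand over serialized: each entry is an independent snapshot.
        Queue<byte[]> serialized = new ArrayDeque<>();
        reused.value = 1;
        serialized.add(Long.toString(reused.value).getBytes());
        reused.value = 2;
        serialized.add(Long.toString(reused.value).getBytes());
        System.out.println(new String(serialized.poll())); // preserved
        System.out.println(new String(serialized.poll()));
    }
}
```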

Re: Can serialization be disabled between chains?

Dmitry Golubets
The first issue is not a problem with idiomatic Scala - we make all our data objects immutable.
The second one.. yeah, I guess it makes sense.
Thanks for the clarification.

Best regards,
Dmitry
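The immutable-objects point can be sketched in Java with a record (Java 16+, standing in for an immutable Scala case class): with no setters, an already-enqueued reference can never be corrupted afterwards, so the first hazard disappears.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class ImmutableHandOver {
    // Immutable record, roughly `case class Event(id: Long, payload: String)`.
    record Event(long id, String payload) {}

    public static void main(String[] args) {
        Queue<Event> queue = new ArrayDeque<>();
        Event e = new Event(1, "first");
        queue.add(e);
        // To "reuse" the variable, the producer must create a NEW object;
        // the enqueued one cannot be touched.
        e = new Event(2, "second");
        queue.add(e);
        System.out.println(queue.poll().payload()); // intact
        System.out.println(queue.poll().payload());
    }
}
```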

Re: Can serialization be disabled between chains?

Ufuk Celebi
+1 to what Fabian said. Regarding memory consumption: Flink's back
pressure mechanism also depends on this, because the availability of
(network) buffers determines how fast an operator can produce data. If
no buffers are available, the producing operator slows down.
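That buffer-based back pressure can be sketched with a bounded queue (plain Java, not Flink's actual network stack): once the fixed pool of buffers is exhausted, the producer cannot make progress until the consumer frees one.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BackPressureSketch {
    public static void main(String[] args) throws Exception {
        // A fixed pool of two "network buffers".
        BlockingQueue<String> buffers = new ArrayBlockingQueue<>(2);

        System.out.println(buffers.offer("record-1"));
        System.out.println(buffers.offer("record-2"));
        // Pool exhausted: the producer is throttled (offer fails; put would block).
        System.out.println(buffers.offer("record-3"));

        buffers.take();                                // consumer frees a buffer
        System.out.println(buffers.offer("record-3")); // producer resumes
    }
}
```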
