Hi,
Let's say we have multiple subtask chains, all executing in the same task manager slot (i.e. in the same JVM). What's the point of serializing data between them? Can it be disabled?

The reason I want to keep separate chains is that some subtasks should be executed in parallel with each other. Say I have tasks A -> B. After task A pushes some data to task B, I want task A to continue processing without waiting for task B to finish. What I'm describing is the behavior of Akka Streams with fusion disabled.

Best regards,
Dmitry
Hey Dmitry,
this is not possible if I'm understanding you correctly. A task chain is executed by a single task thread, and hence it is not possible to continue processing before the record "leaves" the thread, which only happens when the next task thread or the network stack consumes it.

Handover between chained tasks happens without serialization. Only data between different task threads is serialized.

Depending on your use case, the newly introduced async I/O feature might be worth a look (it will be part of the upcoming 1.2 release): https://github.com/apache/flink/pull/2629
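The single-thread chaining described above can be sketched in plain Java (a conceptual sketch, not Flink's API; `operatorA` and `operatorB` are hypothetical stand-ins for two chained operators): emitting a record within a chain is just a direct method call, so the upstream operator only resumes once the downstream one has returned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ChainSketch {
    static final List<String> trace = new ArrayList<>();

    // Downstream chained operator: invoked directly, on the caller's thread.
    static void operatorB(int record) {
        trace.add("B processed " + record);
    }

    // Upstream operator: "emitting" is a plain method call into B,
    // so A cannot continue until B has fully processed the record.
    static void operatorA(int record, Consumer<Integer> downstream) {
        trace.add("A emits " + record);
        downstream.accept(record);                  // same thread, no serialization
        trace.add("A continues after " + record);   // only runs once B returned
    }

    public static void main(String[] args) {
        operatorA(1, ChainSketch::operatorB);
        System.out.println(trace); // [A emits 1, B processed 1, A continues after 1]
    }
}
```

Making A continue before B finishes would require handing the record to another thread, which is exactly the chain boundary where Flink serializes.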
Hi Ufuk,

Do you know the reason for serializing data between different threads?

Also, thanks for the link!

Best regards,
Dmitry
One of the reasons is to ensure that data cannot be modified after it has left a thread. A function that emits the same object several times (in order to reduce object creation and GC pressure) might accidentally modify emitted records if they were put in a queue as objects.

Moreover, it is easier to control the memory consumption if data is serialized into a fixed number of buffers instead of being put on the JVM heap.

Best, Fabian
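The object-reuse hazard described above can be demonstrated with a plain Java sketch (hypothetical names, not Flink code): if a function hands the same mutable record object to a queue twice, a later mutation silently corrupts the earlier queued entry, whereas copying (which is what serialization effectively gives you) snapshots each value.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class ReuseHazard {
    // A mutable record, reused by the emitting function to avoid allocations.
    static class Record {
        int value;
        Record(int value) { this.value = value; }
        Record copy() { return new Record(value); } // stands in for ser/deser
    }

    public static void main(String[] args) {
        Record reused = new Record(0);

        // Handover by reference: both queue entries alias the same object.
        Deque<Record> byRef = new ArrayDeque<>();
        reused.value = 1; byRef.add(reused);
        reused.value = 2; byRef.add(reused);
        // Both entries now read 2: the first emission was retroactively modified.
        System.out.println(byRef.peekFirst().value + " " + byRef.peekLast().value); // 2 2

        // Handover by copy: each entry keeps the value it was emitted with.
        Deque<Record> byCopy = new ArrayDeque<>();
        reused.value = 1; byCopy.add(reused.copy());
        reused.value = 2; byCopy.add(reused.copy());
        System.out.println(byCopy.peekFirst().value + " " + byCopy.peekLast().value); // 1 2
    }
}
```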
The first issue is not a problem with idiomatic Scala - we make all our data objects immutable. The second... yeah, I guess it makes sense.

Thanks for the clarification.

Best regards,
Dmitry
+1 to what Fabian said.

Regarding the memory consumption: Flink's back pressure mechanism also depends on this, because the availability of (network) buffers determines how fast an operator can produce data. If no buffers are available, the producing operator will slow down.
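The buffer-based back pressure described above can be sketched with a bounded queue in plain Java (a conceptual sketch, not Flink's actual network stack): with a fixed number of "buffers", a producer that has filled them all cannot make progress until a consumer drains one.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BackPressureSketch {
    public static void main(String[] args) throws InterruptedException {
        // A fixed number of "buffers": capacity 2.
        ArrayBlockingQueue<Integer> buffers = new ArrayBlockingQueue<>(2);

        // Producer fills the buffers while no consumer is draining them.
        buffers.put(1);
        buffers.put(2);

        // Buffers exhausted: the producer is throttled. offer() with a
        // timeout gives up instead of blocking forever.
        boolean accepted = buffers.offer(3, 100, TimeUnit.MILLISECONDS);
        System.out.println("accepted while full: " + accepted); // false

        // Once the consumer drains a buffer, the producer can proceed again.
        buffers.take();
        accepted = buffers.offer(3, 100, TimeUnit.MILLISECONDS);
        System.out.println("accepted after drain: " + accepted); // true
    }
}
```

The producer's speed is thus naturally coupled to the consumer's, which is the essence of the back pressure mechanism described above.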