Question about Flink internals


Question about Flink internals

Junguk Cho
Hi, All.

I am new to Flink.
I just installed Flink on a cluster and started reading the documentation to understand Flink internals.
After reading some of it, I have a few questions.
I have prior experience with Storm and Heron, so I am relating their mechanisms to my questions to better understand Flink.

1. Can I specify worker parallelism explicitly, as in Storm?

2. Records in Flink
Can I think of a "record" in Flink as almost the same as a Tuple in Storm?
A Tuple in Storm carries "real data" plus metadata (e.g., stream type, source id, and so on).

3. How does partitioning (e.g., shuffle, map) work internally?
In Storm, each worker keeps a (worker id) : (TCP info of the next workers) table.
Based on this table, after the partitioning function runs, a Tuple is forwarded to the next hop.
Is it the same in Flink?

4. How does Flink detect faults such as worker death or machine failure?
Based on the documents, the JobManager checks the liveness of TaskManagers with heartbeat messages.
In Storm, a supervisor (I think it is similar to a TaskManager) first detects a dead worker based on heartbeats and locally re-runs it. For machine failures, Nimbus (I think it is similar to the JobManager) detects the failure based on the supervisor's heartbeat and reschedules all assigned workers to other machines.
How does Flink handle this?

5. For exactly-once delivery, Flink uses a checkpointing and record-replay mechanism.
This needs a message queue (e.g., Kafka) for record replay.
Kafka uses TCP to send and receive data, so I wonder: if a data source does not use TCP (e.g., IoT sensors), what is the general solution for record replay?
For example, source workers might be directly connected to several inputs (e.g., IoT sensors), although I think that is not a typical deployment.

6. Flink supports cycles.
However, per the documents, "cycled tasks act as regular dataflow source and sink respectively, yet they are collocated in the same physical instance to share an in-memory buffer and thus, implement loopback stream transparently."
So, what if the number of workers that form a cycle is high? It would be hard to put them all on the same physical machine.

Thanks,
Junguk

Re: Question about Flink internals

Timo Walther
Hi Junguk,

I will try to answer your questions, but I am also looping in Ufuk, who might know more about the network internals:

1. Yes, every operator/operator chain has a "setParallelism()" method to specify its parallelism. The overall parallelism of the job can be set when submitting the job. The parallelism per TaskManager is determined by the number of slots.
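
For example, a minimal sketch against the DataStream API (the parallelism values are arbitrary; the slots per TaskManager come from the taskmanager.numberOfTaskSlots setting in flink-conf.yaml):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4); // default parallelism for every operator in this job

        env.fromElements(1, 2, 3)
           .map(new MapFunction<Integer, Integer>() {
               @Override
               public Integer map(Integer x) {
                   return x * 2;
               }
           }).setParallelism(8)        // explicit per-operator parallelism
           .print().setParallelism(1); // e.g. force a single printing sink

        env.execute("parallelism-example");
    }
}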

2. From a user's perspective you can only see the "real data".
Internally, there are different types of records that flow through the
topology (namely watermarks, checkpoint barriers, latency markers, and
records with or without timestamp metadata).
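
To make that concrete, here is a simplified, illustrative sketch of the element types; it is modeled loosely on Flink's internal StreamRecord, Watermark, CheckpointBarrier, and LatencyMarker classes, not their actual code:

// Illustrative sketch only: the kinds of elements that flow between tasks.
abstract class StreamElement {}

class Record<T> extends StreamElement {
    final T value;        // the "real data" a user function sees
    final Long timestamp; // optional event-time metadata (null if absent)
    Record(T value, Long timestamp) { this.value = value; this.timestamp = timestamp; }
}

class Watermark extends StreamElement {
    final long timestamp; // "no records older than this will arrive"
    Watermark(long timestamp) { this.timestamp = timestamp; }
}

class CheckpointBarrier extends StreamElement {
    final long checkpointId; // separates pre- and post-checkpoint records
    CheckpointBarrier(long id) { this.checkpointId = id; }
}

class LatencyMarker extends StreamElement {
    final long markedTime; // used to estimate end-to-end latency
    LatencyMarker(long markedTime) { this.markedTime = markedTime; }
}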

3. See my last comment.

4. Flink also uses heartbeat messages between the JobManager and the TaskManagers. In case of a failure, the JobManager restores the entire topology to the last successful checkpoint. See [1] for a more detailed explanation. In the future, recovery is planned to become more fine-grained.
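
The related knobs sit on the execution environment; a minimal sketch (the checkpoint interval and restart values are arbitrary choices, not recommendations):

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RecoveryConfigExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Draw a consistent snapshot of all operator state every 10 seconds.
        env.enableCheckpointing(10000);

        // On failure, restart the topology up to 3 times with a 5 s delay,
        // restoring all operators from the last successful checkpoint.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));

        // ... define and execute the job as usual ...
    }
}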

5. Source workers should not be directly connected but should go through systems like Kafka or Pravega. This is not only for replaying in case of failures but also for using the log as the single source of truth in case your processing logic needs to be adapted. E.g., if you had a bug in your application and the state that you have built is invalid, you want to be able to correct your mistake and rebuild the state in a batch. The folks from Drivetribe showed a very nice architecture [2]. I don't know if replaying your IoT devices would make sense; in theory you could implement your own connector with logic similar to Flink's Kafka consumer.
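
If you went down that road, the core requirement is that the source can checkpoint and rewind its read position. A minimal sketch, assuming the sensor readings are first written to some durable, offset-addressable store (the SensorLog class below is hypothetical):

import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class ReplayableSensorSource extends RichSourceFunction<String>
        implements ListCheckpointed<Long> {

    private volatile boolean running = true;
    private long offset = 0L; // next position to read from the durable log

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            String reading = SensorLog.read(offset);
            // Emit and advance the offset atomically w.r.t. checkpoints,
            // so a snapshot never sits "between" a record and its offset.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(reading);
                offset++;
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long timestamp) {
        return Collections.singletonList(offset); // stored with each checkpoint
    }

    @Override
    public void restoreState(List<Long> state) {
        for (Long restored : state) {
            offset = restored; // rewind here on recovery and replay
        }
    }

    /** Hypothetical stand-in for a durable, offset-addressable sensor log. */
    static class SensorLog {
        static String read(long offset) {
            return "sensor-reading-" + offset; // placeholder data
        }
    }
}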

6. I don't know the internals of the iteration feature, but you might be right. Cyclic dataflows are not fully supported yet; e.g., they also do not participate in Flink's checkpointing mechanism.

In general, I would recommend importing Flink into your IDE, setting a breakpoint in an example (e.g., within a mapper before a keyBy), and running it in debug mode. You can step through the layers to see more of the internals. This should answer most of your questions; otherwise feel free to ask again.
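
For instance, a tiny job like the following gives you a mapper right before a keyBy to put the breakpoint in (a minimal sketch; any small pipeline works):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DebugExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "a")
           .map(new MapFunction<String, Tuple2<String, Integer>>() {
               @Override
               public Tuple2<String, Integer> map(String value) {
                   // Breakpoint here: step out through the collector to see how
                   // the record is serialized and routed into the keyBy shuffle.
                   return new Tuple2<>(value, 1);
               }
           })
           .keyBy(0)
           .sum(1)
           .print();

        env.execute("debug-example");
    }
}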

Regards,
Timo

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/stream_checkpointing.html
[2] https://data-artisans.com/blog/drivetribe-cqrs-apache-flink


Re: Question about Flink internals

Junguk Cho
Hi, Timo.

Thank you for the detailed replies.
They helped me understand Flink a lot.

However, a couple of points were misunderstood, so let me clarify.

2. From a user's perspective you can only see the "real data". Internally, there are different types of records that flow through the topology (namely watermarks, checkpoint barriers, latency markers, and records with or without timestamp metadata).
-> I understand there are several types of records. I was wondering about the "record" class and its members, like Storm's Tuple (https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java).


6. I don't know the internals of the iteration feature, but you might be right. Cyclic dataflows are not fully supported yet; e.g., they also do not participate in Flink's checkpointing mechanism.
-> Based on Section 3.4 of this paper (http://www.vldb.org/pvldb/vol10/p1718-carbone.pdf), it seems that Flink does support checkpointing for cyclic dataflows. However, there is this limitation: "Cycled tasks act as regular dataflow source and sink respectively, yet they are collocated in the same physical instance to share an in-memory buffer and thus, implement loopback stream transparently."

In general, I would recommend importing Flink into your IDE, setting a breakpoint in an example (e.g., within a mapper before a keyBy), and running it in debug mode. You can step through the layers to see more of the internals. This should answer most of your questions; otherwise feel free to ask again.
-> I will try this. Thanks a lot.

Thanks,
Junguk
