Checkpoints in batch processing & JDBC Output Format


Maximilian Bode
Hi everyone,

I am considering using Flink in a project. The setting would be a YARN cluster where data is first read from HDFS, then processed, and finally written into an Oracle database using an upsert command. If I understand the documentation correctly, the DataSet API would be the natural candidate for this problem.

My first question is about the checkpointing system. Apparently (see e.g. [1] and [2]), it does not apply to batch processing. So how does Flink handle failures during batch jobs? For the use case described above, 'at least once' semantics would suffice; still, are 'exactly once' guarantees possible?
For example, how does Flink handle the failure of one TaskManager during a batch job? What happens in this case if the data has already been partly written to the database?

Secondly, the most obvious and straightforward approach to connecting to the Oracle DB would be the JDBC output format. In [3] it was mentioned that it does not have many users and might therefore not be fully trusted yet. What is the status on this?

Best regards,
Max

[1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-and-Spark-tp583p587.html
[2] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Batch-Processing-as-Streaming-td1909.html
[3] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/PotsgreSQL-JDBC-Sink-quot-writeRecord-failed-quot-and-quot-Batch-element-cancelled-quot-on-upsert-td623.html

Re: Checkpoints in batch processing & JDBC Output Format

Stephan Ewen
Hi!

You can use either the DataSet API or the DataStream API for this. In case of failures, they behave slightly differently.

DataSet:

Fault tolerance for the DataSet API works by restarting the job and redoing all of the work. In some sense, that is similar to what happens in MapReduce, except that Flink currently restarts more tasks than strictly necessary (work is in progress to reduce that). The periodic in-flight checkpoints are not used here.
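As a minimal sketch, this is roughly how you would enable such automatic restarts for a DataSet job (the retry setting shown is the one from current releases, and the HDFS paths are placeholders):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class BatchRetryJob {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Re-run the whole job up to 3 times if a task fails.
        env.setNumberOfExecutionRetries(3);

        DataSet<String> input = env.readTextFile("hdfs:///path/to/input"); // placeholder path

        input.map(new MapFunction<String, String>() {
                @Override
                public String map(String line) {
                    return line.trim(); // stand-in for the actual processing
                }
            })
            .writeAsText("hdfs:///path/to/output"); // placeholder path

        env.execute("batch job with retries");
    }
}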

DataStream:

A streaming job would start inserting data immediately and draw periodic checkpoints, which ensure that replay-on-failure only has to redo a small part of the work rather than everything. Whether this fits your use case depends on the type of processing you want to do.
You could even set up the job so that it monitors the directory for new files, picks them up, and starts inserting them into the database as soon as they appear.
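A rough sketch of such a monitoring job; the exact source API has changed across Flink versions, so this uses the readFile/PROCESS_CONTINUOUSLY variant from later releases, with placeholder paths and intervals:

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class DirectoryMonitoringJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000); // draw a checkpoint every 10 seconds

        String dir = "hdfs:///path/to/incoming"; // placeholder directory

        // Re-scan the directory every 60 seconds and ingest new files as they appear.
        DataStream<String> lines = env.readFile(
                new TextInputFormat(new Path(dir)), dir,
                FileProcessingMode.PROCESS_CONTINUOUSLY, 60000);

        lines.print(); // a real job would add the database sink here instead

        env.execute("directory monitoring job");
    }
}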


Considering the last question (JDBC output format): using UPSERT needs a few modifications (this is the issue another user ran into); you would probably have to write a custom output format based on the JDBC output format.
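As a minimal sketch of what such a custom output format could look like (a hand-rolled variant, not the actual JDBCOutputFormat; connection string, table, and column names are made up), using Oracle's MERGE statement since Oracle has no native UPSERT:

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

public class OracleUpsertOutputFormat implements OutputFormat<Tuple2<Long, String>> {

    // MERGE gives upsert behavior: update the row if the key exists, insert otherwise.
    private static final String MERGE_SQL =
        "MERGE INTO my_table t USING (SELECT ? id, ? val FROM dual) s "
        + "ON (t.id = s.id) "
        + "WHEN MATCHED THEN UPDATE SET t.val = s.val "
        + "WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)";

    private transient Connection connection;
    private transient PreparedStatement statement;

    @Override
    public void configure(Configuration parameters) {
        // No-op; connection parameters could be read from the config here instead.
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        try {
            connection = DriverManager.getConnection(
                "jdbc:oracle:thin:@//dbhost:1521/service", "user", "password");
            statement = connection.prepareStatement(MERGE_SQL);
        } catch (SQLException e) {
            throw new IOException("Could not open database connection", e);
        }
    }

    @Override
    public void writeRecord(Tuple2<Long, String> record) throws IOException {
        try {
            statement.setLong(1, record.f0);
            statement.setString(2, record.f1);
            statement.executeUpdate();
        } catch (SQLException e) {
            throw new IOException("Upsert failed", e);
        }
    }

    @Override
    public void close() throws IOException {
        try {
            if (statement != null) statement.close();
            if (connection != null) connection.close();
        } catch (SQLException e) {
            throw new IOException("Could not close database connection", e);
        }
    }
}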

If you go with the streaming API, it should be possible to change the database-writing output format to give you exactly-once semantics. The way to do that would be to commit the upserts only on completed checkpoints (and buffer them in the sink between checkpoints). This may be interesting if your database cannot deduplicate insertions (i.e., if there is no deterministic primary key).
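A sketch of that pattern, assuming the Checkpointed and CheckpointListener interfaces (interface names have varied a bit across Flink versions) and leaving the actual database commit as a stub:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class BufferingUpsertSink extends RichSinkFunction<String>
        implements Checkpointed<ArrayList<String>>, CheckpointListener {

    // Records received since the last checkpoint barrier.
    private ArrayList<String> currentBuffer = new ArrayList<>();

    // One buffer per checkpoint, waiting for that checkpoint to complete.
    private final Map<Long, List<String>> pendingCommits = new HashMap<>();

    @Override
    public void invoke(String value) {
        currentBuffer.add(value); // buffer instead of writing immediately
    }

    @Override
    public ArrayList<String> snapshotState(long checkpointId, long checkpointTimestamp) {
        // Freeze the current buffer under this checkpoint's id.
        pendingCommits.put(checkpointId, currentBuffer);
        currentBuffer = new ArrayList<>();

        // Persist everything not yet committed, so a restart replays it.
        ArrayList<String> state = new ArrayList<>();
        for (List<String> batch : pendingCommits.values()) {
            state.addAll(batch);
        }
        return state;
    }

    @Override
    public void restoreState(ArrayList<String> state) {
        // Uncommitted records go back into the buffer and are committed
        // together with the next completed checkpoint.
        currentBuffer = state;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        List<String> batch = pendingCommits.remove(checkpointId);
        if (batch != null) {
            commitToDatabase(batch); // commit exactly this checkpoint's records
        }
    }

    private void commitToDatabase(List<String> records) {
        // Stub: a real implementation would run the buffered upserts
        // against the database in a single transaction here.
    }
}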

Greetings,
Stephan



Re: Checkpoints in batch processing & JDBC Output Format

Maximilian Bode
Hi Stephan,

Thank you very much for your answer. I was happy to meet Robert in Munich last week, and he suggested that for our problem, batch processing is the way to go.

We also talked about how exactly to guarantee, in this context, that no data is lost even if the job dies while writing to the database. His idea was based on inserting a 'batch id' field into the database, which makes it possible to check whether something has already been committed or not. Do you happen to have further input on how this or a similar approach (e.g. using a timestamp) could be automated, perhaps by customizing the output format as well?
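To make the idea concrete, here is a rough sketch of the scheme as I understand it (all table and column names are made up, and the actual writing would of course happen inside the output format):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Statement;

public class BatchIdRecovery {
    public static void main(String[] args) throws Exception {
        long batchId = System.currentTimeMillis(); // or a sequence / timestamp
        try (Connection conn = DriverManager.getConnection(
                "jdbc:oracle:thin:@//dbhost:1521/service", "user", "password")) {
            conn.setAutoCommit(false);

            // 1. Clean up rows from earlier runs whose batch was never marked done.
            try (Statement s = conn.createStatement()) {
                s.executeUpdate("DELETE FROM my_table WHERE batch_id NOT IN "
                    + "(SELECT batch_id FROM completed_batches)");
            }

            // 2. Write the data, tagging every row with the current batch id.
            try (PreparedStatement ps = conn.prepareStatement(
                    "INSERT INTO my_table (id, val, batch_id) VALUES (?, ?, ?)")) {
                ps.setLong(1, 42L);
                ps.setString(2, "example");
                ps.setLong(3, batchId);
                ps.executeUpdate();
            }

            // 3. Mark the batch as committed; only then does the data count as written.
            try (PreparedStatement ps = conn.prepareStatement(
                    "INSERT INTO completed_batches (batch_id) VALUES (?)")) {
                ps.setLong(1, batchId);
                ps.executeUpdate();
            }
            conn.commit();
        }
    }
}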

Cheers,
Max


Re: Checkpoints in batch processing & JDBC Output Format

Stephan Ewen
Hi!

If you go with the batch API, then any failed task (like a sink trying to insert into the database) will be completely re-executed. That makes sure no data is lost, with no extra effort needed.

It may insert a lot of duplicates, though, if the task is restarted after half the data was inserted. That is where streaming does a better job (more fine-grained checkpoints/commits). I am not sure whether you worry about this, or whether you have a deterministic primary key anyway, in which case the database insertion discards duplicate records automatically.

Stephan



