Separate checkpoint directories

Separate checkpoint directories

Kyle Hamlin
Flink 1.4 added regex pattern matching for FlinkKafkaConsumers, which is a neat feature. I would like to use it, but I'm wondering how it impacts the FsStateBackend checkpointing mechanism. Before, I would subscribe to one topic and set a checkpoint path specific to that topic. For example, if the Kafka topic name was foo:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(new FsStateBackend("s3://checkpoints/foo/"))

How does one dynamically set these checkpoint paths? Is it even necessary to do so, or should I have one checkpoint path for all the possible topics the regex pattern could pick up?
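
For context, here is roughly what I imagine the new setup looking like. This is only a sketch; the topic pattern, Kafka properties, and bucket name are placeholders:

import java.util.Properties
import java.util.regex.Pattern

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Single checkpoint root shared by every topic the pattern matches (bucket name is made up).
env.setStateBackend(new FsStateBackend("s3://checkpoints/"))
env.enableCheckpointing(60000)

val props = new Properties()
props.setProperty("bootstrap.servers", "kafka:9092") // placeholder
props.setProperty("group.id", "pattern-consumer")    // placeholder
// Periodically discover new topics/partitions that match the pattern.
props.setProperty("flink.partition-discovery.interval-millis", "60000")

// Subscribe by regex instead of a fixed topic name (new in Flink 1.4).
val consumer = new FlinkKafkaConsumer011[String](
  Pattern.compile("topic-.*"), // placeholder pattern
  new SimpleStringSchema(),
  props)

env.addSource(consumer).print()
env.execute("pattern-subscription-sketch")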

Re: Separate checkpoint directories

Stefan Richter
Hi,

first, let me ask why you want to have a different checkpoint directory per topic? It is perfectly fine to have just a single checkpoint directory, so I wonder what the intention is. Flink will already create the proper subdirectories and file names and can identify the right checkpoint data for each operator instance.

Best,
Stefan
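
To illustrate what Stefan describes: with the FsStateBackend, the checkpoints of a job typically end up under the configured root in a structure roughly like the following (the exact layout can vary between Flink versions; all names here are placeholders):

s3://checkpoints/<job-id>/chk-<checkpoint-number>/
s3://checkpoints/<job-id>/chk-<checkpoint-number>/<unique-state-file>

Each completed checkpoint gets its own chk-<n> directory under the job ID, and the state files inside carry generated unique names, so different jobs and operators do not collide under a shared root.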




Re: Separate checkpoint directories

Kyle Hamlin
In reply to this post by Stefan Richter
Hi Stefan,

In the past, I ran four separate Flink apps to sink data from four separate Kafka topics to S3 without applying any transformations. For each Flink app, I would set the checkpoint directory to s3://some-bucket/checkpoints/topic-name. It appears that with Flink 1.4 I can just use a regex to pick up all four topics. So what you are telling me is that even if my regex picks up 1000 Kafka topics, the only checkpoint path I need is s3://some-bucket/checkpoints/ and Flink will take care of the rest?

Additionally, how might this concept extend to sinking the data from this single Flink app (which has picked up 1000 Kafka topics) to separate S3 paths? For instance:

s3://some-bucket/data/topic1/
s3://some-bucket/data/topic2/
.
.
.
s3://some-bucket/data/topic1000/

Thanks very much for your help Stefan!



Re: Separate checkpoint directories

Stefan Richter
Hi,

the state is checkpointed into subdirectories with unique file names, so keeping everything under a single root directory is no problem. This all happens automatically.

As far as I know, there is no built-in implementation that generates per-topic output paths for sinks like that. You could open a JIRA issue with a feature request, though.

Best,
Stefan
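
For the per-topic output paths, one possible approach (not built into Flink, as Stefan notes) is a custom Bucketer for the BucketingSink. The sketch below assumes each record carries its source topic, e.g. attached by a KeyedDeserializationSchema; the record type and paths are made up for illustration:

import org.apache.flink.streaming.connectors.fs.Clock
import org.apache.flink.streaming.connectors.fs.bucketing.{Bucketer, BucketingSink}
import org.apache.hadoop.fs.Path

// Hypothetical record type: the deserialization schema would have to attach the
// source topic name to every element (e.g. via a KeyedDeserializationSchema).
case class TopicRecord(topic: String, value: String)

// Route each record into a sub-directory named after its source topic,
// e.g. s3://some-bucket/data/topic1/...
class TopicBucketer extends Bucketer[TopicRecord] {
  override def getBucketPath(clock: Clock, basePath: Path, element: TopicRecord): Path =
    new Path(basePath, element.topic)
}

val sink = new BucketingSink[TopicRecord]("s3://some-bucket/data")
sink.setBucketer(new TopicBucketer)

// stream.addSink(sink)  // where `stream` is a DataStream[TopicRecord]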
