Flink checkpointing with Azure block storage


Flink checkpointing with Azure block storage

Boris Lublinsky
Is there somewhere a complete configuration example for such option?

Re: Flink checkpointing with Azure block storage

Boris Lublinsky
To test it, I created a flink-conf.yaml file and put it in the resources directory of my project.
The file contains the following:

#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================

# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
state.backend: filesystem

# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
state.checkpoints.dir: wasb://<your-container>@<your-azure-account>.blob.core.windows.net/<object-path>

fs.azure.account.key.<account_name>.blob.core.windows.net: <azure_storage_key>

# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints

# Flag to enable/disable incremental checkpoints for backends that

I expected this to produce an error, but it does not seem to take effect:


313 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.jobmaster.JobMaster  - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
3327 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@5ef4eacb for Flink Streaming Job (427dae12e8f7243742ae8bd152467edc).
3329 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService  - Proposing leadership to contender akka://flink/user/rpc/jobmanager_3
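The config above uses two different placeholders for the same storage account (`<your-azure-account>` in the checkpoint URI and `<account_name>` in the credential key). Both must name the same account, or the key will not be applied to that URI. Here is a minimal Scala sketch that derives all three entries from one account name. It assumes Azure's naming rule that storage account names are 3 to 24 lowercase letters and digits. `AzureCheckpointConfig` and its methods are hypothetical helpers, not part of Flink. The resulting pairs could be pasted into flink-conf.yaml or applied one by one with Flink's `Configuration.setString(key, value)`.

```scala
object AzureCheckpointConfig {
  // Assumed Azure rule: storage account names are 3-24 chars, lowercase letters and digits only.
  private val AccountName = "[a-z0-9]{3,24}".r

  /** Builds the flink-conf.yaml entries for filesystem checkpoints on Azure Blob Storage.
    * The checkpoint URI host and the credential key are derived from the same account name. */
  def entries(container: String, account: String, path: String, storageKey: String): Seq[(String, String)] = {
    require(AccountName.pattern.matcher(account).matches(), s"invalid storage account name: '$account'")
    require(container.nonEmpty, "container name must not be empty")
    val host = s"$account.blob.core.windows.net"
    Seq(
      "state.backend" -> "filesystem",
      "state.checkpoints.dir" -> s"wasb://$container@$host/${path.stripPrefix("/")}",
      s"fs.azure.account.key.$host" -> storageKey
    )
  }

  /** Renders the entries as "key: value" lines, the syntax used by flink-conf.yaml. */
  def toYaml(entries: Seq[(String, String)]): String =
    entries.map { case (key, value) => s"$key: $value" }.mkString("\n")
}
```

For example, `println(AzureCheckpointConfig.toYaml(AzureCheckpointConfig.entries("flink", "myaccount", "checkpoints", key)))` prints three lines in which the account name cannot drift between the URI and the credential property.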


On Aug 20, 2020, at 5:14 PM, Boris Lublinsky <[hidden email]> wrote:

Is there somewhere a complete configuration example for such option?


Re: Flink checkpointing with Azure block storage

Yun Tang
Hi Boris

I think the official guide [1] should cover how to configure this.
However, your changes to flink-conf.yaml might not have taken effect: you configured the state backend as 'filesystem', yet the log still says "No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend".

To check whether your changes were picked up, search the log for "Loading configuration property".
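Following that suggestion, here is a small Scala sketch that scans a log file for those lines and reports which checkpoint-related keys were loaded. It assumes the log line format `Loading configuration property: <key>, <value>` that Flink's `GlobalConfiguration` writes. `ConfigLogCheck` is a hypothetical name. If `state.backend` never appears, the flink-conf.yaml was most likely not read at all.

```scala
import scala.io.Source

object ConfigLogCheck {
  // Assumed format of the GlobalConfiguration log line:
  //   "... Loading configuration property: <key>, <value>"
  private val Loaded = """.*Loading configuration property: ([^,]+), (.*)""".r

  /** Collects every property the log reports as loaded, keyed by property name. */
  def loadedProperties(lines: Iterator[String]): Map[String, String] =
    lines.collect { case Loaded(key, value) => key.trim -> value.trim }.toMap

  def main(args: Array[String]): Unit = {
    val source = Source.fromFile(args(0)) // path to the log of the local run or JobManager
    try {
      val props = loadedProperties(source.getLines())
      for (key <- Seq("state.backend", "state.checkpoints.dir"))
        println(s"$key -> ${props.getOrElse(key, "NOT LOADED")}")
    } finally source.close()
  }
}
```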


Best
Yun Tang


From: Boris Lublinsky <[hidden email]>
Sent: Friday, August 21, 2020 7:18
To: user <[hidden email]>
Subject: Re: Flink checkpointing with Azure block storage
 

Re: Flink checkpointing with Azure block storage

Boris Lublinsky
Thanks Yun,
I made it work, but now I want to set the appropriate configuration programmatically.
I can set state.checkpoints.dir with:

val fsStateBackend = new FsStateBackend(new URI("wasb://<your-container>@<your-azure-account>.blob.core.windows.net/<object-path>"))
env.setStateBackend(fsStateBackend.asInstanceOf[StateBackend])

But I can't add the credentials (fs.azure.account.key.<account_name>.blob.core.windows.net: <azure_storage_key>) to the configuration, because getConfiguration is a private method. Any suggestions?




On Aug 20, 2020, at 9:29 PM, Yun Tang <[hidden email]> wrote:


Re: Flink checkpointing with Azure block storage

Piyush Narang

We had something like this when we were setting it in our code (now we're passing it via config). There's likely a better/cleaner way:

private def configureCheckpoints(env: StreamExecutionEnvironment,
                                 checkpointPath: String): Unit = {
  if (checkpointPath.startsWith("wasb")) {
    import org.apache.hadoop.fs.{Path => HPath}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.core.fs.FileSystem

    val jobCheckpointsPath = new HPath(checkpointPath)
    val conf = new Configuration()
    conf.setString(
      "fs.azure.account.key.storage-account.blob.core.windows.net",
      "access-key"
    )
    FileSystem.initialize(conf) // this ensures the AzureFS is initialized with the correct creds
  }
  // other checkpoint config stuff
}
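The snippet above hard-codes the account name inside the property key (`storage-account`) and matches the scheme with `startsWith("wasb")`. Below is a hedged Scala sketch that derives the property name from the checkpoint path itself, so the credential key always matches the account in the URI. It accepts both the `wasb` and `wasbs` (TLS) schemes. `AzureCredentialKey` is a hypothetical helper, and its result would replace the literal first argument to `conf.setString(...)`.

```scala
import java.net.URI

object AzureCredentialKey {
  private val BlobHostSuffix = ".blob.core.windows.net"

  /** True when the path uses one of the Azure Blob Storage schemes (wasb or wasbs). */
  def isAzureBlob(path: String): Boolean = {
    val scheme = new URI(path).getScheme // null for scheme-less paths such as "/tmp/x"
    scheme == "wasb" || scheme == "wasbs"
  }

  /** Maps wasb[s]://<container>@<account>.blob.core.windows.net/<path> to the
    * property name "fs.azure.account.key.<account>.blob.core.windows.net". */
  def accountKeyProperty(checkpointPath: String): String = {
    require(isAzureBlob(checkpointPath), s"not a wasb/wasbs URI: $checkpointPath")
    // For a "<container>@<host>" authority, getHost returns only the part after '@'.
    val host = new URI(checkpointPath).getHost
    require(host != null && host.endsWith(BlobHostSuffix), s"unexpected host in: $checkpointPath")
    s"fs.azure.account.key.$host"
  }
}
```

Keep in mind that `FileSystem.initialize` configures file systems only in the JVM where it is called. This approach therefore fits local runs such as the one in the logs above. On a remote cluster, the key presumably still has to reach the TaskManagers through their own configuration.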

 

-- Piyush

 

 

From: Boris Lublinsky <[hidden email]>
Date: Saturday, August 22, 2020 at 10:08 PM
To: Yun Tang <[hidden email]>
Cc: user <[hidden email]>
Subject: Re: Flink checkpointing with Azure block storage

 


Re: Flink checkpointing with Azure block storage

Boris Lublinsky
Thanks Piyush,
This was the piece I was missing. Now it all works.


On Aug 24, 2020, at 2:44 PM, Piyush Narang <[hidden email]> wrote:
