MemoryStateBackend Issue

Milind Vaidya
Hi 

I see MemoryStateBackend being used in the TM log:

org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)



I am logging the checkpointed value, which is just a message count:

Snapshot the state 500
Snapshot the state 1000


When I restart the job, i.e. with a new TM but the same JobManager, I see:

Snapshot the state 500

In the JM logs I see the following entries:

Triggering checkpoint 1
Triggering checkpoint 2

After restarting the job, and hence with a new TM:

Triggering checkpoint 1

As per my understanding, the JM should hold the checkpointed state across TMs. Am I correct?

I have not configured anything special and am using the defaults. Do I need to add any setting to make it work?
I want to maintain the message count across TMs.






Re: MemoryStateBackend Issue

Matthias
Hi Milind,
Someone else may be able to answer faster, but could you share the logs and config so that we can better understand the issue?
In general, state is preserved even when a TaskManager fails.

Best,
Matthias

On Thu, Apr 22, 2021 at 5:11 AM Milind Vaidya <[hidden email]> wrote:
[...]

Re: MemoryStateBackend Issue

Matthias
One additional question: How did you stop and restart the job? The behavior you're expecting should work with stop-with-savepoint. Cancelling the job and then just restarting it wouldn't work. The latter approach would lead to a new job being created.

Best,
Matthias

On Thu, Apr 22, 2021 at 3:12 PM Matthias Pohl <[hidden email]> wrote:
[...]


--

Matthias Pohl | Engineer


Follow us @VervericaData Ververica

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton Wehner

Re: MemoryStateBackend Issue

Milind Vaidya
Hi Matthias,

Yeah, you are right. I am canceling the job, so a new job with a new job ID is created, and it does not respect the previous checkpoint. I observed the same behaviour with the local FS backend.

Is there any way to simulate a job failure locally?

As far as config is concerned, I have not configured any backend in the conf file, so the job falls back to the default MemoryStateBackend.

Thanks,
Milind 



On Fri, Apr 23, 2021 at 12:32 AM Matthias Pohl <[hidden email]> wrote:
[...]

Re: MemoryStateBackend Issue

Matthias
I'm not sure what you're trying to achieve. Are you trying to simulate a task failure? Or are you trying to pick up the state from a stopped job?
You could achieve the former one by killing the TaskManager instance or by throwing a custom failure as part of your job pipeline. The latter one can be achieved by using stop-with-savepoint instead of canceling the job.
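
For the first option, here is a toy model in plain Java (no Flink classes, so it runs without a cluster) of what happens when a failure is thrown once from inside the pipeline. The class name, the `run` method and the numbers are made up for illustration; it only mimics the counting job from this thread, with a snapshot every 500 records. What it tries to show: after a failure the count resumes from the last completed checkpoint, whereas a cancelled and resubmitted job starts again from zero.

```java
/** Toy model of checkpoint-based recovery; plain Java, not the Flink API. */
public class CheckpointRecoveryDemo {

    /** Hypothetical custom failure, standing in for an exception thrown in an operator. */
    static class InjectedFailure extends RuntimeException {
        InjectedFailure(String message) {
            super(message);
        }
    }

    /**
     * Counts records up to totalRecords, snapshotting every checkpointInterval
     * records and failing exactly once when the count reaches failAt.
     * Returns {finalCount, countRestoredAfterFailure}; the second value is -1
     * if no failure happened.
     */
    public static long[] run(long totalRecords, long checkpointInterval, long failAt) {
        long lastCheckpoint = 0;       // kept outside the "task", like a completed checkpoint
        long restoredFrom = -1;
        boolean alreadyFailed = false; // declared outside the retry loop, so it survives the restart

        while (true) {
            long count = lastCheckpoint; // a (re)started attempt restores the latest checkpoint
            try {
                while (count < totalRecords) {
                    count++;
                    if (count == failAt && !alreadyFailed) {
                        alreadyFailed = true;
                        throw new InjectedFailure("simulated failure at record " + count);
                    }
                    if (count % checkpointInterval == 0) {
                        lastCheckpoint = count; // corresponds to "Snapshot the state <count>"
                    }
                }
                return new long[] {count, restoredFrom};
            } catch (InjectedFailure e) {
                restoredFrom = lastCheckpoint; // recovery resumes here, not at 0
            }
        }
    }

    public static void main(String[] args) {
        long[] result = run(2000, 500, 1200);
        // prints: final count = 2000, restored from = 1000
        System.out.println("final count = " + result[0] + ", restored from = " + result[1]);
    }
}
```

In a real job the same idea would be a `throw` inside an operator, guarded so that it fires only once. The guard then has to live somewhere that survives the operator being re-created; that part is not modelled here.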

Matthias

On Fri, Apr 23, 2021 at 9:31 PM Milind Vaidya <[hidden email]> wrote:
[...]

Re: MemoryStateBackend Issue

Milind Vaidya
Sounds good. 
How do I stop the job with a savepoint?

- Milind 

On Mon, Apr 26, 2021 at 12:55 AM Matthias Pohl <[hidden email]> wrote:
[...]

Re: MemoryStateBackend Issue

Matthias
Hi Milind,
A job can be stopped with a savepoint in the following way [1]:

    ./bin/flink stop --savepointPath [:targetDirectory] :jobId
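
The command above only covers stopping. To pick the state up again, the job is resubmitted with the savepoint path that `flink stop` reports, using the `--fromSavepoint` (`-s`) option of `flink run`. A rough sketch that just assembles both command lines in Java; the class, the method names, the paths and the jar name are placeholders, and nothing is executed against a cluster:

```java
import java.util.Arrays;
import java.util.List;

/** Assembles the two Flink CLI invocations of a stop-with-savepoint restart. */
public class SavepointCommands {

    /** Arguments for stopping jobId and writing a savepoint under targetDirectory. */
    public static List<String> stopWithSavepoint(String targetDirectory, String jobId) {
        return Arrays.asList("./bin/flink", "stop", "--savepointPath", targetDirectory, jobId);
    }

    /** Arguments for submitting jarPath again, restoring state from savepointPath. */
    public static List<String> runFromSavepoint(String savepointPath, String jarPath) {
        return Arrays.asList("./bin/flink", "run", "--fromSavepoint", savepointPath, jarPath);
    }

    public static void main(String[] args) {
        // Placeholder values; the real savepoint path is the one reported by `flink stop`.
        System.out.println(String.join(" ",
                stopWithSavepoint("/tmp/flink-savepoints", "<jobId>")));
        System.out.println(String.join(" ",
                runFromSavepoint("/tmp/flink-savepoints/<savepoint-dir>", "my-job.jar")));
    }
}
```

Either list could be handed to `ProcessBuilder`, or the printed lines can simply be run by hand from the Flink installation directory.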

Best,
Matthias


On Sun, May 16, 2021 at 1:12 AM Milind Vaidya <[hidden email]> wrote:
[...]

Flink-pod-template-issue

Priyanka Manickam
In reply to this post by Milind Vaidya
Hi All,

Do we need to specify an image for flink-main-container in the pod-template.yaml file? It is giving an error saying "spec.containers(0).image value required".

Could anyone help with this, please?

Thanks,
Priyanka Manickam


Re: Flink-pod-template-issue

ChangZhuo Chen (陳昌倬)
In reply to this post by Priyanka Manickam
On Mon, May 17, 2021 at 01:29:55PM +0530, Priyanka Manickam wrote:
> Hi All,
>
> Do we required to add any image for flink-main-container in
> pod-template.yaml file because it giving an error saying
> "spec.containers(0).image value required.
>
>
> Could anyone help with this please

Hi,

You need to specify your image via the `kubernetes.container.image`
configuration option, as described in [0]. The image should be the official
Flink image plus your application jar, so that you can specify your jar path
as local:///<location> when submitting to Kubernetes.

[0] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#pod-template


--
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


Re: Flink-pod-template-issue

Yang Wang
Could you share your pod-template.yaml or check whether the container name is configured to "flink-main-container"?
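
For reference, a minimal pod template sketch with the main container named as Flink expects. The metadata name and the image are hypothetical placeholders; the container name `flink-main-container` is the one mentioned above, and the image may instead be supplied through `kubernetes.container.image` as suggested earlier in the thread:

```yaml
# pod-template.yaml (sketch, not a complete template)
apiVersion: v1
kind: Pod
metadata:
  name: pod-template                        # hypothetical name
spec:
  containers:
    # The main container has to carry exactly this name.
    - name: flink-main-container
      image: my-registry/my-flink-job:1.13  # hypothetical image containing the application jar
```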

Best,
Yang

On Mon, May 17, 2021 at 5:20 PM, ChangZhuo Chen (陳昌倬) <[hidden email]> wrote:
[...]