Is MapState tied to Operator Input & Output type?

Arpith P

Hi,


I have a ProcessFunction whose input and output types were both String (1), and inside processElement I update a MapState. I have now changed the input type to Map<String, String>, with the output still String (2), but when I restore from the last checkpoint folder the MapState comes back empty. I've checked that the checkpoint folder actually contains data (the files are larger than 1 GB). Is MapState tied to the ProcessFunction's input and output types? If not, why isn't the MapState restored?

 

(1)

    public class TestProcess extends ProcessFunction<String, String> implements CheckpointedFunction

(2)

    public class TestProcess extends ProcessFunction<Map<String, String>, String> implements CheckpointedFunction

Re: Is MapState tied to Operator Input & Output type?

Yun Tang
Hi

The type of a map state is not directly related to the operator's input and output types; it depends only on how you define the state descriptor.
  • Did you change the state descriptor when you changed the input/output types?
  • Have you assigned an id [1] to the operator that uses 'TestProcess'? The state might not be restored if you change your code without an id assigned.
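
To illustrate the two points above, here is a toy model in plain Java. It is only a sketch of the idea, not Flink code: RestoreModel, save, and restore are hypothetical names, and the strings "test-process" and "my-map" stand in for an operator id and a state descriptor name. The model also returns empty state for an unknown id, whereas a real job may refuse to start instead, depending on the restore settings.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Toy model only: none of these names are Flink APIs.
final class RestoreModel {
    // A snapshot maps "operatorId/stateName" to the saved map contents.
    private final Map<String, Map<String, String>> snapshot = new HashMap<>();

    private static String slot(String operatorId, String stateName) {
        return operatorId + "/" + stateName;
    }

    // Store a copy of the state contents under the operator id and state name.
    void save(String operatorId, String stateName, Map<String, String> contents) {
        snapshot.put(slot(operatorId, stateName), new HashMap<>(contents));
    }

    // The lookup uses only the operator id and the state name; the function's
    // input and output types never take part in it.
    Map<String, String> restore(String operatorId, String stateName) {
        Map<String, String> saved = snapshot.get(slot(operatorId, stateName));
        return saved == null ? new HashMap<>() : new HashMap<>(saved);
    }

    public static void main(String[] args) {
        RestoreModel checkpoint = new RestoreModel();
        checkpoint.save("test-process", "my-map", Collections.singletonMap("k", "v"));

        // Same id and same state name: the saved entries come back.
        System.out.println(checkpoint.restore("test-process", "my-map"));
        // A different id (for example, one that was never pinned): nothing matches.
        System.out.println(checkpoint.restore("some-other-id", "my-map"));
    }
}
```

In an actual job, the values the model treats as strings would come from the id assigned to the operator and from the name given to the state descriptor.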


Best
Yun Tang



Re: Is MapState tied to Operator Input & Output type?

Arpith P
Hi Yun,

Neither the state descriptor's type nor its name changed. I did assign an ID as well, but it didn't help. What I'm trying to do is this: I have two streams, A and B, which I want to connect and process in C, and I eventually want values from stream A to be saved in C's MapState. I used ConnectedStreams to connect A (a keyed stream) and B, and processed them in a CoProcessFunction C, but it looks like CoProcessFunction doesn't have MapState, because functionInitializationContext.getKeyedStateStore() returns null. Is it possible to access MapState inside a CoProcessFunction?

Arpith


Re: Is MapState tied to Operator Input & Output type?

Yun Tang
Hi Arpith

I'm afraid the earlier part of this thread was about the wrong thing. The root cause is not a failure to restore state from the checkpoint; it is that the state is not being accessed legally.
Did you add keyBy before applying your function, as the doc's note [1] says: "If you want to access keyed state and timers you have to apply the ProcessFunction on a keyed stream"?
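
As a rough illustration of why keyed state needs a keyed stream, here is a toy model in plain Java. It is a sketch under stated assumptions, not Flink's implementation: KeyedMapStateModel and setCurrentKey are hypothetical names, and setCurrentKey stands in for what the runtime would do for each record once the stream has been keyed.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: not Flink's implementation.
final class KeyedMapStateModel {
    // One independent map per key.
    private final Map<String, Map<String, String>> perKey = new HashMap<>();
    // In this model the "runtime" sets the key before each record is processed.
    private String currentKey;

    void setCurrentKey(String key) {
        currentKey = key;
    }

    // Every access is scoped to the current key; without one there is no scope.
    private Map<String, String> scope() {
        if (currentKey == null) {
            throw new IllegalStateException("no current key: keyed state needs a keyed stream");
        }
        return perKey.computeIfAbsent(currentKey, k -> new HashMap<>());
    }

    void put(String mapKey, String value) {
        scope().put(mapKey, value);
    }

    String get(String mapKey) {
        return scope().get(mapKey);
    }

    public static void main(String[] args) {
        KeyedMapStateModel state = new KeyedMapStateModel();
        state.setCurrentKey("a");
        state.put("x", "1");

        // Under key "b" the entry written under key "a" is not visible.
        state.setCurrentKey("b");
        System.out.println(state.get("x"));

        // Back under key "a" the entry is visible again.
        state.setCurrentKey("a");
        System.out.println(state.get("x"));
    }
}
```

The point of the model is that the map is looked up through the current key, so a function applied to a stream that was never keyed has nothing to scope the state to.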



Best
Yun Tang
