How many task managers to launch for a job?

Re: Unable to restore state value after job failed using RocksDBStateBackend

Shu Su

Hi wanglei,

Can you post how you restart the job?

Thanks,
Simon
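On 06/25/2019 20:11, [hidden email] wrote:
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StateProcessTest extends KeyedProcessFunction<String, Tuple2<String, Long>, String> {

    // Assumption: an SLF4J logger; the original post did not show how `log` was defined.
    private static final Logger log = LoggerFactory.getLogger(StateProcessTest.class);

    // Keyed state handle; initialized in open().
    private transient ValueState<Tuple2<Long, Long>> state;

    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) throws Exception {
        Tuple2<Long, Long> stateValue = state.value();

        // The state is null the first time a key is seen, or when nothing was restored.
        if (stateValue == null) {
            log.info("########## initialize");
            stateValue = Tuple2.of(34L, 56L);
        }
        state.update(stateValue);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>(
                "avg", TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
        state = getRuntimeContext().getState(descriptor);
    }
}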



Every time I restart the job, stateValue is still null.



 

Re: Unable to restore state value after job failed using RocksDBStateBackend

Stephan Ewen
If you manually cancel and restart the job, state is only carried forward if you use a savepoint.
Can you check if that is what you are doing?

On Tue, Jun 25, 2019 at 2:21 PM Simon Su <[hidden email]> wrote:

Hi wanglei,

Can you post how you restart the job?

Thanks,
Simon

Re: Re: Unable to restore state value after job failed using RocksDBStateBackend

wanglei2@geekplus.com.cn

I start and cancel it in my IntelliJ IDEA development environment.

First I click the run button, then the red stop button, and then the run button again.

Let me read up on savepoints.

Thanks,
Lei Wang



 
Date: 2019-06-25 20:36
Subject: Re: Unable to restore state value after job failed using RocksDBStateBackend
If you manually cancel and restart the job, state is only carried forward if you use a savepoint.
Can you check if that is what you are doing?


Re: Unable to restore state value after job failed using RocksDBStateBackend

Shu Su
Hi Lei Wang

Actually, that will not work. Recovery from a checkpoint uses the job ID to locate the snapshot directory. When you restart the job in IntelliJ without setting any configuration, a new job ID is generated and the run is treated as a new job, so you get a null state every time. You can follow https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html

Thanks,
Simon
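
The explanation above can be illustrated with a small plain-Java toy model. This is not Flink code and does not use any Flink API: the class RestoreModel, its methods, and the file:///tmp/checkpoints path are all hypothetical names chosen for illustration. It only mimics the behavior described in this thread, namely that snapshots are filed under a directory named after the job ID, that every new run gets a new job ID, and that an earlier snapshot is picked up only when its path is passed explicitly (as with a savepoint).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

/** Toy model (NOT Flink code) of snapshot lookup by job ID. All names are hypothetical. */
public class RestoreModel {

    /** In-memory stand-in for the file system: snapshot path -> stored state. */
    private final Map<String, String> stateByPath = new HashMap<>();
    /** Latest snapshot path recorded for each job ID. */
    private final Map<String, String> latestPathByJob = new HashMap<>();
    private final String checkpointDir;

    public RestoreModel(String checkpointDir) {
        this.checkpointDir = checkpointDir;
    }

    /** Every submission gets a fresh random job ID, as on each run from the IDE. */
    public static String newJobId() {
        return UUID.randomUUID().toString().replace("-", "");
    }

    /** Stores a snapshot under <checkpointDir>/<jobId>/chk-<n> and returns its path. */
    public String snapshot(String jobId, int n, String state) {
        String path = checkpointDir + "/" + jobId + "/chk-" + n;
        stateByPath.put(path, state);
        latestPathByJob.put(jobId, path);
        return path;
    }

    /**
     * Starts a run. With an explicit restorePath the state at that path is used;
     * otherwise only snapshots filed under this run's own job ID are considered.
     */
    public Optional<String> start(String jobId, String restorePath) {
        String path = (restorePath != null) ? restorePath : latestPathByJob.get(jobId);
        return Optional.ofNullable(path).map(stateByPath::get);
    }

    public static void main(String[] args) {
        RestoreModel model = new RestoreModel("file:///tmp/checkpoints");

        // First run: take one snapshot, then "press the stop button".
        String firstRun = newJobId();
        String path = model.snapshot(firstRun, 1, "(34,56)");

        // Second run: new job ID, so nothing is found unless the path is given.
        String secondRun = newJobId();
        System.out.println(model.start(secondRun, null).isPresent()); // prints false
        System.out.println(model.start(secondRun, path).get());       // prints (34,56)
    }
}
```

In this model the second run sees no state for the same reason the IDE restart does in the thread: nothing ties the new job ID to the old snapshot directory. Passing the snapshot path explicitly plays the role that a savepoint plays in the replies above.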

On 06/25/2019 21:43, [hidden email] wrote:

I start and cancel it in my IntelliJ IDEA development environment.

First I click the run button, then the red stop button, and then the run button again.

Let me read up on savepoints.

Thanks,
Lei Wang



 