Can't create a savepoint with State Processor API

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Can't create a savepoint with State Processor API

Dmitry Minaev
Hi everyone,

I'm looking for a way to modify state inside an operator in Flink. I found
State Processor API
<https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/libs/state_processor_api.html#modifying-savepoints>  
that allows to modify savepoints, which looks great. But I can't make it
work.

I can read an existing state from savepoint but if I try to create (or
modify) a savepoint it doesn't write it by some reason.

Questions:
1. Is State Processor API the right way to achieve what I'm looking for? Are
there any other approaches?
2. can I ran this as a standalone java program or it has to be a part of a
Flink job?
3. I expect to have a new savepoint in the provided location after running
the code below, is that the right expectation?
```
public static void main( String[] args ) throws Exception
{
        ExecutionEnvironment bEnv =
ExecutionEnvironment.getExecutionEnvironment();

        BootstrapTransformation<Integer> transform =
OperatorTransformation.bootstrapWith(bEnv.fromElements(1, 2, 3))
                .keyBy(String::valueOf)
                .transform(new SimplestTransform());

        Savepoint.create(new MemoryStateBackend(),
16).withOperator("my-operator-uid",
transform).write("file:///tmp/savepoints/");
}

public class SimplestTransform extends KeyedStateBootstrapFunction<String,
Integer>
{
    ValueState<Integer> state;

    @Override
    public void open( Configuration parameters) {
        ValueStateDescriptor<Integer> descriptor = new
ValueStateDescriptor<>("total", Types.INT);
        state = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(Integer value, Context ctx) throws Exception
{
        state.update(value);
    }
}
```

It finishes successfully but it doesn't write anything to the specified
folder. I tried folder format with "file://" prefix and without it.

I feel I'm missing something.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Can't create a savepoint with State Processor API

David Anderson-2
You are very close. I got your example to work by switching from the MemoryStateBackend to the FsStateBackend, and adding 

bEnv.execute();

at the end of main().

I'm not sure why either of those might be necessary, but it doesn't seem to work without both changes.

See https://gist.github.com/alpinegizmo/ff3d2e748287853c88f21259830b29cf for my version.

David Anderson | Training Coordinator

Follow us @VervericaData

--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time


On Thu, Mar 19, 2020 at 2:54 AM Dmitry Minaev <[hidden email]> wrote:
Hi everyone,

I'm looking for a way to modify state inside an operator in Flink. I found
State Processor API
<https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/libs/state_processor_api.html#modifying-savepoints
that allows to modify savepoints, which looks great. But I can't make it
work.

I can read an existing state from savepoint but if I try to create (or
modify) a savepoint it doesn't write it by some reason.

Questions:
1. Is State Processor API the right way to achieve what I'm looking for? Are
there any other approaches?
2. can I ran this as a standalone java program or it has to be a part of a
Flink job?
3. I expect to have a new savepoint in the provided location after running
the code below, is that the right expectation?
```
public static void main( String[] args ) throws Exception
{
        ExecutionEnvironment bEnv =
ExecutionEnvironment.getExecutionEnvironment();

        BootstrapTransformation<Integer> transform =
OperatorTransformation.bootstrapWith(bEnv.fromElements(1, 2, 3))
                .keyBy(String::valueOf)
                .transform(new SimplestTransform());

        Savepoint.create(new MemoryStateBackend(),
16).withOperator("my-operator-uid",
transform).write("file:///tmp/savepoints/");
}

public class SimplestTransform extends KeyedStateBootstrapFunction<String,
Integer>
{
    ValueState<Integer> state;

    @Override
    public void open( Configuration parameters) {
        ValueStateDescriptor<Integer> descriptor = new
ValueStateDescriptor<>("total", Types.INT);
        state = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(Integer value, Context ctx) throws Exception
{
        state.update(value);
    }
}
```

It finishes successfully but it doesn't write anything to the specified
folder. I tried folder format with "file://" prefix and without it.

I feel I'm missing something.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Can't create a savepoint with State Processor API

Dmitry Minaev
Yep, that works! Many thanks David, really appreciate it!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/