Please help, I need to bootstrap keyed state into a stream

Please help, I need to bootstrap keyed state into a stream

Marco Villalobos-2
According to the documentation, and various blogs, it is possible to use the Batch Execution Environment to bootstrap state into a save point, and then load that state in a Stream Execution Environment.

I am trying to use that feature.

State Processor API documentation states that "you can read a batch of data from any store, preprocess it, and write the result to a savepoint that you use to bootstrap the state of a streaming application."


Kartik Khare in his blog wrote that "You can create both Batch and Stream environment in a single job."


However, I have yet to find a working example that shows how to do both.

I am reaching out to the community to help solve and document this common problem.

I open to contributions.

Please help me solve this, my work and livelihood depend on me solving this.

I keep on getting a NullPointerException deep within the Flink API that makes no sense to me:

Caused by: java.lang.NullPointerException
at org.apache.flink.runtime.state.heap.StateTable.put(
at org.apache.flink.runtime.state.heap.StateTable.put(
at org.apache.flink.runtime.state.heap.HeapMapState.put(
at org.apache.flink.runtime.state.UserFacingMapState.put(

Can somebody please explain what I am doing wrong?

I have my code below just for convenience:


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.concurrent.Callable;

@CommandLine.Command(name = "Boot Strap Keyed State into Stream", mixinStandardHelpOptions = true,
        description = "This demo will attempt to boot strap a dataset into a save point that will be read by a stream.")
public class BootstrapKeyedStateIntoStreamApp implements Callable<Integer> {

    private final static Logger logger = LoggerFactory.getLogger(BootstrapKeyedStateIntoStreamApp.class);

    @CommandLine.Option(names = {"-s", "--save-point-path"}, description = "The save point path.", required = true)
    private transient File savePointPath;

    public Integer call() throws Exception {
        return 0;

    //writes dataset into a savepoint
    public void bootstrap() throws Exception {"Starting boot strap demo with save-point-path: file://{}", savePointPath);

        final ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        final JDBCInputFormat bootstrapJdbcInputFormat = buildBootStrapJdbcInputFormat("evil-inc");
        final DataSet<Tuple3<String, String, String>> bootStrapDataSet =
                batchEnv.createInput(bootstrapJdbcInputFormat).name("bootstrap data-source")
                        .map(new MapFunction<Row, Tuple3<String, String, String>>() {
                            public Tuple3<String, String, String> map(Row row) throws Exception {
                                return new Tuple3<String, String, String>(
                                        (String) row.getField(0),
                                        (String) row.getField(1),
                                        (String) row.getField(2)
                        }).returns(Types.TUPLE(Types.STRING, Types.STRING, Types.STRING))
                .name("bootstrap dataset");

        final BootstrapTransformation<Tuple3<String, String, String>> bootstrapTransformation = OperatorTransformation
                .transform(new ConfigurationKeyedStateBootstrapFunction());

        Savepoint.create(new MemoryStateBackend(), 2)
                .withOperator("boot-strap", bootstrapTransformation)
                .write("file://" + savePointPath.getPath());
        batchEnv.execute("bootstrap demo");

    //TODO while processing a stream this will use the keyed state written by bootstrap
    public void stream() {


    private JDBCInputFormat buildBootStrapJdbcInputFormat(String namespace) {
        Serializable[][] queryParameters = new Serializable[1][1];
        queryParameters[0] = new Serializable[]{namespace};

        return JDBCInputFormat.buildJDBCInputFormat()
                .setQuery("SELECT namespace, name, value FROM configurations WHERE namespace = ?")
                .setRowTypeInfo(new RowTypeInfo(
                        BasicTypeInfo.STRING_TYPE_INFO, // namespace
                        BasicTypeInfo.STRING_TYPE_INFO, // name
                        BasicTypeInfo.STRING_TYPE_INFO  // value
                .setParametersProvider(new GenericParameterValuesProvider(queryParameters))

    public static void main(String[] args) throws Exception {

        final String databaseURL = "jdbc:derby:memory:flink;create=true";
        int exitCode;
        try (final Connection con = DriverManager.getConnection(databaseURL)) {
            try (final Statement stmt = con.createStatement();) {
                stmt.execute("CREATE TABLE configurations (\n" +
                        "    namespace VARCHAR(50) NOT NULL,\n" +
                        "    name VARCHAR(50) NOT NULL,\n" +
                        "    value VARCHAR(50) NOT NULL,\n" +
                        "    version INTEGER NOT NULL DEFAULT 1,\n" +
                        "    create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n" +
                        "    modify_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n" +
                        "    UNIQUE (name)\n" +

            try (final PreparedStatement stmt = con.prepareStatement("INSERT INTO configurations (namespace, name, value) VALUES (?, ?, ?)")) {
                for (int i = 0; i < 10; i++) {
                    stmt.setString(1, "evil-inc");
                    stmt.setString(2, "akb-" + i);
                    stmt.setString(3, "" + i);

            exitCode = new CommandLine(new BootstrapKeyedStateIntoStreamApp()).execute(args);

            if (exitCode == 0) {
                try (final Statement stmt = con.createStatement()) {
                    final ResultSet rs = stmt.executeQuery("SELECT id, namespace, name, value, version, create_time, modify_time FROM configurations ORDER BY name");
                    while ( {
                        final long id = rs.getLong(1);
                        final String namespace = rs.getString(2);
                        final String name = rs.getString(3);
                        final String value = rs.getString(4);
                        final int version = rs.getInt(5);
                        final Timestamp create_time = rs.getTimestamp(6);
                        final Timestamp modify_time = rs.getTimestamp(7);
                                "id: {}, namespace: \"{}\", name: \"{}\", value: {}, version: {} create_time: \"{}\" modify_time: \"{}\"",
                                id, namespace, name, value, version, create_time, modify_time


    public static class ConfigurationKeyedStateBootstrapFunction extends KeyedStateBootstrapFunction<Tuple, Tuple3<String, String, String>> {

        private transient MapState<String, Tuple3<String, String, String>> lastValues;

        public void open(Configuration parameters) throws Exception {
            MapStateDescriptor<String, Tuple3<String, String, String>> descriptor =
                    new MapStateDescriptor<String, Tuple3<String, String, String>>(
                            TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {

            lastValues = getRuntimeContext().getMapState(descriptor);

        public void processElement(Tuple3<String, String, String> value, Context ctx) throws Exception {
  "Adding value to map state: {}", value);
            lastValues.put(value.f1, value);
Re: Please help, I need to bootstrap keyed state into a stream

Tzu-Li Tai

For the NullPointerException, what seems to be happening is that you are
setting NULL values in your MapState, that is not allowed by the API.

Otherwise, the code that you showed for bootstrapping state seems to be

> I have yet to find a working example that shows how to do both
> (bootstrapping state and start a streaming application with that state)

Not entirely sure what you mean here by "doing both".
The savepoint written using the State Processor API (what you are doing in
the bootstrap() method) is a savepoint that may be restored from as you
would with a typical Flink streaming job restore.
So, usually the bootstrapping part happens as a batch "offline" job, while
you keep your streaming job as a separate job. What are you trying to
achieve with having both written within the same job?


Re: Please help, I need to bootstrap keyed state into a stream

I recently was in the same situation as Marco, the docs do explain what you need to do, but without experience with Flink it might still not be obvious what you need to do.

What I did initially:

Setup the job to run in a 'write a save state' mode by implementing a command line switch I could use when running the job:

flink run somejob.jar -d /some/path

The code then when run with this switch ran *only* the required code to setup a version of state and write that to a savestate.

This worked and I was on my way.

However, I then decided to split this out into a new flink 'jar' with the sole purpose of creating a save state.  This is a cleaner approach in my case and also removes dependancies (my state was loaded from DynamoDB) that were only required in this one instance.

As rebuilding the state from this application is intended to only be done the once, with checkpoints/savestates the main approach going forward.

Just remember to name your Operators with the same ID/name to make sure it is compatible.

On Monday, 10 August 2020 07:27, Tzu-Li Tai <[hidden email]> wrote:

Re: Please help, I need to bootstrap keyed state into a stream

Marco Villalobos-2
In reply to this post by Tzu-Li Tai
First, thank you.

 I want to believe you, I don't see how that is possible.

All of the code is self-contained, and at the bottom of all the code, I print out the non-null values before I attempt to put in the map state.

All of the debug output before and after indicates that there is a null value in there.

I think something else is wrong.  I am going to run this in a distributed environment and see what happens.

Again, thank you.

Marco A. Villalobos
Re: Please help, I need to bootstrap keyed state into a stream

Marco Villalobos-2
In reply to this post by orionemail
Thank you. Your instruction was helpful in my solving this.

You can read about my solution at

‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
Re: Please help, I need to bootstrap keyed state into a stream

Marco Villalobos-2
In reply to this post by Tzu-Li Tai
I think there is a bug in Flink when running locally without a cluster.

My code worked in a cluster, but failed when run locally.

My code does not save null values in Map State.

Re: Please help, I need to bootstrap keyed state into a stream

Seth Wiesman
Just to summarize the conversation so far:

The state processor api reads data from a 3rd party system - such as JDBC in this example - and generates a savepoint file that is written out to some DFS.  This savepoint can then be used to when starting a flink streaming application. It is a two-step process, creating the savepoint in one job and then starting a streaming application from that savepoint in another.

These jobs do not have to be a single application, and in general, I recommend they be developed as two separate jobs. The reason being, bootstrapping state is a one-time process while your streaming application runs forever. It will simplify your development and operations in the long term if you do not mix concerns.

Concerning the NullPointerException:

The max parallelism must be at least 128. I've opened a ticket to track and resolve this issue.


Re: Please help, I need to bootstrap keyed state into a stream

Marco Villalobos-2
Hi Seth,

Thank you for the advice. The solution you mentioned is exactly what I did.

I wrote a small tutorial that explains how to repeat that pattern.

Regarding the NullPointerException when running locally, thank you for filing a ticket. It would be very nice to get that fixed.


Marco A. Villalobos

