Lost data when resuming from savepoint


Jose Miguel Tejedor Fernandez
Hi,

I am running a simple streaming Flink job (Flink versions 1.3.2 and 1.3.1) whose source and sink are a Kafka 0.10.0.1 cluster.

I am testing savepoints by stopping and resuming the job, and when I checked the validity of the data written to the sink around the stop/resume window, I observed that some of the events had been lost.

The stream carries around 6K events per 10 minutes, and around 50% of them are lost. I am sharing the code in case you can give me any hints.


The job resumes correctly from the last savepoint, and the checkpoint configuration is as follows:

env.setStateBackend(new FsStateBackend("s3://my_bucket/flink/checkpoints/"));
env.enableCheckpointing(params.getLong("checkpoint.interval", 300000));
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(params.getLong("checkpoint.minPause", 60 * 1000));
env.getCheckpointConfig().setMaxConcurrentCheckpoints(params.getInt("checkpoint.maxConcurrent", 1));
env.getCheckpointConfig().setCheckpointTimeout(params.getLong("checkpoint.timeout", 10 * 60 * 1000));
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
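
(For reference, params above is just the job's parameter object; assuming it is a Flink ParameterTool built from the program arguments, the checkpoint settings can be overridden on the command line, roughly like this:)

import org.apache.flink.api.java.utils.ParameterTool;

// Assumption: params is a ParameterTool created from the program arguments,
// e.g. --checkpoint.interval 300000 --checkpoint.minPause 60000
ParameterTool params = ParameterTool.fromArgs(args);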


The Kafka consumer:

        SingleOutputStreamOperator<Map<String, String>> createKafkaStream(
                Collection<String> topics, int parallelism, Properties kafkaProps,
                StreamExecutionEnvironment env, int resetOffsetsTo, String eventTimeField) {

            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            FlinkKafkaConsumer010<Map<String, String>> consumer = new FlinkKafkaConsumer010<>(
                    new LinkedList<>(topics),
                    new EventMapSchema(), kafkaProps);

            DataStream<Map<String, String>> messageStream = env
                    .addSource(consumer)
                    .name("Kafka (" + StringUtils.join(topics, ", ") + ")")
                    .setParallelism(parallelism);

            return messageStream
                    // discard events that don't have the event time field
                    .filter(new MissingTimeStampFilter(eventTimeField))
                    // provide ascending timestamps for TimeCharacteristic.EventTime
                    .assignTimestampsAndWatermarks(new EventMapTimestampExtractor(eventTimeField));
        }
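
(EventMapSchema, MissingTimeStampFilter and EventMapTimestampExtractor are our own classes. For completeness, a simplified sketch of the timestamp extractor; assuming it is based on Flink's AscendingTimestampExtractor and an epoch-millisecond time field, it looks roughly like this, though the real class may differ:)

        import java.util.Map;
        import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;

        // Simplified/hypothetical sketch: extracts the configured event time field
        // (assumed to hold epoch milliseconds) from each event map.
        public class EventMapTimestampExtractor extends AscendingTimestampExtractor<Map<String, String>> {

            private final String eventTimeField;

            public EventMapTimestampExtractor(String eventTimeField) {
                this.eventTimeField = eventTimeField;
            }

            @Override
            public long extractAscendingTimestamp(Map<String, String> event) {
                // MissingTimeStampFilter runs before this, so the field is expected to be present.
                return Long.parseLong(event.get(eventTimeField));
            }
        }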

        ....

        StreamTableUtils.registerTable("events", kafkaStream, fieldNames, tableEnv);

        String sql = "SELECT\n" +
                "  field_1 AS a,\n" +
                "  field_2 AS b,\n" +
                "  field_3 AS c,\n" +
                "  field_4 AS d,\n" +
                "  field_5 AS e,\n" +
                " FROM events\n" +
                " WHERE field_1 IS NOT NULL";
        LOG.info("sql: {}", sql);


        Table result = tableEnv.sql(sql);
        System.err.println("output fields: " + Arrays.toString(result.getSchema().getColumnNames()));

        if (printToErr) {
            printRows(result, tableEnv);
        }

        if (!StringUtils.isBlank(outputTopic)) {
            TableSink<?> tableSink = new Kafka09CustomJsonTableSink(outputTopic, KafkaStreams.getProperties(params),
                    new FlinkFixedPartitioner<>(), timestampColumn);
            result.writeToSink(tableSink);
        }

        env.execute();
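
(Kafka09CustomJsonTableSink and KafkaStreams.getProperties are our own helpers as well. Assuming getProperties only assembles the Kafka Properties from the job parameters, a minimal sketch would be the following; the parameter names are just placeholders:)

        import java.util.Properties;
        import org.apache.flink.api.java.utils.ParameterTool;

        // Hypothetical sketch of KafkaStreams.getProperties; the real helper may set more options.
        static Properties getProperties(ParameterTool params) {
            Properties props = new Properties();
            // "kafka.brokers" and "kafka.group.id" are assumed parameter names.
            props.setProperty("bootstrap.servers", params.get("kafka.brokers", "localhost:9092"));
            props.setProperty("group.id", params.get("kafka.group.id", "flink-events-job"));
            return props;
        }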

Cheers 

BR


JM


Re: Lost data when resuming from savepoint

Fabian Hueske
Hi Jose,

I had a look at your program but did not spot anything.
The query is a simple "SELECT ... FROM ... WHERE" query that does not keep any state of its own.
So the only state is the state of the Kafka source, i.e., the offsets.

How much time passed between taking the savepoint and resuming the job?
Did you see any exceptions in the log files (TM, JM)?

Thanks, Fabian
