Lost data when resuming from savepoint

Posted by Jose Miguel Tejedor Fernandez on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Lost-data-when-resuming-from-savepoint-tp16063.html

Hi,

I am running a simple stream Flink job (Flink version 1.3.2 and 1.3.1) whose source and sink is a Kafka cluster 0.10.0.1.

I am testing savepoints by stopping/resuming the job and when I checked the validity of the data sunk during the stop time I observed that some of the events have been lost. 

The stream of events is around 6K per 10 minutes and around 50% are lost. I share the code in case you can indicate me any hint.


Job is resumed correctly from last savepoint and checkpoints configuration is as follow:

env.setStateBackend(new FsStateBackend("s3://my_bucket/flink/checkpoints/"));
env.enableCheckpointing(params.getLong("checkpoint.interval", 300000));
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(params.getLong("checkpoint.minPause", 60 * 1000));
env.getCheckpointConfig().setMaxConcurrentCheckpoints(params.getInt("checkpoint.maxConcurrent", 1));
env.getCheckpointConfig().setCheckpointTimeout(params.getLong("checkpoint.timeout", 10 * 60 * 1000));
 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);


The Kafka consumer:

        SingleOutputStreamOperator<Map<String, String>> createKafkaStream(Collection<String> topics, int parallelism, Properties kafkaProps, StreamExecutionEnvironment env,
            int resetOffsetsTo, String eventTimeField) 
        {
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);


        FlinkKafkaConsumer010<Map<String, String>> consumer = new FlinkKafkaConsumer010<>(
                new LinkedList<>(topics),
                new EventMapSchema(), kafkaProps);

        DataStream<Map<String, String>> messageStream = env
                .addSource(consumer)
                .name("Kafka (" + StringUtils.join(topics, ", ") + ")")
                .setParallelism(parallelism);

        return messageStream
                // discard events that don't have the event time field
                .filter(new MissingTimeStampFilter(eventTimeField))
                // provide ascending timestamps for TimeCharacteristic.EventTime
                .assignTimestampsAndWatermarks(new EventMapTimestampExtractor(eventTimeField));

         }

        ....

        StreamTableUtils.registerTable("events", kafkaStream, fieldNames, tableEnv);

        String sql = "SELECT\n" +
                "  field_1 AS a,\n" +
                "  field_2 AS b,\n" +
                "  field_3 AS c,\n" +
                "  field_4 AS d,\n" +
                "  field_5 AS e,\n" +
                " FROM events\n" +
                " WHERE field_1 IS NOT NULL";
        LOG.info("sql: {}", sql);


        Table result = tableEnv.sql(sql);
        System.err.println("output fields: " + Arrays.toString(result.getSchema().getColumnNames()));

        if (printToErr) {≤
            printRows(result, tableEnv);
        }

        if (!StringUtils.isBlank(outputTopic)) {
            TableSink<?> tableSink = new Kafka09CustomJsonTableSink(outputTopic, KafkaStreams.getProperties(params),
                    new FlinkFixedPartitioner<>(), timestampColumn);
            result.writeToSink(tableSink);
        }

        env.execute();

Cheers 

BR


JM