Hi,
I am running a simple streaming Flink job (seen with both Flink 1.3.2 and 1.3.1) whose source and sink are a Kafka 0.10.0.1 cluster.
I am testing savepoints by stopping and resuming the job, and when I checked the validity of the data written to the sink around the stop window, I noticed that some events had been lost.
The stream carries around 6K events per 10 minutes, and roughly 50% of them are lost. I am sharing the code below in case you can give me any hints.
The job resumes correctly from the last savepoint. Checkpointing is enabled; a minimal sketch of the setup follows (interval and mode are illustrative placeholders, not the exact values from my job):
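env.enableCheckpointing(60_000L); // illustrative: checkpoint every 60 seconds
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000L);
// retain completed checkpoints on cancellation so the job can be resumed from them
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);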
The Kafka consumer:
SingleOutputStreamOperator<Map<String, String>> createKafkaStream(
        Collection<String> topics, int parallelism, Properties kafkaProps,
        StreamExecutionEnvironment env, int resetOffsetsTo, String eventTimeField) {
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    FlinkKafkaConsumer010<Map<String, String>> consumer = new FlinkKafkaConsumer010<>(
            new LinkedList<>(topics), new EventMapSchema(), kafkaProps);
    DataStream<Map<String, String>> messageStream = env
            .addSource(consumer)
            .name("Kafka (" + StringUtils.join(topics, ", ") + ")")
            .setParallelism(parallelism);
    return messageStream
            // discard events that don't have the event time field
            .filter(new MissingTimeStampFilter(eventTimeField))
            // provide ascending timestamps for TimeCharacteristic.EventTime
            .assignTimestampsAndWatermarks(new EventMapTimestampExtractor(eventTimeField));
}
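For completeness, the two custom classes used above boil down to roughly the following (simplified sketches, not the exact production code; the real field parsing may differ):

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;

// drops events whose event-time field is absent
public static class MissingTimeStampFilter implements FilterFunction<Map<String, String>> {
    private final String eventTimeField;

    public MissingTimeStampFilter(String eventTimeField) {
        this.eventTimeField = eventTimeField;
    }

    @Override
    public boolean filter(Map<String, String> event) {
        return event.get(eventTimeField) != null;
    }
}

// extracts ascending event-time timestamps from the event map
public static class EventMapTimestampExtractor extends AscendingTimestampExtractor<Map<String, String>> {
    private final String eventTimeField;

    public EventMapTimestampExtractor(String eventTimeField) {
        this.eventTimeField = eventTimeField;
    }

    @Override
    public long extractAscendingTimestamp(Map<String, String> event) {
        // event time assumed to be epoch milliseconds stored as a string
        return Long.parseLong(event.get(eventTimeField));
    }
}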
....
StreamTableUtils.registerTable("events", kafkaStream, fieldNames, tableEnv);
String sql = "SELECT\n" +
" field_1 AS a,\n" +
" field_2 AS b,\n" +
" field_3 AS c,\n" +
" field_4 AS d,\n" +
" field_5 AS e,\n" +
" FROM events\n" +
" WHERE field_1 IS NOT NULL";
LOG.info("sql: {}", sql);
Table result = tableEnv.sql(sql);
System.err.println("output fields: " + Arrays.toString(result.getSchema().getColumnNames()));
if (printToErr) {
    printRows(result, tableEnv);
}
if (!StringUtils.isBlank(outputTopic)) {
    TableSink<?> tableSink = new Kafka09CustomJsonTableSink(
            outputTopic, KafkaStreams.getProperties(params),
            new FlinkFixedPartitioner<>(), timestampColumn);
    result.writeToSink(tableSink);
}
env.execute();
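In case it is relevant: StreamTableUtils.registerTable is our own helper; conceptually it converts the Map-based stream into Rows and registers it with the table environment, along these lines (a simplified sketch, not the exact code):

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

public static void registerTable(String name, DataStream<Map<String, String>> stream,
                                 String[] fieldNames, StreamTableEnvironment tableEnv) {
    // every registered column is treated as a string
    TypeInformation<?>[] types = new TypeInformation<?>[fieldNames.length];
    Arrays.fill(types, BasicTypeInfo.STRING_TYPE_INFO);

    // turn each Map event into a Row with one field per column
    DataStream<Row> rows = stream.map(event -> {
        Row row = new Row(fieldNames.length);
        for (int i = 0; i < fieldNames.length; i++) {
            row.setField(i, event.get(fieldNames[i]));
        }
        return row;
    }).returns(new RowTypeInfo(types, fieldNames));

    tableEnv.registerDataStream(name, rows, StringUtils.join(fieldNames, ", "));
}

(Kafka09CustomJsonTableSink is likewise our own class: a thin variant of the stock Kafka 0.9 JSON table sink that additionally takes the column to use as the record timestamp.)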
Cheers
BR