Hi,

I am running a simple streaming Flink job (Flink 1.3.2 and 1.3.1) whose source and sink are a Kafka 0.10.0.1 cluster. I am testing savepoints by stopping and resuming the job, and when I checked the validity of the data written to the sink during the stop window I observed that some of the events had been lost. The stream carries around 6K events per 10 minutes, and around 50% of them are lost. I share the code below in case you can give me any hint.

The job resumes correctly from the last savepoint, and the checkpoint configuration is as follows:

env.setStateBackend(new FsStateBackend("s3://my_bucket/flink/checkpoints/"));
env.enableCheckpointing(params.getLong("checkpoint.interval", 300000));
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(
    params.getLong("checkpoint.minPause", 60 * 1000));
env.getCheckpointConfig().setMaxConcurrentCheckpoints(
    params.getInt("checkpoint.maxConcurrent", 1));
env.getCheckpointConfig().setCheckpointTimeout(
    params.getLong("checkpoint.timeout", 10 * 60 * 1000));
env.getCheckpointConfig().enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

The Kafka consumer:

SingleOutputStreamOperator<Map<String, String>> createKafkaStream(
        Collection<String> topics, int parallelism, Properties kafkaProps,
        StreamExecutionEnvironment env, int resetOffsetsTo, String eventTimeField) {

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    FlinkKafkaConsumer010<Map<String, String>> consumer = new FlinkKafkaConsumer010<>(
        new LinkedList<>(topics), new EventMapSchema(), kafkaProps);

    DataStream<Map<String, String>> messageStream = env.addSource(consumer)
        .name("Kafka (" + StringUtils.join(topics, ", ") + ")")
        .setParallelism(parallelism);

    return messageStream
        // discard events that don't have the event time field
        .filter(new MissingTimeStampFilter(eventTimeField))
        // provide ascending timestamps for TimeCharacteristic.EventTime
        .assignTimestampsAndWatermarks(new EventMapTimestampExtractor(eventTimeField));
}

...

StreamTableUtils.registerTable("events", kafkaStream, fieldNames, tableEnv);

String sql = "SELECT\n" +
    "  field_1 AS a,\n" +
    "  field_2 AS b,\n" +
    "  field_3 AS c,\n" +
    "  field_4 AS d,\n" +
    "  field_5 AS e\n" +
    "FROM events\n" +
    "WHERE field_1 IS NOT NULL";
LOG.info("sql: {}", sql);

Table result = tableEnv.sql(sql);
System.err.println("output fields: " + Arrays.toString(result.getSchema().getColumnNames()));

if (printToErr) {
    printRows(result, tableEnv);
}

if (!StringUtils.isBlank(outputTopic)) {
    TableSink<?> tableSink = new Kafka09CustomJsonTableSink(
        outputTopic, KafkaStreams.getProperties(params),
        new FlinkFixedPartitioner<>(), timestampColumn);
    result.writeToSink(tableSink);
}

env.execute();

Cheers,
BRJM
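PS: In case it is relevant, EventMapTimestampExtractor is in essence an ascending timestamp extractor along these lines (simplified sketch; it assumes the event-time field already holds epoch milliseconds, the real class may parse the field differently):

import java.util.Map;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;

public class EventMapTimestampExtractor
        extends AscendingTimestampExtractor<Map<String, String>> {

    private final String eventTimeField;

    public EventMapTimestampExtractor(String eventTimeField) {
        this.eventTimeField = eventTimeField;
    }

    @Override
    public long extractAscendingTimestamp(Map<String, String> event) {
        // assumption: the configured field already contains the timestamp as epoch milliseconds
        return Long.parseLong(event.get(eventTimeField));
    }
}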
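And EventMapSchema is, roughly, a DeserializationSchema that turns each Kafka record into a Map<String, String> (sketch below assumes the records are flat JSON objects and that Jackson is on the classpath; the real class has more error handling):

import java.io.IOException;
import java.util.Map;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class EventMapSchema implements DeserializationSchema<Map<String, String>> {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public Map<String, String> deserialize(byte[] message) throws IOException {
        // assumption: each record is a flat JSON object with string values
        return MAPPER.readValue(message, new TypeReference<Map<String, String>>() {});
    }

    @Override
    public boolean isEndOfStream(Map<String, String> nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Map<String, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<Map<String, String>>() {});
    }
}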