Hi,

I am running a simple streaming Flink job (Flink versions 1.3.2 and 1.3.1) whose source and sink are a Kafka 0.10.0.1 cluster. I am testing savepoints by stopping and resuming the job. When I validated the sink output for the period during which the job was stopped, I observed that some of the events had been lost. The stream is around 6K events per 10 minutes, and around 50% of them are lost. I am sharing the code in case you can give me any hints. The job resumes correctly from the last savepoint, and the checkpoint configuration is as follows:
The Kafka consumer:

    SingleOutputStreamOperator<Map<String, String>> createKafkaStream(Collection<String> topics, int parallelism,
            Properties kafkaProps, StreamExecutionEnvironment env, int resetOffsetsTo, String eventTimeField) {
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        FlinkKafkaConsumer010<Map<String, String>> consumer = new FlinkKafkaConsumer010<>(
                new LinkedList<>(topics), new EventMapSchema(), kafkaProps);

        DataStream<Map<String, String>> messageStream = env
                .addSource(consumer)
                .name("Kafka (" + StringUtils.join(topics, ", ") + ")")
                .setParallelism(parallelism);

        return messageStream
                // discard events that don't have the event time field
                .filter(new MissingTimeStampFilter(eventTimeField))
                // provide ascending timestamps for TimeCharacteristic.EventTime
                .assignTimestampsAndWatermarks(new EventMapTimestampExtractor(eventTimeField));
    }

    ....

    StreamTableUtils.registerTable("events", kafkaStream, fieldNames, tableEnv);

    // no trailing comma after the last select item, otherwise the SQL parser rejects the query
    String sql = "SELECT\n" +
            " field_1 AS a,\n" +
            " field_2 AS b,\n" +
            " field_3 AS c,\n" +
            " field_4 AS d,\n" +
            " field_5 AS e\n" +
            " FROM events\n" +
            " WHERE field_1 IS NOT NULL";
    LOG.info("sql: {}", sql);

    Table result = tableEnv.sql(sql);
    System.err.println("output fields: " + Arrays.toString(result.getSchema().getColumnNames()));

    if (printToErr) {
        printRows(result, tableEnv);
    }

    if (!StringUtils.isBlank(outputTopic)) {
        TableSink<?> tableSink = new Kafka09CustomJsonTableSink(outputTopic, KafkaStreams.getProperties(params),
                new FlinkFixedPartitioner<>(), timestampColumn);
        result.writeToSink(tableSink);
    }

    env.execute();

Cheers,
BR
JM
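To put a figure like the ~50% loss on firmer ground, one option is to give every test event a unique ID and diff what was written to the input topic against what was read back from the output topic. The sketch below does only that diff, in plain Java so it runs outside Flink. It assumes such an ID field exists (the events in the job above are `Map<String, String>` and no ID field is shown); the class `SinkGapCheck` and its methods are hypothetical, and collecting the IDs from the two topics is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for a stop/resume test: compares the IDs of events written to the
// input topic with the IDs read back from the output topic. Assumes each event carries a
// unique ID; how the IDs are collected from Kafka is out of scope here.
public class SinkGapCheck {

    // Returns the produced IDs that never appear in the sink output, in production order.
    public static List<String> missingIds(List<String> producedIds, Collection<String> sunkIds) {
        Set<String> seen = new HashSet<>(sunkIds);
        List<String> missing = new ArrayList<>();
        for (String id : producedIds) {
            if (!seen.contains(id)) {
                missing.add(id);
            }
        }
        return missing;
    }

    // Fraction of produced events missing from the sink; 0.0 if nothing was produced.
    public static double lossRatio(List<String> producedIds, Collection<String> sunkIds) {
        if (producedIds.isEmpty()) {
            return 0.0;
        }
        return (double) missingIds(producedIds, sunkIds).size() / producedIds.size();
    }

    public static void main(String[] args) {
        List<String> produced = Arrays.asList("e1", "e2", "e3", "e4");
        List<String> sunk = Arrays.asList("e1", "e3");
        System.out.println("missing: " + missingIds(produced, sunk));   // missing: [e2, e4]
        System.out.println("loss ratio: " + lossRatio(produced, sunk)); // loss ratio: 0.5
    }
}
```

Because the sink side is treated as a set, duplicate records in the output do not mask a gap. Grouping the missing IDs by input partition or by event timestamp could help show whether the loss clusters around the savepoint.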
Hi Jose,

I had a look at your program but did not spot anything.

2017-10-06 15:48 GMT+02:00 Jose Miguel Tejedor Fernandez <[hidden email]>: