I have a process that takes 250,000 records from Kafka and produces a file (using a custom sink, CustomFileSystemSink).
Currently I just have the following:
DataStream<SchemaRecord> stream = env
    .addSource(new FlinkKafkaConsumer010<SchemaRecord>("topic", schema, properties)).setParallelism(40)
    .flatMap(new SchemaRecordSplit()).setParallelism(40).name("Splitter")
    .keyBy("partition", "randomkey", "schemaId");
stream.addSink(new CustomFileSystemSink()).setParallelism(40);
In my CustomFileSystemSink I have a for loop that closes the file once it reaches 250K rows.
What I would like to do is close the file every 5 minutes OR every 250K rows, whichever comes first.
From what I have read about the window types, I am not sure they cover this. Is it possible to read from Kafka and have the sink close the file every 5 minutes OR 250K rows?
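To make the requirement concrete, the rule I am after could be sketched roughly as below. This is only an illustration in plain Java with no Flink dependencies: RolloverPolicy and its method names are hypothetical and not part of Flink or of my current sink. The sink would call recordRowAndCheck from its invoke method and close the file whenever it returns true.

```java
// Hypothetical helper (not a Flink class): decides when the current file
// should be closed, based on a row limit OR a maximum file age.
class RolloverPolicy {
    private final long maxRows;       // e.g. 250_000
    private final long maxAgeMillis;  // e.g. 5 * 60 * 1000
    private long rowsInFile = 0;
    private long fileOpenedAt;

    RolloverPolicy(long maxRows, long maxAgeMillis, long nowMillis) {
        this.maxRows = maxRows;
        this.maxAgeMillis = maxAgeMillis;
        this.fileOpenedAt = nowMillis;
    }

    // Counts one written row and reports whether the file should now be closed.
    boolean recordRowAndCheck(long nowMillis) {
        rowsInFile++;
        return shouldRoll(nowMillis);
    }

    // True when the row limit is reached, or when a non-empty file is too old.
    boolean shouldRoll(long nowMillis) {
        boolean rowLimitHit = rowsInFile >= maxRows;
        boolean ageLimitHit = rowsInFile > 0 && (nowMillis - fileOpenedAt) >= maxAgeMillis;
        return rowLimitHit || ageLimitHit;
    }

    // Call after closing the file and opening a new one.
    void reset(long nowMillis) {
        rowsInFile = 0;
        fileOpenedAt = nowMillis;
    }
}
```

One limitation of this sketch: if the check only runs when a record arrives, a file that stops receiving records would stay open past 5 minutes, so some kind of timer would presumably also have to call shouldRoll.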
Hope this makes sense....