count and window question with kafka

robert
I have a process that takes 250,000 records from Kafka and produces a file (using a CustomFileSystemSink).

Currently I just have the following:


DataStream<SchemaRecord> stream = env
        .addSource(new FlinkKafkaConsumer010<SchemaRecord>("topic", schema, properties)).setParallelism(40)
        .flatMap(new SchemaRecordSplit()).setParallelism(40).name("Splitter")
        .keyBy("partition", "randomkey", "schemaId");

// The custom sink currently closes its output file after 250K rows
stream.addSink(new CustomFileSystemSink()).setParallelism(40);


In my CustomFileSystemSink I have a loop that closes the file once it reaches 250K rows.


What I am looking to do is close the file every 5 minutes OR at 250K rows, whichever comes first.


From what I have read about the window types, is it possible to read from Kafka and have the sink close the file every 5 minutes OR at 250K rows?

Hope this makes sense.


Re: count and window question with kafka

Tony Wei
Hi,

I think a ProcessFunction[1] is what you want. You can add it after keyBy, buffer the records there, and emit the buffered result to the sink when either the timeout fires or the buffer is full.
The reference has a good example that shows how to use it.

Best Regards,
Tony Wei
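The "emit on timeout or when the buffer is full" logic from the reply can be sketched in plain Java. This sketch has no Flink dependency. `CountOrTimeBuffer` is a hypothetical class, and its `processElement`/`onTimer` methods only mirror the roles of the same-named ProcessFunction callbacks; they are not Flink's API. In a real job, the buffer and the deadline would presumably live in keyed state inside a ProcessFunction placed after `keyBy`. The timer would be registered with `ctx.timerService().registerProcessingTimeTimer(...)`. Time is passed in explicitly here so the count-or-time decision is easy to follow and test.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of "flush after maxCount records OR maxAgeMillis, whichever comes first".
 * In Flink, `buffer` and `deadline` would be keyed state and onTimer would be
 * invoked by a processing-time timer; here both are driven manually.
 */
class CountOrTimeBuffer<T> {
    private final int maxCount;
    private final long maxAgeMillis;
    private final List<T> buffer = new ArrayList<>();
    private long deadline = -1; // -1 means no batch is open (no timer pending)

    CountOrTimeBuffer(int maxCount, long maxAgeMillis) {
        this.maxCount = maxCount;
        this.maxAgeMillis = maxAgeMillis;
    }

    /** Buffers one record; returns the batch if the count limit is reached, else null. */
    public List<T> processElement(T record, long nowMillis) {
        if (buffer.isEmpty()) {
            // First record of a new batch: the time limit starts counting now
            deadline = nowMillis + maxAgeMillis;
        }
        buffer.add(record);
        return buffer.size() >= maxCount ? flush() : null;
    }

    /** Called when a timer fires; returns the batch if its deadline has passed, else null. */
    public List<T> onTimer(long nowMillis) {
        // A stale timer from an already-flushed batch fails this check and is ignored
        if (deadline >= 0 && nowMillis >= deadline && !buffer.isEmpty()) {
            return flush();
        }
        return null;
    }

    private List<T> flush() {
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        deadline = -1;
        return batch;
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L;
        // A limit of 3 instead of 250,000 keeps the demo short
        CountOrTimeBuffer<String> buf = new CountOrTimeBuffer<>(3, fiveMinutes);

        // Count limit reached first: the third record triggers the flush
        buf.processElement("a", 0);
        buf.processElement("b", 1_000);
        System.out.println(buf.processElement("c", 2_000)); // [a, b, c]

        // Time limit reached first: two records, then the 5-minute timer fires
        buf.processElement("d", 10_000);
        buf.processElement("e", 20_000);
        System.out.println(buf.onTimer(10_000 + fiveMinutes)); // [d, e]
    }
}
```

Each returned batch would then go to the sink as one file. The deadline check inside `onTimer` is there because a timer registered for a batch that was already flushed by the count limit may still fire later.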



In reply to Telco Phone <[hidden email]>, 2017-10-30 23:56 GMT+08:00.