Hi everyone,
I have a flink application having kafka sources which calculates some stats based on it and pushes it to JDBC. Now, I want to know till what timestamp is the data completely pushed in JDBC (i.e. no more data will be pushed to timestamp smaller or equal than this). There doesn't seem to be any direct programmatic way to do so. I came across the following thread which seemed most relevant to my problem: However, I can't seem to understand how to chain a process function before the sink task so as to put watermarks to a side output. (I suspect it might have something to do with datastream.addSink in regular datastream sinks vs sink.consumeDataStream(stream) in JDBCAppendTableSink). Further what happens if there are no windows, how to approach the problem then? Please share any pointers or relevant solution to tackle this. Thanks & Regards Shubham Kumar |
Hi Shubham,
you can call stream.process(...). The context of ProcessFunction gives you access to TimerService which let's you access the current watermark. I'm assuming your are using the Table API? As far as I remember, watermark are travelling through the stream even if there is no time-based operation happening. But you should double check that. However, a side output does not guarantee that the data has already been written out to the sink. So I would recommend to customize the JDBC sink instead and look into the row column for getting the current timestamp. Or even better, there should also be org.apache.flink.streaming.api.functions.sink.SinkFunction.Context with access to watermark. I hope this helps. Regards, Timo On 28.04.20 13:07, Shubham Kumar wrote: > Hi everyone, > > I have a flink application having kafka sources which calculates some > stats based on it and pushes it to JDBC. Now, I want to know till what > timestamp is the data completely pushed in JDBC (i.e. no more data will > be pushed to timestamp smaller or equal than this). There doesn't seem > to be any direct programmatic way to do so. > > I came across the following thread which seemed most relevant to my > problem: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/End-of-Window-Marker-td29345.html#a29461 > > However, I can't seem to understand how to chain a process function > before the sink task so as to put watermarks to a side output. (I > suspect it might have something to do with datastream.addSink in regular > datastream sinks vs sink.consumeDataStream(stream) in JDBCAppendTableSink). > > Further what happens if there are no windows, how to approach the > problem then? > > Please share any pointers or relevant solution to tackle this. > > -- > Thanks & Regards > > Shubham Kumar > |
Hi Timo, Yeah, I got the idea of getting access to timers through process function and had the same result which you explained that is a side output doesn't guarantee that the data is written out to sink. (so maybe Fabian in that post pointed out something else which I am missing). If I am correct then, data is written to side output as soon as it is processed in the Process function (maybe in process function itself on Ontimer call if a timer has been set, right? I am doing all computation in Datastream<Object> and then adding a mapper to convert to DataStream<Row> to sink through JdbcAppendTableSink which is part of Table API I think. I will definitely try exploring the Jdbc Sink function and context to get the watermark. Thinking out of the box, is it possible to add some extra operator after sink which will always have watermark which is greater than sink function watermarks, as its a downstream operator. Also, does the problem simplify if we have Kafka sink? On Tue, Apr 28, 2020 at 10:35 PM Timo Walther <[hidden email]> wrote: Hi Shubham, Thanks & Regards Shubham Kumar |
Following up on this, I tried tweaking the Jdbc Sink as Timo suggested and was successful in it. Basically I added a member long maxWatermarkSeen in JDBCOutputFormat, so whenever a new record is added to the batch it updates the maxWatermarkSeen for this subtask with org.apache.flink.streaming.api.functions.sink.SinkFunction.Context.watermark (if its greater). So whenever a JDBCOutputFormat.flush() is called I can be sure that after executing batch, all records having timestamp below maxWatermarkSeen are pushed to JDBC. Now, the actual answer I am looking for is minimum of maxWatermarkSeen for all subtasks. I can constantly update this to DB as <Subtask Index, Watermark> and take minimum in DB. I guess the aggregation can't be done inside flink amongst subtasks? Now, I have two questions: 1) Should I update this to DB using async I/O feature of flink or just perform a blocking query in JDBCOutputFormat.flush() function after executing the batch. 2) If I will be using Kafka sink (or any other sink for that matter), do I have to again tweak around with its SinkFunction for this functionality? General idea being that this a common functionality for users to know till what timestamp is sink complete and can have simpler solutions. Thanks Shubham On Wed, Apr 29, 2020 at 3:27 AM Shubham Kumar <[hidden email]> wrote:
Thanks & Regards Shubham Kumar |
Hi Shubham,
great that tweaking the JDBC sink helped. Maybe I don't fully understand your logic but: The watermark that you are receiving in an operator should already be the minimum of all subtasks. Because it is sent to all subsequent operators by the precedeing operator. So a watermark can trigger a flush. I guess you need a measure of completeness when the flush() has happended in all subtasks, right? Couldn't you wrap a sink function into a process function and use the output of the process function for performing the aggregation of watermark timestamps in Flink. A leaf operator (with parallelism 1) would then perform the aggregation and the update of the overall event-time timestamp. I hope this helps. Regards, Timo On 04.05.20 03:04, Shubham Kumar wrote: > Following up on this, > > I tried tweaking the Jdbc Sink as Timo suggested and was successful in > it. Basically I added a member /long maxWatermarkSeen /in JDBCOutputFormat, > so whenever a new record is added to the batch it updates the > /maxWatermarkSeen/ for this subtask with > /org.apache.flink.streaming.api.functions.sink.SinkFunction.Context.watermark/ > (if its greater). > So whenever a /JDBCOutputFormat.flush()/ is called I can be sure that > after executing batch, all records having timestamp below > /maxWatermarkSeen/ are pushed to JDBC. > > Now, the actual answer I am looking for is minimum of /maxWatermarkSeen/ > for all subtasks. I can constantly update this to DB as </Subtask Index, > Watermark/> and take minimum in DB. > I guess the aggregation can't be done inside flink amongst subtasks? > > Now, I have two questions: > > 1) Should I update this to DB using async I/O feature of flink or just > perform a blocking query in /JDBCOutputFormat.flush()/ function after > executing the batch. > 2) If I will be using Kafka sink (or any other sink for that matter), do > I have to again tweak around with its SinkFunction for this functionality? > General idea being that this a common functionality for users to > know till what timestamp is sink complete and can have simpler solutions. > > Thanks > Shubham > > On Wed, Apr 29, 2020 at 3:27 AM Shubham Kumar > <[hidden email] <mailto:[hidden email]>> wrote: > > Hi Timo, > > Yeah, I got the idea of getting access to timers through process > function and had the same result which you explained > that is a side output doesn't guarantee that the data is written out > to sink. (so maybe Fabian in that post pointed out something > else which I am missing). If I am correct then, data is written to > side output as soon as it is processed in the Process function > (maybe in > process function itself on Ontimer call if a timer has been set, right? > > I am doing all computation in Datastream<Object> and then adding a > mapper to convert to DataStream<Row> to sink through JdbcAppendTableSink > which is part of Table API I think. I will definitely try exploring > the Jdbc Sink function and context to get the watermark. > > Thinking out of the box, is it possible to add some extra operator > after sink which will always have watermark which is greater than > sink function watermarks, > as its a downstream operator. > Also, does the problem simplify if we have Kafka sink? > > On Tue, Apr 28, 2020 at 10:35 PM Timo Walther <[hidden email] > <mailto:[hidden email]>> wrote: > > Hi Shubham, > > you can call stream.process(...). The context of ProcessFunction > gives > you access to TimerService which let's you access the current > watermark. > > I'm assuming your are using the Table API? As far as I remember, > watermark are travelling through the stream even if there is no > time-based operation happening. But you should double check that. > > However, a side output does not guarantee that the data has > already been > written out to the sink. So I would recommend to customize the > JDBC sink > instead and look into the row column for getting the current > timestamp. > > Or even better, there should also be > org.apache.flink.streaming.api.functions.sink.SinkFunction.Context > with > access to watermark. > > I hope this helps. > > Regards, > Timo > > On 28.04.20 13:07, Shubham Kumar wrote: > > Hi everyone, > > > > I have a flink application having kafka sources which > calculates some > > stats based on it and pushes it to JDBC. Now, I want to know > till what > > timestamp is the data completely pushed in JDBC (i.e. no more > data will > > be pushed to timestamp smaller or equal than this). There > doesn't seem > > to be any direct programmatic way to do so. > > > > I came across the following thread which seemed most relevant > to my > > problem: > > > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/End-of-Window-Marker-td29345.html#a29461 > > > > However, I can't seem to understand how to chain a process > function > > before the sink task so as to put watermarks to a side > output. (I > > suspect it might have something to do with datastream.addSink > in regular > > datastream sinks vs sink.consumeDataStream(stream) in > JDBCAppendTableSink). > > > > Further what happens if there are no windows, how to approach > the > > problem then? > > > > Please share any pointers or relevant solution to tackle this. > > > > -- > > Thanks & Regards > > > > Shubham Kumar > > > > > > -- > Thanks & Regards > > Shubham Kumar > > > > -- > Thanks & Regards > > Shubham Kumar > |
Free forum by Nabble | Edit this page |