Re: Publishing Sink Task watermarks outside flink

Posted by Timo Walther on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Publishing-Sink-Task-watermarks-outside-flink-tp34765p35219.html

Hi Shubham,

Great to hear that tweaking the JDBC sink helped. Maybe I don't fully
understand your logic, but:

The watermark that you are receiving in an operator should already be
the minimum across all subtasks, because it is sent to all subsequent
operators by the preceding operator. So a watermark can trigger a
flush. I guess you need a measure of completeness for when the flush()
has happened in all subtasks, right?

Couldn't you wrap the sink function in a process function and use the
output of the process function to perform the aggregation of watermark
timestamps in Flink? A leaf operator (with parallelism 1) would then
perform the aggregation and the update of the overall event-time
timestamp.
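To make the leaf-operator idea concrete, here is a minimal, Flink-free
Java sketch of the parallelism-1 aggregator: every sink subtask reports
the watermark it has flushed up to, and the aggregator publishes the
minimum as the overall "complete up to" timestamp. All class and method
names here are invented for illustration; this is not Flink API.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a parallelism-1 leaf operator that tracks the minimum
// flushed watermark across all upstream sink subtasks.
public class WatermarkAggregator {
    private final int parallelism;
    private final Map<Integer, Long> perSubtask = new HashMap<>();

    public WatermarkAggregator(int parallelism) {
        this.parallelism = parallelism;
    }

    // Called whenever a subtask reports the watermark it has flushed up to.
    // A stale (smaller) report must not move a subtask's value backwards.
    public void update(int subtaskIndex, long watermark) {
        perSubtask.merge(subtaskIndex, watermark, Math::max);
    }

    // The overall "data complete up to" timestamp: the minimum over all
    // subtasks. Until every subtask has reported, nothing is complete.
    public long globalWatermark() {
        if (perSubtask.size() < parallelism) {
            return Long.MIN_VALUE;
        }
        return perSubtask.values().stream()
                .mapToLong(Long::longValue).min().getAsLong();
    }

    public static void main(String[] args) {
        WatermarkAggregator agg = new WatermarkAggregator(3);
        agg.update(0, 100L);
        agg.update(1, 90L);
        System.out.println(agg.globalWatermark()); // subtask 2 missing -> MIN_VALUE
        agg.update(2, 120L);
        System.out.println(agg.globalWatermark()); // prints 90
    }
}
```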

I hope this helps.

Regards,
Timo


On 04.05.20 03:04, Shubham Kumar wrote:

> Following up on this,
>
> I tried tweaking the JDBC sink as Timo suggested and was successful in
> doing so. Basically, I added a member /long maxWatermarkSeen/ in
> JDBCOutputFormat, so whenever a new record is added to the batch it
> updates the /maxWatermarkSeen/ for this subtask with
> /org.apache.flink.streaming.api.functions.sink.SinkFunction.Context.watermark/
> (if it's greater).
>   So whenever /JDBCOutputFormat.flush()/ is called, I can be sure that
> after executing the batch, all records having a timestamp below
> /maxWatermarkSeen/ have been pushed to JDBC.
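(Editor's sketch: a Flink-free toy model of the tweak described above.
All names are invented; in the real tweak the watermark would come from
the sink context inside the output format's write path, not from a
method parameter.)

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a tweaked batching JDBC-style sink: remember the largest
// watermark observed while records are added to the batch, so that
// after flush() we can claim completeness up to that timestamp.
public class BatchingSinkModel {
    private final List<String> batch = new ArrayList<>();
    private long maxWatermarkSeen = Long.MIN_VALUE;

    // Stand-in for writeRecord(): the real code would read the watermark
    // from the sink's context rather than take it as a parameter.
    public void add(String record, long contextWatermark) {
        batch.add(record);
        if (contextWatermark > maxWatermarkSeen) {
            maxWatermarkSeen = contextWatermark;
        }
    }

    // Stand-in for flush(): after the batch is executed, every record
    // with a timestamp below maxWatermarkSeen is durably in the database.
    public long flush() {
        batch.clear(); // stand-in for executing the JDBC batch
        return maxWatermarkSeen;
    }

    public static void main(String[] args) {
        BatchingSinkModel sink = new BatchingSinkModel();
        sink.add("row-a", 10L);
        sink.add("row-b", 25L);
        sink.add("row-c", 20L);
        System.out.println(sink.flush()); // prints 25
    }
}
```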
>
> Now, the actual answer I am looking for is the minimum of
> /maxWatermarkSeen/ across all subtasks. I can constantly update this in
> the DB as </Subtask Index, Watermark/> pairs and take the minimum in
> the DB.
>   I guess the aggregation can't be done inside Flink amongst subtasks?
>
> Now, I have two questions:
>
> 1) Should I update this in the DB using Flink's async I/O feature, or
> just perform a blocking query in the /JDBCOutputFormat.flush()/
> function after executing the batch?
> 2) If I use a Kafka sink (or any other sink for that matter), do I
> have to tweak its SinkFunction again for this functionality?
>     The general idea being that knowing up to what timestamp the sink
> is complete is common functionality for users, and could have a
> simpler solution.
>
> Thanks
> Shubham
>
> On Wed, Apr 29, 2020 at 3:27 AM Shubham Kumar
> <[hidden email] <mailto:[hidden email]>> wrote:
>
>     Hi Timo,
>
>     Yeah, I got the idea of getting access to timers through a process
>     function and reached the same conclusion you explained, namely
>     that a side output doesn't guarantee that the data has been
>     written out to the sink. (So maybe Fabian in that post pointed out
>     something else which I am missing.) If I am correct, data is
>     written to the side output as soon as it is processed in the
>     process function (in the process function itself, or in the
>     onTimer call if a timer has been set), right?
>
>     I am doing all computation in DataStream<Object> and then adding
>     a mapper to convert to DataStream<Row>, sinking through
>     JdbcAppendTableSink, which I think is part of the Table API. I
>     will definitely try exploring the JDBC sink function and its
>     context to get the watermark.
>
>     Thinking outside the box: is it possible to add some extra
>     operator after the sink which, being a downstream operator, would
>     always have a watermark greater than the sink function's
>     watermark?
>     Also, does the problem simplify if we have a Kafka sink?
>
>     On Tue, Apr 28, 2020 at 10:35 PM Timo Walther <[hidden email]
>     <mailto:[hidden email]>> wrote:
>
>         Hi Shubham,
>
>         you can call stream.process(...). The context of
>         ProcessFunction gives you access to the TimerService, which
>         lets you access the current watermark.
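(Editor's sketch: a Flink-free toy model of such a process-function
stage that tags each element with the watermark current at processing
time and emits the pair to a side channel. Names are invented; in real
Flink code one would call ctx.timerService().currentWatermark() inside
processElement().)

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for a ProcessFunction that records, for each element,
// the watermark current at processing time into a side output.
public class WatermarkTaggingStage {
    private long currentWatermark = Long.MIN_VALUE;
    // Each entry: {elementTimestamp, watermarkAtProcessingTime}
    private final List<long[]> sideOutput = new ArrayList<>();

    // Stand-in for a watermark advancing through the operator.
    public void onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
    }

    // Stand-in for processElement(): tag the element with the current
    // watermark and pass it downstream unchanged.
    public long processElement(long elementTimestamp) {
        sideOutput.add(new long[] {elementTimestamp, currentWatermark});
        return elementTimestamp;
    }

    public List<long[]> sideOutput() {
        return sideOutput;
    }

    public static void main(String[] args) {
        WatermarkTaggingStage stage = new WatermarkTaggingStage();
        stage.onWatermark(50L);
        stage.processElement(60L);
        stage.onWatermark(70L);
        stage.processElement(80L);
        System.out.println(stage.sideOutput().get(1)[1]); // prints 70
    }
}
```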
>
>         I'm assuming you are using the Table API? As far as I
>         remember, watermarks travel through the stream even if there
>         is no time-based operation happening. But you should
>         double-check that.
>
>         However, a side output does not guarantee that the data has
>         already been written out to the sink. So I would recommend
>         customizing the JDBC sink instead and looking into the row
>         column to get the current timestamp.
>
>         Or even better, there should also be
>         org.apache.flink.streaming.api.functions.sink.SinkFunction.Context
>         with access to the watermark.
>
>         I hope this helps.
>
>         Regards,
>         Timo
>
>         On 28.04.20 13:07, Shubham Kumar wrote:
>          > Hi everyone,
>          >
>          > I have a Flink application with Kafka sources which
>          > calculates some stats based on them and pushes the results
>          > to JDBC. Now, I want to know up to what timestamp the data
>          > is completely pushed to JDBC (i.e. no more data will be
>          > pushed with a timestamp smaller than or equal to it). There
>          > doesn't seem to be any direct programmatic way to do so.
>          >
>          > I came across the following thread, which seemed most
>          > relevant to my problem:
>          >
>          > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/End-of-Window-Marker-td29345.html#a29461
>          >
>          > However, I can't seem to understand how to chain a process
>          > function before the sink task so as to emit watermarks to a
>          > side output. (I suspect it might have something to do with
>          > datastream.addSink in regular datastream sinks vs
>          > sink.consumeDataStream(stream) in JDBCAppendTableSink.)
>          >
>          > Further, what happens if there are no windows? How should
>          > the problem be approached then?
>          >
>          > Please share any pointers or relevant solution to tackle this.
>          >
>          > --
>          > Thanks & Regards
>          >
>          > Shubham Kumar
>          >
>
>
>
>     --
>     Thanks & Regards
>
>     Shubham Kumar
>
>
>
> --
> Thanks & Regards
>
> Shubham Kumar
>