http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Publishing-Sink-Task-watermarks-outside-flink-tp34765p35219.html
great that tweaking the JDBC sink helped. Maybe I don't fully understand
watermark timestamps in Flink. A leaf operator (with parallelism 1)
would see the minimum of all subtasks, because it is sent to all subsequent
operators by the preceding operator. So a watermark can trigger a
flush. I guess you need a measure of completeness for when the flush()
has happened.
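The propagation rule mentioned here can be illustrated with a small, Flink-free model (all names below are hypothetical, not Flink API): an operator's watermark only advances to the minimum watermark received across its input channels, so a parallelism-1 leaf operator effectively observes the minimum over all upstream subtasks.

```java
// Minimal model (NOT Flink code) of watermark propagation: a downstream
// operator's watermark is the minimum of the last watermark received on
// each input channel. A leaf operator with parallelism 1 therefore sees
// the minimum over all upstream subtasks.
class WatermarkModel {
    private final long[] channelWatermarks; // last watermark per input channel

    WatermarkModel(int numChannels) {
        channelWatermarks = new long[numChannels];
        java.util.Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Called when input channel `ch` delivers a new watermark.
    // Returns the operator's current watermark: the min across channels.
    long onWatermark(int ch, long wm) {
        channelWatermarks[ch] = Math.max(channelWatermarks[ch], wm);
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        WatermarkModel op = new WatermarkModel(3);
        op.onWatermark(0, 10);
        op.onWatermark(1, 7);
        System.out.println(op.onWatermark(2, 12)); // min(10, 7, 12) = 7
    }
}
```

Until every channel has delivered a watermark, the operator's watermark stays at `Long.MIN_VALUE`, which mirrors why completeness can only be claimed once all upstream subtasks have reported.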
> Following up on this,
>
> I tried tweaking the JDBC sink as Timo suggested and was successful.
> Basically, I added a member long maxWatermarkSeen in JDBCOutputFormat,
> so whenever a new record is added to the batch it updates the
> maxWatermarkSeen for this subtask with
> org.apache.flink.streaming.api.functions.sink.SinkFunction.Context.watermark
> (if it's greater).
> So whenever JDBCOutputFormat.flush() is called, I can be sure that,
> after executing the batch, all records having a timestamp below
> maxWatermarkSeen have been pushed to JDBC.
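The bookkeeping described above can be sketched in plain Java (a simplified stand-in, not the actual JDBCOutputFormat; the maxWatermarkSeen name follows the email, everything else is hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the per-subtask bookkeeping described in the email (plain
// Java, NOT the real JDBCOutputFormat): every record added to the batch
// also updates the highest watermark the sink's context has reported.
// After a successful flush, all records with timestamp <= that watermark
// are known to be in the database for this subtask.
class WatermarkTrackingBatch {
    private final List<String> batch = new ArrayList<>();
    private long maxWatermarkSeen = Long.MIN_VALUE;

    // `contextWatermark` stands in for SinkFunction.Context#watermark().
    void addToBatch(String record, long contextWatermark) {
        batch.add(record);
        if (contextWatermark > maxWatermarkSeen) {
            maxWatermarkSeen = contextWatermark;
        }
    }

    // Executes the batch (stubbed here) and returns the watermark up to
    // which this subtask's output is now complete.
    long flush() {
        batch.clear(); // stand-in for executeBatch() against JDBC
        return maxWatermarkSeen;
    }

    long getMaxWatermarkSeen() {
        return maxWatermarkSeen;
    }

    public static void main(String[] args) {
        WatermarkTrackingBatch sink = new WatermarkTrackingBatch();
        sink.addToBatch("a", 5);
        sink.addToBatch("b", 3);
        System.out.println(sink.flush()); // prints 5
    }
}
```

Note the max is taken per record rather than per flush, so a late record with an older watermark (the "if it's greater" check) never moves the completeness marker backwards.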
>
> Now, the actual answer I am looking for is the minimum of maxWatermarkSeen
> across all subtasks. I can continually update this in the DB as <Subtask Index,
> Watermark> pairs and take the minimum in the DB.
> I guess the aggregation can't be done inside Flink amongst subtasks?
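The per-subtask reporting plus MIN aggregation can be sketched as follows (a plain-Java stand-in, not a real DB client: an in-memory map plays the role of the <Subtask Index, Watermark> rows, and Collections.min plays the role of a SELECT MIN(watermark) over that table):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Stand-in for the tracking table: each sink subtask upserts its
// (subtaskIndex, maxWatermarkSeen) row, and the global "complete up to"
// timestamp is the minimum over all rows -- the same result that
// SELECT MIN(watermark) against the real table would give.
class WatermarkRegistry {
    private final Map<Integer, Long> perSubtask = new HashMap<>();

    // Stand-in for the per-subtask upsert into the DB.
    void report(int subtaskIndex, long watermark) {
        perSubtask.merge(subtaskIndex, watermark, Math::max);
    }

    // Global completeness: data up to this timestamp is in the sink,
    // valid only once every expected subtask has reported at least once.
    long globalWatermark(int expectedSubtasks) {
        if (perSubtask.size() < expectedSubtasks) {
            return Long.MIN_VALUE; // not all subtasks have reported yet
        }
        return Collections.min(perSubtask.values());
    }

    public static void main(String[] args) {
        WatermarkRegistry registry = new WatermarkRegistry();
        registry.report(0, 100);
        registry.report(1, 80);
        System.out.println(registry.globalWatermark(2)); // prints 80
    }
}
```

The guard on the number of reporting subtasks matters: taking MIN over a table where one subtask has never written a row would silently overstate completeness.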
>
> Now, I have two questions:
>
> 1) Should I update this in the DB using the async I/O feature of Flink, or
> just perform a blocking query in the JDBCOutputFormat.flush() function after
> executing the batch?
> 2) If I use a Kafka sink (or any other sink for that matter), do I have to
> tweak its SinkFunction again for this functionality?
> The general idea being that this is common functionality: users want to know
> up to what timestamp a sink is complete, and there could be simpler solutions.
>
> Thanks
> Shubham
>
> On Wed, Apr 29, 2020 at 3:27 AM Shubham Kumar
> <[hidden email]> wrote:
>
> Hi Timo,
>
> Yeah, I got the idea of getting access to timers through a process
> function and had the same result which you explained,
> that is, a side output doesn't guarantee that the data is written out
> to the sink (so maybe Fabian in that post pointed out something
> else which I am missing). If I am correct, then data is written to
> the side output as soon as it is processed in the process function
> (maybe in the process function itself, on the onTimer call, if a timer
> has been set), right?
>
> I am doing all computation in a DataStream<Object> and then adding a
> mapper to convert it to DataStream<Row> to sink through JDBCAppendTableSink,
> which is part of the Table API I think. I will definitely try exploring
> the JDBC sink function and context to get the watermark.
>
> Thinking out of the box, is it possible to add some extra operator
> after the sink which will always have a watermark greater than the
> sink function's watermarks, since it is a downstream operator?
> Also, does the problem simplify if we have a Kafka sink?
>
> On Tue, Apr 28, 2020 at 10:35 PM Timo Walther
> <[hidden email]> wrote:
>
> Hi Shubham,
>
> you can call stream.process(...). The context of ProcessFunction
> gives you access to the TimerService, which lets you access the
> current watermark.
>
> I'm assuming you are using the Table API? As far as I remember,
> watermarks travel through the stream even if there is no
> time-based operation happening. But you should double-check that.
>
> However, a side output does not guarantee that the data has
> already been written out to the sink. So I would recommend
> customizing the JDBC sink instead and looking into the row column
> for getting the current timestamp.
>
> Or even better, there should also be
> org.apache.flink.streaming.api.functions.sink.SinkFunction.Context
> with access to the watermark.
>
> I hope this helps.
>
> Regards,
> Timo
>
> On 28.04.20 13:07, Shubham Kumar wrote:
> > Hi everyone,
> >
> > I have a Flink application with Kafka sources which calculates
> > some stats based on them and pushes the results to JDBC. Now, I
> > want to know up to what timestamp the data is completely pushed
> > to JDBC (i.e. no more data with a timestamp smaller than or equal
> > to this will be pushed). There doesn't seem to be any direct
> > programmatic way to do so.
> >
> > I came across the following thread which seemed most relevant
> to my
> > problem:
> >
>
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/End-of-Window-Marker-td29345.html#a29461
> >
> > However, I can't seem to understand how to chain a process
> > function before the sink task so as to emit watermarks to a side
> > output. (I suspect it might have something to do with
> > datastream.addSink in regular datastream sinks vs
> > sink.consumeDataStream(stream) in JDBCAppendTableSink.)
> >
> > Further, what happens if there are no windows? How should the
> > problem be approached then?
> >
> > Please share any pointers or relevant solution to tackle this.
> >
> > --
> > Thanks & Regards
> >
> > Shubham Kumar
> >
>
>
>
> --
> Thanks & Regards
>
> Shubham Kumar
>
>
>
> --
> Thanks & Regards
>
> Shubham Kumar
>