Processing Message after emitting to Sink


KristoffSC
Hi all,
I have a special use case that I'm not sure how to fulfill.

The use case is:
My main business processing pipeline has an MQ source,
processFunction1, processFunction2 and an MQ sink.

Apart from processing the main business message, processFunction1 also
emits some side effects using side outputs. Those side outputs are sent
to SideOutputMqSink, which sends them to the queue.

The requirement is that processFunction1 must not send the main business
message on to processFunction2 until the side output from processFunction1
has been sent to the queue via SideOutputMqSink.

In general I don't have to use side outputs, although I do some extra
processing on them before sending them to the sink, so having a side output
stream is nice to have. Nevertheless, the key requirement is that we should
wait with further processing until the side output has been sent to the queue.

I could achieve this by having processFunction1 call MQ directly in its
processElement method before sending out the main message, although I
don't like that idea.

I was wondering whether there is a way to have a Sink function that is also a
FlatMap function?

The best solution would be to process the two streams (main and side
effect) in some nice way but with some barrier, so that the main pipeline
waits until the side output has been sent.
Both streams can be keyed.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Processing Message after emitting to Sink

Timo Walther
Hi Kristoff,

synchronization across operators is not easy to achieve.

If one needs to wait until a sink has processed some element, it
requires that a sink participates in the pipeline. So it is not located
as a "leaf" operator but is located somewhere in the middle.

So your idea to call MQ directly in processFunction1 sounds like a
reasonable solution to me. Maybe it is possible to wrap the original
code somehow. That could require going one level deeper in the DataStream
API (using a custom stream transformation and operator instead of
ProcessFunction).

Another idea that comes to mind is to use the checkpoint
barrier as a synchronization tool. I'm not familiar with how the MQ sink
works, but if you can ensure that the side output is written out in the
next checkpoint, you could leverage an interface like
`org.apache.flink.runtime.state.CheckpointListener`.
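
The checkpoint idea can be modelled roughly as follows. This is a plain-Java sketch, not Flink code: `CheckpointGate`, `onElement` and `onCheckpointBarrier` are hypothetical names, and only `notifyCheckpointComplete(long checkpointId)` mirrors the callback of Flink's `CheckpointListener`. It assumes the MQ sink really does write all pending side outputs as part of each checkpoint, which this thread does not confirm.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Consumer;

/** Plain-Java model (hypothetical, not Flink API): hold main messages until a checkpoint completes. */
class CheckpointGate<T> {
    private final Consumer<T> downstream;
    // Main messages seen since the last barrier; not yet tied to a checkpoint.
    private List<T> sinceLastBarrier = new ArrayList<>();
    // Main messages waiting for a specific checkpoint id to complete.
    private final NavigableMap<Long, List<T>> pending = new TreeMap<>();

    CheckpointGate(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    /** Buffer the main message instead of emitting it right away. */
    void onElement(T mainMessage) {
        sinceLastBarrier.add(mainMessage);
    }

    /** Everything buffered so far now belongs to this checkpoint. */
    void onCheckpointBarrier(long checkpointId) {
        pending.put(checkpointId, sinceLastBarrier);
        sinceLastBarrier = new ArrayList<>();
    }

    /** Release, in order, every batch that belongs to this or an earlier checkpoint. */
    void notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, List<T>> completed = pending.headMap(checkpointId, true);
        for (List<T> batch : completed.values()) {
            batch.forEach(downstream);
        }
        completed.clear(); // clearing the view also removes the entries from 'pending'
    }
}
```

The trade-off in this model is latency: a main message is delayed by up to one checkpoint interval, so it only fits if that delay is acceptable.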

I hope others might come up with a better idea.

Regards,
Timo

On 14.04.20 23:59, KristoffSC wrote:


Re: Processing Message after emitting to Sink

KristoffSC
Thank you very much for your answer.

I have a question regarding your first paragraph:
"it requires that a sink participates in the pipeline. So it is not located
as a "leaf" operator but is located somewhere in the middle."

Isn't a Sink a terminating operator? As far as I know, Sinks cannot be in
the middle of a stream chain.


I would appreciate other comments as well.

Thanks,
Krzysztof




Re: Processing Message after emitting to Sink

Timo Walther
Yes. But that's the problem with your use case, right? If you need to
wait for the sink to complete, it is not a terminating operator anymore.

Regards,
Timo


On 15.04.20 10:50, KristoffSC wrote:


Re: Processing Message after emitting to Sink

KristoffSC
My point was that, as far as I know, Sinks are "terminating" operators that
end the stream, like .collect in the Java 8 Stream API. They don't emit elements
further, and I cannot link them like this:

source - process - sink - process - sink

A sink function produces a DataStreamSink, which is used for emitting elements
from a streaming topology.
It is not a SingleOutputStreamOperator or DataStream that I can use as input
for the next operator.




Re: Processing Message after emitting to Sink

Arvid Heise-3
Hi Kristoff,

I see a few ways, none of which are perfect.

The easiest way would be to not use a sink. Instead of emitting to a side output, you could tag that element and have a subsequent async I/O operator place it in RabbitMQ. If that async I/O is ordered, you can be sure that all following events are processed only after the element has been added. Of course, the downside is that you have to implement the communication with RabbitMQ manually and lose what Flink already provides. This is what you already sketched out.
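
The ordering guarantee described above can be illustrated with a small plain-Java model. It is only a sketch of what an ordered async stage does, not Flink's implementation: `OrderedAsyncForwarder`, `mqSend` and `onElement` are hypothetical names, and `mqSend` stands in for whatever asynchronous MQ client call would be used. Elements are forwarded downstream strictly in arrival order, and only after their own MQ send has been acknowledged.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

/** Plain-Java model (hypothetical, not Flink API) of an ordered async stage in front of the main pipeline. */
class OrderedAsyncForwarder<T> {
    /** An element together with the future that completes when the MQ has accepted it. */
    private static final class InFlight<E> {
        final E element;
        final CompletableFuture<Void> ack;

        InFlight(E element, CompletableFuture<Void> ack) {
            this.element = element;
            this.ack = ack;
        }
    }

    private final Function<T, CompletableFuture<Void>> mqSend; // assumed async MQ client call
    private final Consumer<T> downstream;
    private final Deque<InFlight<T>> queue = new ArrayDeque<>();

    OrderedAsyncForwarder(Function<T, CompletableFuture<Void>> mqSend, Consumer<T> downstream) {
        this.mqSend = mqSend;
        this.downstream = downstream;
    }

    /** Start the MQ send and remember the element in arrival order. */
    synchronized void onElement(T element) {
        CompletableFuture<Void> ack = mqSend.apply(element);
        queue.addLast(new InFlight<>(element, ack));
        ack.whenComplete((ignored, error) -> drain());
    }

    /** Forward elements from the head of the queue for as long as their sends have finished. */
    private synchronized void drain() {
        while (!queue.isEmpty() && queue.peekFirst().ack.isDone()) {
            InFlight<T> head = queue.pollFirst();
            // Failed sends are not forwarded here; a real job would retry or fail instead.
            if (!head.ack.isCompletedExceptionally()) {
                downstream.accept(head.element);
            }
        }
    }
}
```

In a Flink job the equivalent ordering would come from `AsyncDataStream.orderedWait`, with the MQ call living in the async function; the model only shows why a later element cannot overtake an earlier one whose send is still pending.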

A more complicated approach would be to implement a custom operator with input selection to replace processFunction2 [1]. Let's call it op2. You would add the feedback from the sink implicitly, by also consuming from that MQ queue in op2. Then processFunction1 would also emit a flag event on the main output together with the side output. On receiving that flag, op2 would block the main input until it has read the corresponding entry from the MQ. However, this approach is really complex to implement, and input selection works on a somewhat best-effort basis. So before going that route, I'd do a small POC to see if it fits your needs.

The best solution, of course, would be to revise your overall architecture. Needing to halt execution at some point is quite a smell in a stream processing job. If you give some more details, I could try to help.


On Wed, Apr 15, 2020 at 5:36 PM KristoffSC <[hidden email]> wrote:

--

Arvid Heise | Senior Java Developer



Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   

Re: Processing Message after emitting to Sink

Sameer Wadkar
One idea that comes to mind is to replace ProcessFunction1 with a CoProcessFunction [1]. The processElement1() function can send to the side output, process the business message and keep the result in state without emitting it. Then, as Arvid mentioned, processElement2() can listen for the side output (emitted by processElement1()) and, when it receives it, emit the result from state and clear the state.
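
The hold-and-release logic of this idea can be sketched in plain Java. This is a model only, not a Flink `CoProcessFunction`: the class name `HoldUntilAck` and the simplified method signatures are hypothetical, the `HashMap` stands in for Flink keyed state, and it assumes the second input delivers one confirmation per side output for the same key.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.function.Consumer;

/** Plain-Java model (hypothetical, not Flink API) of holding main messages until a confirmation arrives. */
class HoldUntilAck<K, M> {
    // Stands in for keyed state: main messages held back per key, oldest first.
    private final Map<K, Queue<M>> held = new HashMap<>();
    private final Consumer<M> downstream;

    HoldUntilAck(Consumer<M> downstream) {
        this.downstream = downstream;
    }

    /** First input: the processed main message is stored, not emitted. */
    void processElement1(K key, M mainMessage) {
        held.computeIfAbsent(key, k -> new ArrayDeque<>()).add(mainMessage);
    }

    /** Second input: a confirmation for this key releases the oldest held message. */
    void processElement2(K key) {
        Queue<M> queue = held.get(key);
        if (queue == null) {
            return; // confirmation for an unknown key: nothing to release
        }
        M next = queue.poll();
        if (next != null) {
            downstream.accept(next);
        }
        if (queue.isEmpty()) {
            held.remove(key); // clear the state once nothing is waiting
        }
    }
}
```

For example, after `processElement1("k1", "m1")` nothing is emitted; only a later `processElement2("k1")` passes `"m1"` downstream.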


On Thu, Apr 23, 2020 at 7:20 AM Arvid Heise <[hidden email]> wrote: