Hi all,
I have a special use case that I'm not sure how to fulfill.

The use case is: I have my main business processing pipeline that has an MQ source, processFunction1, processFunction2 and an MQ sink.

Apart from processing the main business message, processFunction1 also emits some side effects using side outputs. Those side outputs are sent to SideOutputMqSink, which sends them to the queue.

The requirement is that processFunction1 must not send the main business message on to processFunction2 until the side output from processFunction1 has been sent to the queue via SideOutputMqSink.

In general I don't have to use side outputs, although I do some extra processing on them before sending them to the sink, so having a side output stream is nice to have. Nevertheless, the key requirement is that further processing should wait until the side output has been sent to the queue.

I could achieve this by having processFunction1 call MQ directly in its processElement method before sending out the main message, although I don't like that idea.

I was wondering: is there a way to have a Sink function that would also be a FlatMap function?

The best solution would be to process the two streams (main and side effect) in some nice way but with some barrier, so that the main pipeline waits until the side output has been sent. Both streams can be keyed.

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Hi Kristoff,
synchronization across operators is not easy to achieve. If one needs to wait until a sink has processed some element, it requires that a sink participates in the pipeline. So it is not located as a "leaf" operator but located somewhere in the middle.

So your idea to call MQ directly in processFunction1 sounds like a reasonable solution to me. Maybe it is possible to wrap the original code somehow. It could require going one level deeper in the DataStream API (using a custom stream transformation and operator instead of a ProcessFunction).

Another idea that comes to my mind is to use the checkpoint barrier as a synchronization tool. I'm not familiar with how the MQ sink works, but if you can ensure that the side output is written out in the next checkpoint, you could leverage an interface like `org.apache.flink.runtime.state.CheckpointListener`.

I hope others might come up with a better idea.

Regards,
Timo

On 14.04.20 23:59, KristoffSC wrote:
> The requirement is that processFunction1 must not send the main business
> message on to processFunction2 until the side output from processFunction1
> has been sent to the queue via SideOutputMqSink.
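The option discussed above, publishing the side effect from inside processFunction1 and emitting the main message only afterwards, can be modelled in plain Java. This is a minimal sketch with no Flink or MQ dependency: `SideEffectPublisher`, `publishAndWait` and `SendBeforeEmit` are hypothetical names introduced only for illustration, and the sketch assumes the MQ client can block until the broker has accepted the message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for an MQ client; not a Flink or MQ API.
interface SideEffectPublisher {
    // Assumption: returns only after the broker has accepted the message.
    void publishAndWait(String sideEffect);
}

class SendBeforeEmit {
    // Models the body of processElement in processFunction1.
    static void processElement(String message,
                               SideEffectPublisher publisher,
                               Consumer<String> mainOut) {
        String sideEffect = "side-effect-of-" + message;
        publisher.publishAndWait(sideEffect); // blocks until the queue has the side effect
        mainOut.accept(message);              // only now goes on to processFunction2
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        processElement("m1", s -> log.add("mq:" + s), m -> log.add("main:" + m));
        System.out.println(log); // the mq entry is logged before the main entry
    }
}
```

The ordering guarantee comes purely from the blocking call, which is also the cost of this approach: the operator processes nothing else while it waits for the queue.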
Thank you very much for your answer.
I have a question regarding your first paragraph:
"it requires that a sink participates in the pipeline. So it is not located as a "leaf" operator but located somewhere in the middle."

Isn't a Sink a terminating operator? As far as I know, Sinks cannot be in the middle of a stream chain.

I would appreciate other comments as well.

Thanks,
Krzysztof
Yes. But that's the problem with your use case, right? If you need to wait for the sink to be completed, it is not a terminating operator anymore.

Regards,
Timo

On 15.04.20 10:50, KristoffSC wrote:
> Isn't Sink a terminating operator? So as far as I know Sinks cannot be in
> the middle of stream chain.
My point was that, as far as I know, Sinks are "terminating" operators that end the stream, like .collect in the Java 8 Stream API. They don't emit elements further, and I cannot link them like this:

source - process - sink - process - sink

A Sink function produces a DataStreamSink, which is used for emitting elements from a streaming topology. It is not a SingleOutputStreamOperator or DataStream that I can use as input for the next operator.
Hi Kristoff,

I see a few ways, none of which are perfect.

The easiest way would be to not use a sink. Instead of outputting into a side output, you could tag that element and have a subsequent asyncIO place it in RabbitMQ. If that asyncIO is ordered, then you can be sure that all following events are only processed after the element has been added. Of course, the downside is that you have to implement the communication with RabbitMQ manually and lose what Flink already has. This is what you already sketched out.

A more complicated approach would be to implement a custom operator with input selection to replace processFunction2 [1]. Let's call it op2. You would add the feedback from the sink implicitly, by also consuming from that MQ queue on op2. Then, processFunction1 would also emit some flag event on the main output together with the side output. Op2 would block the input on receiving that flag until it has read the appropriate entry from the MQ. However, this approach is really complex to implement, and input selection works on a best-effort basis. So before going that route, I'd do a small POC to see if it fits your needs.

The best solution, of course, would be to revise your overall architecture. It's quite smelly in a stream processing job that you need to halt execution at some point. If you give some more details, I could try to help.

On Wed, Apr 15, 2020 at 5:36 PM KristoffSC <[hidden email]> wrote:
> My point was, that as far as I know, Sinks are "terminating" operators, that

--
Arvid Heise | Senior Java Developer
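The ordering property that the first suggestion relies on can be illustrated in plain Java. This is only a model of the behaviour, not Flink's async I/O API: `OrderedAsyncSketch`, `processInOrder` and `sendToMq` are hypothetical names, and the sketch assumes the asynchronous MQ call returns a future that completes once the queue has accepted the element. Elements are released downstream in input order, and each one only after its own send has completed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class OrderedAsyncSketch {
    // Starts one asynchronous MQ send per input, then releases the inputs
    // downstream in arrival order, each only after its own send completed.
    static List<String> processInOrder(List<String> inputs,
                                       Function<String, CompletableFuture<Void>> sendToMq) {
        Queue<CompletableFuture<String>> inFlight = new ArrayDeque<>();
        for (String in : inputs) {
            // The future yields the original element once the send is acknowledged.
            inFlight.add(sendToMq.apply(in).thenApply(ack -> in));
        }
        List<String> emitted = new ArrayList<>();
        while (!inFlight.isEmpty()) {
            // Waiting on the head of the queue holds back later elements,
            // even if their sends finished earlier.
            emitted.add(inFlight.remove().join());
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> out = processInOrder(
                List.of("a", "b", "c"),
                m -> CompletableFuture.runAsync(() -> { /* hypothetical MQ send */ }));
        System.out.println(out);
    }
}
```

Sends may overlap in time, which is the advantage over a blocking call inside the operator, while the downstream order stays the same as the input order.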
One idea that comes to my mind is to replace ProcessFunction1 with a CoProcessFunction [1]. The processElement1() function can send to the side output, process the business message, and keep it in state without emitting it. Then, as Arvid mentioned, processElement2() can listen on the side output (emitted by processElement1()) and, when it receives it, emit the result from the state and clear the state.

On Thu, Apr 23, 2020 at 7:20 AM Arvid Heise <[hidden email]> wrote:
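The buffer-and-release logic of the CoProcessFunction idea can be modelled in plain Java as follows. This is a sketch under assumptions, not Flink code: `BufferUntilAck` is a hypothetical class, a `HashMap` stands in for keyed state, the second input is assumed to carry a confirmation that the side effect for a key has reached the queue, and at most one message per key is assumed to be pending at a time.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BufferUntilAck {
    // Stands in for keyed state: key -> main message waiting for its confirmation.
    private final Map<String, String> pending = new HashMap<>();
    final List<String> sideOutput = new ArrayList<>();
    final List<String> mainOutput = new ArrayList<>();

    // First input: the business message. Emit the side effect, hold the message back.
    void processElement1(String key, String message) {
        pending.put(key, message);
        sideOutput.add(key);
    }

    // Second input: confirmation for a key. Release the held message and clear the state.
    void processElement2(String ackedKey) {
        String message = pending.remove(ackedKey);
        if (message != null) {
            mainOutput.add(message);
        }
    }

    public static void main(String[] args) {
        BufferUntilAck op = new BufferUntilAck();
        op.processElement1("k1", "main-1");
        System.out.println(op.mainOutput); // still empty: nothing confirmed yet
        op.processElement2("k1");
        System.out.println(op.mainOutput); // now contains main-1
    }
}
```

Unlike the blocking variant, other keys keep flowing while one message waits, at the cost of holding unconfirmed messages in state.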