RichAsyncFunction + BroadcastProcessFunction

John Morrow
Hi Flink Users,

I have a BroadcastProcessFunction, and in its processElement method I sometimes need to make HTTP requests, depending on the broadcast state.

Because these are HTTP requests, I'd prefer the function to be asynchronous, like RichAsyncFunction.asyncInvoke(), but RichAsyncFunction doesn't support broadcast state.

Is there any way to combine the functionality of a RichAsyncFunction + a BroadcastProcessFunction?

Thanks!
John.

Re: RichAsyncFunction + BroadcastProcessFunction

Tzu-Li (Gordon) Tai
Hi John,

Have you considered letting the BroadcastProcessFunction output events that
indicate extra external HTTP requests need to be performed, and having them
consumed by a downstream async I/O operator that completes the HTTP request?
That could work, depending on what exactly you need to do in your specific
case.
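A minimal, framework-free sketch of this suggestion in plain Java, assuming a hypothetical `fetchInventoryLevel` lookup in place of a real HTTP client: stage 1 only decides, from the current broadcast rule, whether a lookup is needed and emits a request event; stage 2 issues the lookups concurrently with `CompletableFuture`, so stage 1 never blocks on the network. In an actual Flink job, stage 1 would be the BroadcastProcessFunction and stage 2 a RichAsyncFunction attached with `AsyncDataStream.unorderedWait(...)` or `AsyncDataStream.orderedWait(...)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RequestEventSketch {

    // Event emitted by the "broadcast" stage when an external lookup is required.
    record LookupRequest(String itemId) {}

    // Result produced by the "async" stage once the lookup completes.
    record LookupResult(String itemId, long inventoryLevel) {}

    // Hypothetical stand-in for the HTTP call; a real job would use an async HTTP client.
    static long fetchInventoryLevel(String itemId) {
        Map<String, Long> fakeBackend = Map.of("a", 3L, "b", 12L, "c", 0L);
        return fakeBackend.getOrDefault(itemId, -1L);
    }

    // Stage 1: cheap, synchronous decision based on the current broadcast rule.
    static List<LookupRequest> decide(List<String> itemIds, boolean checkInventoryLevel) {
        List<LookupRequest> requests = new ArrayList<>();
        if (checkInventoryLevel) {
            for (String id : itemIds) {
                requests.add(new LookupRequest(id));
            }
        }
        return requests;
    }

    // Stage 2: start every lookup first, then collect the results in request order.
    static List<LookupResult> resolve(List<LookupRequest> requests, ExecutorService pool) {
        List<CompletableFuture<LookupResult>> inFlight = new ArrayList<>();
        for (LookupRequest r : requests) {
            inFlight.add(CompletableFuture.supplyAsync(
                    () -> new LookupResult(r.itemId(), fetchInventoryLevel(r.itemId())), pool));
        }
        List<LookupResult> results = new ArrayList<>();
        for (CompletableFuture<LookupResult> f : inFlight) {
            results.add(f.join());
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<LookupRequest> requests = decide(List.of("a", "b", "c"), true);
            for (LookupResult r : resolve(requests, pool)) {
                System.out.println(r.itemId() + " -> " + r.inventoryLevel());
            }
        } finally {
            pool.shutdown();
        }
    }
}
```

Joining the futures in request order, as above, loosely corresponds to Flink's `orderedWait`; forwarding each result as soon as its own future completes would correspond to `unorderedWait`.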

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: RichAsyncFunction + BroadcastProcessFunction

John Morrow
Hi Gordon,

That sounds good. My first thought was that if I had to break up the logic, I'd end up with:

BroadcastFunction1 --> AsyncFunction --> BroadcastFunction2

...with BroadcastFunction1 and BroadcastFunction2 needing the same broadcast state, and that state could change while an item is moving through the chain. But, as you suggested, I could emit a marker saying whether the call is needed, plus a placeholder for the result, and that might do the trick.
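The concern about state changing mid-chain can be made concrete with a small plain-Java sketch (every name here is hypothetical, and an `AtomicBoolean` stands in for the broadcast state). If the last stage re-reads the flag, an element whose lookup was skipped in the first stage can be judged under a rule that was switched on while it was in flight; carrying the first stage's decision inside the element, as the marker does, keeps each element's handling consistent.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class MarkerSketch {

    // Stand-in for broadcast state that may be updated at any moment.
    static final AtomicBoolean checkInventoryLevel = new AtomicBoolean(false);

    // Element plus the marker set by stage 1; inventoryLevel is the placeholder stage 2 fills.
    static final class Tagged {
        final String itemId;
        final boolean needsInventory;
        Long inventoryLevel; // stays null when no lookup was requested

        Tagged(String itemId, boolean needsInventory) {
            this.itemId = itemId;
            this.needsInventory = needsInventory;
        }
    }

    // Stage 3 variant that re-reads the (possibly changed) broadcast state.
    static String judgeByRereadingState(Tagged t) {
        if (checkInventoryLevel.get()) {
            return t.inventoryLevel == null ? "ERROR: no level fetched" : "checked level";
        }
        return "skipped";
    }

    // Stage 3 variant that trusts the marker recorded in stage 1.
    static String judgeByMarker(Tagged t) {
        if (t.needsInventory) {
            return t.inventoryLevel == null ? "ERROR: no level fetched" : "checked level";
        }
        return "skipped";
    }

    public static void main(String[] args) {
        // Stage 1 runs while the rule is off, so no lookup is requested.
        Tagged t = new Tagged("a", checkInventoryLevel.get());
        // Stage 2 sees needsInventory == false and leaves the placeholder empty.
        // Meanwhile, a broadcast update switches the rule on.
        checkInventoryLevel.set(true);

        System.out.println("re-read state: " + judgeByRereadingState(t));
        System.out.println("use marker:    " + judgeByMarker(t));
    }
}
```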

Thanks again for the suggestion! Below is a pseudo-code example of how I think I'll be able to get it to work, in case it's helpful for anyone else.

Cheers,
John.



Function:

processElement(item, ctx, out) { // BroadcastProcessFunction

  if (broadcastState.checkInventoryLevel) {
      long inventoryLevel = get_the_inventory_level(item.id); // blocking HTTP request, Zzzzzz
      if (inventoryLevel < X) {
          ctx.output(reorderOutputTag, item); // OutputTag<Item>("reorder-outputTag")
      }
  }
  ...
  item.status = "checked";
  out.collect(item);
}


 ---> broken down into functions A, B & C


FunctionA:

processElement(item, ctx, out) { // BroadcastProcessFunction

  // Only record the decision as a marker; the HTTP request itself moves to FunctionB.
  if (broadcastState.checkInventoryLevel) {
      out.collect(Tuple2.of(item, true));
  } else {
      out.collect(Tuple2.of(item, false));
  }
}


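FunctionB:

asyncInvoke(Tuple2<Item, Boolean> input, resultFuture) { // RichAsyncFunction
  Item item = input.f0;
  boolean needsInventory = input.f1;
  if (needsInventory) {
      // non-blocking HTTP request (hypothetical async client returning a CompletableFuture);
      // complete the ResultFuture only when the response arrives
      get_the_inventory_level_async(item.id).thenAccept(level -> {
          item.inventoryLevel = level;
          resultFuture.complete(Collections.singleton(item));
      });
  } else {
      resultFuture.complete(Collections.singleton(item));
  }
}
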

FunctionC:

processElement(item, ctx, out) { // ProcessFunction: side outputs need ctx, which a FlatMapFunction lacks

  if (item.inventoryLevel != null && item.inventoryLevel < X) {
      ctx.output(reorderOutputTag, item);
  }
  item.status = "checked";
  out.collect(item);
}
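To sanity-check the A, B, C split before wiring it into Flink, the chain can be simulated in plain Java. This is a sketch under stated assumptions: `Item`, the threshold `X` (here a hypothetical `REORDER_THRESHOLD = 5`), the hypothetical `lookupInventoryLevel` backend, and a plain `List` standing in for the `reorder-outputTag` side output are all illustrative. In Flink, B would be a RichAsyncFunction completing its `ResultFuture`, and C a ProcessFunction calling `ctx.output(...)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class InventoryPipelineSketch {

    static final long REORDER_THRESHOLD = 5; // hypothetical value playing the role of X

    static final class Item {
        final String id;
        Long inventoryLevel; // placeholder, filled in by function B when requested
        String status = "new";

        Item(String id) { this.id = id; }
    }

    // Hypothetical asynchronous lookup standing in for the HTTP request.
    static CompletableFuture<Long> lookupInventoryLevel(String id) {
        Map<String, Long> backend = Map.of("apple", 2L, "pear", 40L);
        return CompletableFuture.supplyAsync(() -> backend.getOrDefault(id, 0L));
    }

    // Function A: read the broadcast flag once and attach it to the item as a marker.
    static Map.Entry<Item, Boolean> functionA(Item item, boolean checkInventoryLevel) {
        return Map.entry(item, checkInventoryLevel);
    }

    // Function B: only items marked by A trigger the asynchronous lookup.
    static CompletableFuture<Item> functionB(Map.Entry<Item, Boolean> tagged) {
        Item item = tagged.getKey();
        if (!tagged.getValue()) {
            return CompletableFuture.completedFuture(item);
        }
        return lookupInventoryLevel(item.id).thenApply(level -> {
            item.inventoryLevel = level;
            return item;
        });
    }

    // Function C: route low-inventory items to the side list and mark every item as checked.
    static Item functionC(Item item, List<Item> reorderSideOutput) {
        if (item.inventoryLevel != null && item.inventoryLevel < REORDER_THRESHOLD) {
            reorderSideOutput.add(item);
        }
        item.status = "checked";
        return item;
    }

    // Runs all items through A and B first (lookups overlap), then through C in input order.
    static List<Item> run(List<String> ids, boolean checkInventoryLevel, List<Item> reorders) {
        List<CompletableFuture<Item>> inFlight = new ArrayList<>();
        for (String id : ids) {
            inFlight.add(functionB(functionA(new Item(id), checkInventoryLevel)));
        }
        List<Item> out = new ArrayList<>();
        for (CompletableFuture<Item> f : inFlight) {
            out.add(functionC(f.join(), reorders));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Item> reorders = new ArrayList<>();
        List<Item> out = run(List.of("apple", "pear", "kiwi"), true, reorders);
        for (Item i : out) {
            System.out.println(i.id + " level=" + i.inventoryLevel + " status=" + i.status);
        }
        System.out.println("reorder: " + reorders.stream().map(i -> i.id).toList());
    }
}
```

Because the marker travels with each item, a broadcast update that arrives between A and C cannot change how an already-tagged item is treated.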



From: Tzu-Li (Gordon) Tai <[hidden email]>
Sent: Tuesday 17 March 2020 10:05
To: [hidden email] <[hidden email]>
Subject: Re: RichAsyncFunction + BroadcastProcessFunction
 