Initialise side input state

Initialise side input state

Maxim Parkachov
Hi Flink users,

I'm struggling with a basic concept and would appreciate some help. I have two input streams: one is a fast event stream and the other is a slowly changing dimension. They have the same key, and I use a CoProcessFunction to store the slow data in state and enrich the fast data from this state. Everything works as expected.

Before I start processing the fast stream on the first run, I would like to completely initialise the state. I thought it could be done in open(), but I don't understand how it would be redistributed across parallel operators.

Another alternative would be to create a custom source and push all the slow dimension data downstream, but I could not find out how to hold back processing of the fast data until the state is initialised.

I realise that FLIP-17 (Side Inputs) is what I need, but is there some other way to implement it now?

Thanks,
Maxim.

Fwd: Initialise side input state

Maxim Parkachov
Hi Xingcan,

On Fri, Nov 3, 2017 at 3:38 AM, Xingcan Cui <[hidden email]> wrote:
> Hi Maxim,
>
> if I understand correctly, you actually need to JOIN the fast stream with the slow stream. Could you please share more details about your problem?

Sure, I can explain more with some pseudo-code. I have an external DB containing a price list with the following structure:

case class PriceList(productId, price)

My events are purchase events with the following structure:

case class Purchase(productId, amount)

I would like to get a final stream with totalAmount = amount * price, in a structure like this:

case class PurchaseTotal(productId, totalAmount)

I have two corresponding input streams:

val prices = env.addSource(new PriceListSource).keyBy(_.productId)
val purchases = env.addSource(new PurchaseSource).keyBy(_.productId)

PriceListSource delivers all CHANGES to the external DB table.

The calculate function looks similar to:

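class CalculateFunction extends CoProcessFunction[Purchase, PriceList, PurchaseTotal] {
  
  // Latest known price for the current key (productId)
  private var price: ValueState[Int] = _
  
  // Fast stream: enrich each purchase with the stored price
  override def processElement1....... {
    out.collect(PurchaseTotal(purchase.productId, purchase.amount * price.value))
  }

  // Slow stream: remember the latest price from the price list
  override def processElement2....... {
    price.update(priceList.price)
  }
}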

And finally the pipeline:

purchases.connect(prices).process(new CalculateFunction).print

The issue is that when I start the program, my price ValueState is empty, and it will never be populated for prices that are not updated in the DB after startup.
BTW, I cannot use AsyncIO to query the DB because of several technical restrictions.

> 1. When you mentioned "they have the same key", did you mean all the data get the same key or the logic should be applied with fast.key = slow.key?

I meant that the productId in a purchase event definitely exists in the external price list DB (so it is a kind of inner join).

> 2. What should be done to initialize the state?

I need to read the external DB table and populate the price ValueState before processing the first purchase event.

I hope this minimal example helps to clarify the problem.
Maxim.
 

On Fri, Nov 3, 2017 at 5:54 AM, Maxim Parkachov <[hidden email]> wrote:
> [...]




Re: Initialise side input state

Xingcan Cui
Hi Maxim,

thanks for the explanation. I think you can use a ValueState for the price and a ListState for the purchase events. When receiving a purchase event, you first check the price state. If it exists, you just collect the PurchaseTotal result; otherwise you temporarily cache the event in the ListState. When receiving a price event, you first update the price state and then check whether there are cached purchase events that need to be processed.

You can set a boolean flag in the function to avoid checking the purchase state every time (since it takes extra time). Don't worry about the state distribution problem: Flink will automatically partition the state by key (productId in your example). For more information about state, you can refer to this document.
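
The approach described above could look roughly like the following. This is only a minimal sketch, assuming a Flink 1.x API (where open(Configuration) is available) and assuming String/Int field types for the case classes, since the thread only gives pseudo-code. The class name BufferingCalculateFunction and the state names are made up for illustration, and the optional boolean flag is left out.

```scala
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// Field types are assumptions; the thread does not specify them.
case class PriceList(productId: String, price: Int)
case class Purchase(productId: String, amount: Int)
case class PurchaseTotal(productId: String, totalAmount: Int)

// Hypothetical buffering variant of CalculateFunction.
class BufferingCalculateFunction
    extends CoProcessFunction[Purchase, PriceList, PurchaseTotal] {

  // Boxed Integer, so that "no price yet" is visible as null.
  private var price: ValueState[java.lang.Integer] = _
  // Purchases that arrived before any price for their productId.
  private var pending: ListState[Purchase] = _

  override def open(parameters: Configuration): Unit = {
    price = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Integer]("price", classOf[java.lang.Integer]))
    pending = getRuntimeContext.getListState(
      new ListStateDescriptor[Purchase]("pending-purchases", classOf[Purchase]))
  }

  // Purchase stream: emit a total if a price is known, otherwise cache the event.
  override def processElement1(
      purchase: Purchase,
      ctx: CoProcessFunction[Purchase, PriceList, PurchaseTotal]#Context,
      out: Collector[PurchaseTotal]): Unit = {
    val current = price.value()
    if (current != null) {
      out.collect(PurchaseTotal(purchase.productId, purchase.amount * current.intValue))
    } else {
      pending.add(purchase)
    }
  }

  // Price stream: store the price, then flush purchases cached for this key.
  override def processElement2(
      priceList: PriceList,
      ctx: CoProcessFunction[Purchase, PriceList, PurchaseTotal]#Context,
      out: Collector[PurchaseTotal]): Unit = {
    price.update(priceList.price)
    // Guard against null, which some versions return for an empty ListState.
    Option(pending.get()).foreach { buffered =>
      buffered.asScala.foreach { p =>
        out.collect(PurchaseTotal(p.productId, p.amount * priceList.price))
      }
    }
    pending.clear()
  }
}
```

It would be plugged into the pipeline the same way as before, i.e. purchases.connect(prices).process(new BufferingCalculateFunction).print. Both states are keyed, so each productId gets its own price and its own buffer. Note that purchases for a productId stay cached until some price event for that key arrives, so the price source would still have to emit the existing rows once, not only the changes.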

Hope that helps.

Best,
Xingcan

On Fri, Nov 3, 2017 at 2:11 PM, Maxim Parkachov <[hidden email]> wrote:
> [...]