Hello,
I'm dealing with an analytical job in streaming and I don't know how to write the last part.

I want to count all the elements in a window with a given status, so I keep a state with a Map[Status, Long]. This state is updated from tuples containing the oldStatus and the newStatus: every event generates a +1 for the new status and a -1 for the old status. Then I want to reduce all these counts and move from a local, partial state to a global state that will be written to the output.

Right now my code looks like:

filteredLatestOrders.keyBy(x => x._1.getStatus)
  .mapWithState(updateState)
  .keyBy(x => "0")
  .reduce((orderA, orderB) => orderA.sum(orderB))

where "filteredLatestOrders" is a DataStream containing information about the elements, the new status and the old status.

This produces in output:

2> Map(DRAFT -> 0, RESERVED -> 1, PICKED -> 0, PACKED -> 0)
2> Map(DRAFT -> 0, RESERVED -> 2, PICKED -> 0, PACKED -> 0)
2> Map(DRAFT -> 0, RESERVED -> 3, PICKED -> 0, PACKED -> 1)
3> Map(DRAFT -> 0, RESERVED -> -1, PICKED -> 1, PACKED -> 0)
4> Map(DRAFT -> 0, RESERVED -> -1, PICKED -> 0, PACKED -> 1)

I thought that keying with a fixed value would collect all the elements on a single node so that I could finally compute the final result, but they are left on different nodes and are never summed.

Is this the correct approach? If so, how can I do what I need? Is there a smarter way to count distinct, evolving elements by their status in a stream? Mind that the original source of events is updates to the status of an element, and the requirement is that I count only the latest status available.

Thank you in advance,

Simone
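[Editor's note: for reference, a minimal, self-contained sketch of the counting pattern described above. This is not the original job: the Transition case class, the status names, the merge helper and the constant key "all" are assumptions made for illustration, and it skips mapWithState, building the per-event delta map directly before merging everything under a single key.]

import org.apache.flink.streaming.api.scala._

object StatusCountSketch {

  // Illustrative event type: an element moving from an optional old status to a new one.
  case class Transition(elementId: String, oldStatus: Option[String], newStatus: String)

  // Merge two count maps by summing their values key by key.
  def merge(a: Map[String, Long], b: Map[String, Long]): Map[String, Long] =
    (a.keySet ++ b.keySet).map(k => k -> (a.getOrElse(k, 0L) + b.getOrElse(k, 0L))).toMap

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // A small hard-coded stream of status transitions, standing in for the real source.
    val transitions: DataStream[Transition] = env.fromElements(
      Transition("o1", None, "RESERVED"),
      Transition("o2", None, "RESERVED"),
      Transition("o1", Some("RESERVED"), "PICKED"),
      Transition("o2", Some("RESERVED"), "PACKED")
    )

    // Each transition becomes a partial delta map: +1 for the new status, -1 for the old one.
    val deltas: DataStream[Map[String, Long]] = transitions.map { t =>
      val plus  = Map(t.newStatus -> 1L)
      val minus = t.oldStatus.map(s => Map(s -> -1L)).getOrElse(Map.empty[String, Long])
      merge(plus, minus)
    }

    // Key by a constant so every partial map reaches the same reduce task,
    // then keep a rolling global sum of the counts per status.
    deltas
      .keyBy(_ => "all")
      .reduce((a, b) => merge(a, b))
      .print()

    env.execute("status-count-sketch")
  }
}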
Solved. Probably there was an error in the way I was testing. Also I simplified the job and it works now.
Great to hear!
On Wed, Sep 28, 2016 at 5:18 PM, Simone Robutti <[hidden email]> wrote:
> Solved. Probably there was an error in the way I was testing. Also I
> simplified the job and it works now.