Re: sharded state, 2-step operation

Posted by Michael Warnock on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/sharded-state-2-step-operation-tp8631p8637.html

Thanks for the quick response!

I've been wondering about connected streams and CoFlatMap, but either I don't see all the ways they can be used, or they don't solve my problem. Do you know of any examples outside of the documentation? My searches for "flink comap example" and similar haven't turned anything up.

On Tue, Aug 23, 2016 at 11:41 AM, Stephan Ewen <[hidden email]> wrote:
Hi!

This is a tricky one. State access and changes are not shared across operators in Flink.
We chose that design because it lets each operator work on "local" state:
  - state automatically shards with the computation
  - no locking / concurrency implications
  - asynchronous persistence
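To make the "local state" point concrete, here is a toy model in plain Java (not Flink's API): each parallel instance owns a private map, and every record is routed to exactly one instance by hashing its key, so no two instances ever touch the same entry and no locking is needed. The class and method names are hypothetical, and the `hashCode() mod parallelism` routing is a simplification; Flink's real key-to-subtask assignment is more involved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model (not Flink code) of key-partitioned operator state.
 * Each parallel instance owns a private map; a record is routed to exactly
 * one instance by its key, so instances never share entries.
 */
public class ShardedStateSketch {
    // One private state map per hypothetical parallel instance.
    private final List<Map<String, Integer>> instances = new ArrayList<>();

    public ShardedStateSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            instances.add(new HashMap<>());
        }
    }

    // Simplified deterministic routing: the same key always maps to the same instance.
    public int instanceFor(String key) {
        return Math.floorMod(key.hashCode(), instances.size());
    }

    // Adds delta to the key's running total; only the owning instance's map is touched.
    public int add(String key, int delta) {
        Map<String, Integer> local = instances.get(instanceFor(key));
        return local.merge(key, delta, Integer::sum);
    }

    public static void main(String[] args) {
        ShardedStateSketch op = new ShardedStateSketch(4);
        op.add("Alice", 1);
        System.out.println(op.add("Alice", 1)); // prints 2
        System.out.println(op.add("Bob", 5));   // prints 5
    }
}
```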

Sharing state between two operations in the same stage works with the CoMap / CoFlatMap functions.
Sharing state across successive nodes does not work, because those functions could run on different machines, which would require remote, synchronized state updates.
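A toy model of the CoFlatMap suggestion, again in plain Java rather than Flink's API: a single two-input operator whose `flatMap1` and `flatMap2` methods share the same per-key state. In Flink this would roughly correspond to connecting two streams, keying both inputs by the same key type (for example with `ConnectedStreams.keyBy`), and reading keyed state inside a `RichCoFlatMapFunction`. The class below is hypothetical and simulates only one operator instance; note that each call still sees only the state of the key its record was routed by.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model (not Flink code) of a keyed two-input operator in the spirit of a
 * CoFlatMapFunction: flatMap1 and flatMap2 live in the same operator, so they
 * read and write the same per-key state.
 */
public class CoFlatMapSketch {
    // Per-key state shared by both inputs (stands in for Flink keyed state).
    private final Map<String, Integer> stateByKey = new HashMap<>();

    // Input 1: the record is keyed by its first field, e.g. "Alice" in (Alice, Bob, 1).
    public int flatMap1(String from, String to, int value) {
        return stateByKey.merge(from, value, Integer::sum);
    }

    // Input 2: the record is keyed by its second field, e.g. "Bob" in (Alice, Bob, 1).
    public int flatMap2(String from, String to, int value) {
        return stateByKey.merge(to, value, Integer::sum);
    }

    public int stateOf(String key) {
        return stateByKey.getOrDefault(key, 0);
    }

    public static void main(String[] args) {
        CoFlatMapSketch op = new CoFlatMapSketch();
        op.flatMap1("Alice", "Bob", 1); // updates Alice via input 1
        op.flatMap2("Alice", "Bob", 1); // updates Bob via input 2
        // Bob now arrives on input 1 and sees the update made through input 2.
        System.out.println(op.flatMap1("Bob", "Alice", 1)); // prints 2
    }
}
```

Whether this covers a given use case depends on whether a single update ever needs two keys' state at once; a keyed co-function only exposes the state of the current key.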

Do you think you can use the CoMap / CoFlatMap functions for this?

Greetings,
Stephan


On Tue, Aug 23, 2016 at 8:03 PM, Michael Warnock <[hidden email]> wrote:
I'm trying to do something that seems like it should be possible, but my implementation doesn't behave as expected, and I'm not sure how else to express it.

Let's say the stream is composed of tuples like (Alice, Bob, 1), and I want to keyBy(0) and flatMap with state associated with Alice, then keyBy(1) with state associated with Bob. The trick is that when I later get a tuple like (Bob, Alice, 1), I want the first operator to see the state that the second operator updated earlier. Is this possible? I tried implementing both operators as one, retrieving the state by its descriptor inside the flatMap body, and even instantiating the operator only once; as you might guess, the state in stage 1 still doesn't include changes made earlier in stage 2.
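The behavior described above can be reproduced with a toy model in plain Java (hypothetical names, not Flink code): two chained keyed stages, each with its own private state map. Because the stages never share a map, the update stage 2 makes for Bob is invisible to stage 1 when Bob later shows up as the first field.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (not Flink code): two chained keyed stages, each with private state. */
public class TwoStageSketch {
    public static final class KeyedStage {
        private final Map<String, Integer> state = new HashMap<>();

        public int update(String key, int delta) {
            return state.merge(key, delta, Integer::sum);
        }

        public int get(String key) {
            return state.getOrDefault(key, 0);
        }
    }

    public final KeyedStage stage1 = new KeyedStage(); // keyed by the first field
    public final KeyedStage stage2 = new KeyedStage(); // keyed by the second field

    // Each tuple passes through stage 1 (keyed by "from"), then stage 2 (keyed by "to").
    public void process(String from, String to, int value) {
        stage1.update(from, value);
        stage2.update(to, value);
    }

    public static void main(String[] args) {
        TwoStageSketch pipeline = new TwoStageSketch();
        pipeline.process("Alice", "Bob", 1);
        pipeline.process("Bob", "Alice", 1);
        // Stage 1 only knows about its own update to Bob, not stage 2's.
        System.out.println(pipeline.stage1.get("Bob")); // prints 1
    }
}
```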

Is there any way to do this without throwing away the parallelism?

Thanks in advance!
~Michael