Confusion about multiple use of one ValueState

nsengupta
Hello all,

Let's say I want to hold some state value derived during one transformation and then use that same state value in a subsequent transformation. For example:

myStream
.keyBy(fieldID) // Some field ID, may be 0
.flatMap(new MyStatefulMapper())
.flatMap(new MySubsequentMapper())
...

Now, I define MyStatefulMapper in the usual fashion:

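import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class MyStatefulMapper extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {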

    /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

       // logic of accessing and updating the ValueState 'sum' above
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "mySum", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}
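The `flatMap` body above is left as a placeholder. In Flink, a keyed `ValueState` returns the value for the key of the record currently being processed, so a count/running-sum update typically reads `sum.value()`, modifies it, and writes it back with `sum.update(...)`. Because a runnable Flink job needs the Flink runtime, the sketch below models that behaviour in plain Java instead. `KeyedValueState` and `RunningSumModel` are hypothetical stand-ins, not Flink classes.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for Flink's keyed ValueState: one value per key, scoped to one operator. */
final class KeyedValueState<K, V> {
    private final Map<K, V> values = new HashMap<>();
    private final V defaultValue;
    private K currentKey;

    KeyedValueState(V defaultValue) { this.defaultValue = defaultValue; }

    // In Flink the runtime sets the current key before each record is processed.
    void setCurrentKey(K key) { this.currentKey = key; }

    // Returns the value for the current key, or the default if nothing was set yet.
    V value() { return values.getOrDefault(currentKey, defaultValue); }

    void update(V value) { values.put(currentKey, value); }
}

/** Models MyStatefulMapper: the state holds {count, runningSum} per key. */
final class RunningSumModel {
    private final KeyedValueState<Long, long[]> sum = new KeyedValueState<>(new long[] {0L, 0L});

    /** Processes one (key, value) record and returns the updated {count, runningSum}. */
    long[] process(long key, long value) {
        sum.setCurrentKey(key);
        long[] current = sum.value().clone(); // copy so the shared default is never mutated
        current[0] += 1;                      // count
        current[1] += value;                  // running sum
        sum.update(current);
        return current;
    }
}
```

Because the state is keyed, records for one key never see the count or sum accumulated for another key.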

So, by now, RuntimeContext has registered a State holder named 'mySum'. 

In the implementation of 'MySubsequentMapper', I need to access this State holder named 'mySum', perhaps thus (my thinking, I may be wrong):

public class MySubsequentMapper extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> aSubsequentSum;
    private transient ValueState<Tuple2<Long, Long>> sum; // defined earlier

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
        // logic of accessing and updating the ValueState 'aSubsequentSum' above,
        // but this logic depends on the current contents of ValueState 'sum' created earlier
    }

    @Override
    public void open(Configuration config) {
        // Logic to create the ValueStateDescriptor for 'aSubsequentSum', which is owned by this operator
        // ...
        // Question: how do I prepare to access 'sum', which is a State holder created inside an earlier operator?
        sum = getRuntimeContext().getState(descriptor); // how can I pass the name 'mySum' (used in the StateDescriptor)?
    }
}

I have two questions:

1) Is what I am trying to achieve possible and, indeed, advisable? If not, what is the alternative?
2) Is there a guarantee that Flink will always execute MyStatefulMapper.open() before MySubsequentMapper.open() because of their lexical order in the source code?
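On the first question: keyed state in Flink belongs to the operator that registered it (and to the current key), so registering a descriptor named "mySum" inside `MySubsequentMapper` would create a second, independent state rather than a handle to the first operator's state. A common alternative is to have the first function emit the values the second one needs as part of the record it sends downstream, or to do both computations in one function. Forwarding the value also removes any dependence on which `open()` runs first. The sketch below models this in plain Java; `Enriched`, `StatefulStep` and `SubsequentStep` are hypothetical names, and the per-key `HashMap`s stand in for keyed state.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical record carrying the upstream state values along with the input. */
final class Enriched {
    final long key;
    final long value;
    final long count;      // count for this key after this record (from upstream state)
    final long runningSum; // running sum for this key after this record (from upstream state)

    Enriched(long key, long value, long count, long runningSum) {
        this.key = key;
        this.value = value;
        this.count = count;
        this.runningSum = runningSum;
    }
}

/** Models MyStatefulMapper: owns the per-key {count, sum} state and forwards it downstream. */
final class StatefulStep {
    private final Map<Long, long[]> sumPerKey = new HashMap<>(); // stand-in for keyed ValueState

    Enriched apply(long key, long value) {
        long[] s = sumPerKey.computeIfAbsent(key, k -> new long[2]);
        s[0] += 1;
        s[1] += value;
        return new Enriched(key, value, s[0], s[1]);
    }
}

/** Models MySubsequentMapper: reads the forwarded values instead of another operator's state. */
final class SubsequentStep {
    private final Map<Long, Long> aboveAverageCount = new HashMap<>(); // its own per-key state

    /** Hypothetical logic: counts, per key, records whose value exceeds the running average. */
    long apply(Enriched in) {
        boolean aboveAverage = in.value * in.count > in.runningSum; // value > runningSum / count, no division
        long updated = aboveAverageCount.getOrDefault(in.key, 0L) + (aboveAverage ? 1 : 0);
        aboveAverageCount.put(in.key, updated);
        return updated;
    }
}
```

In a real job the forwarded fields would become part of the output type (a `Tuple3` or a POJO, for example). Note also that the result of `flatMap` on a keyed stream is a plain `DataStream`, not a `KeyedStream`, so the second function would need another `keyBy(...)` before it can use keyed state of its own.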

-- Nirmalya




--
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is where they should be.
Now put the foundation under them."

Re: Confusion about multiple use of one ValueState

Balaji Rajagopalan
I don't think a ValueState defined in one map function is accessible from another map function; that is my understanding. Also, be aware that an instance of the map function will be created for each tuple in your stream. I had a similar use case where I had to pass some state from one map function to another, and I used Redis for that.
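For reference, Flink does not create a function instance per tuple: it creates one instance per parallel subtask, calls `open()` once, and then calls `map()`/`flatMap()` for every record routed to that subtask. The external-store approach described here can be sketched as two functions talking to a shared key-value store. `ExternalStore`, `InMemoryStore`, `UpstreamFn` and `DownstreamFn` are hypothetical stand-ins for a Redis client and two map functions; with a real network client, the downstream read only sees the upstream write if that write has completed (for example, if its result was awaited).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for an external key-value store such as Redis. */
interface ExternalStore {
    void set(String key, String value);
    String get(String key); // returns null when the key is absent
}

/** In-memory implementation used only to make the sketch runnable. */
final class InMemoryStore implements ExternalStore {
    private final Map<String, String> data = new ConcurrentHashMap<>();
    @Override public void set(String key, String value) { data.put(key, value); }
    @Override public String get(String key) { return data.get(key); }
}

/** Upstream function: keeps the running sum for a key in the external store. */
final class UpstreamFn {
    private final ExternalStore store;
    UpstreamFn(ExternalStore store) { this.store = store; }

    long map(long key, long value) {
        String k = "mySum:" + key; // hypothetical key naming scheme
        String prev = store.get(k);
        long sum = (prev == null ? 0L : Long.parseLong(prev)) + value;
        store.set(k, Long.toString(sum)); // read-modify-write: safe only with one writer per key
        return key;                        // pass the key downstream
    }
}

/** Downstream function: reads the value the upstream function stored for the same key. */
final class DownstreamFn {
    private final ExternalStore store;
    DownstreamFn(ExternalStore store) { this.store = store; }

    long map(long key) {
        String v = store.get("mySum:" + key);
        return v == null ? 0L : Long.parseLong(v);
    }
}
```

The read-modify-write in `UpstreamFn` relies on each key having a single writer, which `keyBy` gives within one operator; with several writers per key, an atomic store operation (Redis `INCRBY`, for example) would be needed instead.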

On Fri, May 13, 2016 at 8:58 AM, Nirmalya Sengupta wrote:
> [...]

Re: Confusion about multiple use of one ValueState

Balaji Rajagopalan
Even though there are multiple instances of the map object, the transient ValueState object is accessible across those instances, so as the stream flows in, the value can be updated based on the application logic.

On Fri, May 13, 2016 at 11:26 AM, Balaji Rajagopalan wrote:
> [...]

Re: Confusion about multiple use of one ValueState

nsengupta
In reply to this post by nsengupta

Hello Balaji,

Thanks for your reply. This confirms my earlier assumption that one of the usual ways to do this is to hold and nurture the application state in an external store; in your case, Redis.

So, I am trying to understand how one shares the handle to this external store amongst partitions: do I create a connector to a Redis instance (to take your case as an example) at the beginning of the Flink application and share it amongst partitions using the _Broadcast_ mechanism? Obviously, the assumption is that the external store (Redis, in this case) will have to deal with concurrent access to, and updates of, elements of the state. Operators simply call an API on Redis to store and retrieve elements of the application state.

Is my understanding correct?

Yes, I am aware that a fresh Mapper is created for every tuple that comes in. In fact, this was the source of my original doubt before I posted the question. Thanks again for underscoring that.
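On sharing the handle: a live connection generally cannot be serialized and shipped to the parallel instances, and Flink ships user functions to the workers using Java serialization. The usual pattern is to mark the client field `transient`, create the client in `open()` of each rich function instance, and release it in `close()`. The sketch below shows the mechanism in plain Java; `FakeClient` and `ClientOwningFunction` are hypothetical, and the serialization round trip stands in for shipping the function to a worker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical non-serializable client, e.g. a Redis connection. */
final class FakeClient implements AutoCloseable {
    private boolean open = true;
    boolean isOpen() { return open; }
    @Override public void close() { open = false; }
}

/** Models a rich function: configuration is serialized, the client is not. */
final class ClientOwningFunction implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String host;           // plain configuration travels with the function
    private transient FakeClient client; // skipped by serialization; rebuilt in open()

    ClientOwningFunction(String host) { this.host = host; }

    void open() { client = new FakeClient(); } // called once per parallel instance
    void close() { if (client != null) client.close(); }
    FakeClient client() { return client; }
    String host() { return host; }

    /** Round-trips an object through Java serialization, as when a job is shipped to workers. */
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```

Each deserialized copy arrives with a `null` client and opens its own, so nothing needs to be broadcast; only the host name travels with the function.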

-- Nirmalya 


Re: Confusion about multiple use of one ValueState

Balaji Rajagopalan
I wrote a simple helper class: the Redis connection is initialized in the constructor, and there are
set and get methods to store and retrieve values from your map functions. If you find a better way
to do this, please share :). I am using a Redis Scala client.

import akka.actor.ActorSystem
import org.apache.flink.configuration.GlobalConfiguration
import org.slf4j.LoggerFactory
import redis.RedisClient // assumes the rediscala client
import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.duration._

object RedisHelper
{
  private val LOG = LoggerFactory.getLogger(getClass)

  // Runs once, the first time RedisHelper is accessed in this JVM
  val redisHost = GlobalConfiguration.getString("redis.host", "localhost")
  val redisPort = GlobalConfiguration.getInteger("redis.port", 6379)
  LOG.info(s"Using host: [$redisHost] and port : [$redisPort] to connect to redis")

  implicit val executionContext = ExecutionContext.global
  implicit val akkaSystem = ActorSystem("redis-flink-actorsystem")
  val redisClient = RedisClient(host = redisHost, port = redisPort)

  // Fire-and-forget: the Future returned by set is not awaited
  def set(k: String, v: String, exTime: Option[Long]): Unit = {
    redisClient.set(k, v, exTime)
  }

  def set(k: String, v: String): Unit = {
    redisClient.set(k, v)
  }

  def get(k: String): Option[String] = {
    try
    {
      val f = redisClient.get[String](k)
      Await.result(f, 2.seconds)
    }
    catch {
      case e: Exception => {
        LOG.error("Exception while getting data from redis", e)
        None
      }
    }
  }
}
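This helper answers the handle-sharing question without any broadcasting: a Scala `object` is initialized lazily, once per JVM (more precisely, once per class loader that loads it), so every function instance running in the same TaskManager reuses one client. A rough Java equivalent is the initialization-on-demand holder idiom. `SharedClient` and `ClientHolder` are hypothetical, and the hard-coded host and port stand in for the configuration lookup in the Scala code.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical client standing in for a Redis connection. */
final class SharedClient {
    static final AtomicInteger CONNECTIONS = new AtomicInteger(); // how many clients were created
    final String host;
    final int port;

    private SharedClient(String host, int port) {
        this.host = host;
        this.port = port;
        CONNECTIONS.incrementAndGet();
    }

    static SharedClient connect(String host, int port) { return new SharedClient(host, port); }
}

/** Initialization-on-demand holder: the client is created once, on the first call to get(). */
final class ClientHolder {
    private ClientHolder() {}

    private static final class Lazy {
        // The JVM runs this initializer exactly once, thread-safely, when Lazy is first used.
        static final SharedClient INSTANCE = SharedClient.connect("localhost", 6379);
    }

    static SharedClient get() { return Lazy.INSTANCE; }
}
```

Compared with creating a client in each function's `open()`, a per-JVM singleton opens fewer connections, but it has no natural `close()` hook and the client must be thread-safe, since several parallel instances in the same TaskManager may call it concurrently.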

On Fri, May 13, 2016 at 2:09 PM, Nirmalya Sengupta wrote:
> [...]

Re: Confusion about multiple use of one ValueState

nsengupta
In reply to this post by nsengupta
Hello Balaji,

Yes. The State holder 'sum' in my example is actually created outside the Mapper objects, so it stays where it is. I am creating 'var's inside the Mapper objects to _refer_ to the same object, irrespective of how many Mappers there are. The _open_ function makes that association. This was indeed the basis of my logic, so we are on the same page there.

However, my question was about the order in which State holders are created across multiple Operators, and about accessing one such State holder from inside an Operator that did not create it.

I understand from your response that you sidestepped this difficulty by resorting to Redis. Redis has a global presence, so any operator in any partition can access it.

My question was: how did you share the Redis handle across partitions? Did you broadcast it?

-- Nirmalya








Re: Confusion about multiple use of one ValueState

nsengupta
In reply to this post by Balaji Rajagopalan
Sorry, Balaji! Somehow I missed this particular post of yours. Please ignore my last mail, in which I asked the same question.

--  Nirmalya