Stateful computation


Stateful computation

defstat
Hi. I have been struggling for the past few days to find a solution to the following problem using Apache Flink:

I have a stream of vectors, represented by files in a local folder. After a new text file is picked up via DataStream<String> text = env.readFileStream(...), I transform (flatMap) the input into a SingleOutputStreamOperator<Tuple2<String, Integer>, ?>, with the Integer being the score produced by a scoring function.

I want to persist a global HashMap containing the top-k vectors together with their scores. I approached the problem using a stateful transformation.
1. The first problem I have is that the HashMap holds per-sink data (so each parallel worker keeps its own HashMap). How can I make it a global collection?

2. In Apache Spark I achieved this with
JavaPairDStream<String, Integer> stateDstream = tuples.updateStateByKey(updateFunction);

and then applying transformations on the stateDstream. Is there a way to get the same functionality in Flink?

Thanks in advance!
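
The per-record state update the question describes can be sketched in plain Java (all names here are hypothetical; this is the operator-local logic, not Flink API code): a score map trimmed back to the k best entries after every update.

```java
import java.util.*;

/** Plain-Java sketch (not Flink API) of the state described in the
 *  question: a map from vector id to score, trimmed back to the k
 *  best entries after every update. All names are hypothetical. */
public class TopKState {
    private final int k;
    private final Map<String, Integer> topK = new HashMap<>();

    public TopKState(int k) { this.k = k; }

    /** Called once per (vector, score) tuple coming out of the flatMap. */
    public void update(String vector, int score) {
        topK.merge(vector, score, Math::max); // keep the best score per vector
        if (topK.size() > k) {
            // evict the weakest entry so the map never exceeds k elements
            String weakest = Collections.min(topK.entrySet(),
                    Map.Entry.comparingByValue()).getKey();
            topK.remove(weakest);
        }
    }

    public Map<String, Integer> current() { return new HashMap<>(topK); }

    public static void main(String[] args) {
        TopKState state = new TopKState(2);
        state.update("v1", 5);
        state.update("v2", 9);
        state.update("v3", 7); // evicts v1, the weakest of the three
        System.out.println(state.current());
    }
}
```

As the question itself notes, each parallel sink instance would hold its own copy of this state, which is exactly the problem being asked about.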

Re: Stateful computation

Gyula Fóra-2
Hey!

What you are trying to do here is a global rolling aggregation, which is inherently a DOP-1 (degree-of-parallelism 1) operation. Your observation is correct: if you want to use a simple stateful sink, you need to set the parallelism to 1 in order to get correct results.

What you can do is keep local top-k lists in a parallel operator (say, a flatMap), periodically emit the local top-k elements, and merge them in a sink with parallelism = 1 to produce a global top-k.

I am not 100% sure how you implemented the same functionality in Spark, but you probably achieved the semantics I described above.

The whole problem is much easier if you are interested in the top-k elements grouped by some key, since then you can use partitioned operator state, which gives you correct results with arbitrary parallelism.

Cheers,
Gyula
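
The local top-k plus parallelism-1 merge that Gyula suggests can be sketched in plain Java (names hypothetical, not Flink API). Note that, as the rest of the thread discusses, the merged result is only the true global top-k when each key is aggregated entirely within a single instance.

```java
import java.util.*;

/** Plain-Java sketch (not Flink API, names hypothetical) of the merge
 *  step: each parallel operator emits its local top-k map, and a
 *  parallelism-1 sink unions them and takes the top k again. */
public class TopKMerge {
    /** The k highest-scoring entries of a map. */
    public static Map<String, Integer> topK(Map<String, Integer> m, int k) {
        Map<String, Integer> out = new LinkedHashMap<>();
        m.entrySet().stream()
         .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
         .limit(k)
         .forEach(e -> out.put(e.getKey(), e.getValue()));
        return out;
    }

    /** The parallelism-1 sink: sum scores per key across the local
     *  top-k lists, then take the top k of the union. */
    public static Map<String, Integer> merge(List<Map<String, Integer>> locals, int k) {
        Map<String, Integer> union = new HashMap<>();
        for (Map<String, Integer> local : locals)
            local.forEach((key, score) -> union.merge(key, score, Integer::sum));
        return topK(union, k);
    }

    public static void main(String[] args) {
        Map<String, Integer> local1 = Map.of("a", 6, "b", 5);
        Map<String, Integer> local2 = Map.of("e", 10, "f", 9);
        System.out.println(merge(Arrays.asList(local1, local2), 2));
    }
}
```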

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Statefull-computation-tp2494.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.

Re: Stateful computation

Aljoscha Krettek
Hi,
I wanted to post something along the same lines, but now I don't think the approach with local top-k lists and merging works. For example, suppose you want the top-4 and do the pre-processing in two parallel instances. This input data would lead to incorrect results:

Instance 1:
a 6
b 5
c 4
d 3

Instance 2:
e 10
f 9
g 8
h 7
a 6
b 5
c 4
d 3

So each parallel instance would forward its local top-4, which would lead to this end result:
e 10
f 9
g 8
h 7

Which is wrong. I think no matter how many elements you forward, you can construct cases that lead to wrong results. (The problem seems to be that top-k is inherently global.)

Might also be that I'm tired and not seeing this right... :D

For the case where your elements are partitioned by some key you should be fine, though, as Gyula mentioned.

I'm not familiar with the Spark API, so maybe you can help me out: what does updateStateByKey() do if your state is not actually partitioned by a key? Also, I'm curious in general what Spark does with this call.

Cheers,
Aljoscha
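
Aljoscha's example can be reproduced in plain Java, assuming per-key summation semantics (the later messages confirm this reading): merging the local top-4 lists drops "a" (global sum 12) because each of its partial sums is locally too small.

```java
import java.util.*;

/** Plain-Java reproduction of the counterexample above, assuming
 *  per-key summation: merging local top-4 lists drops "a" (global
 *  sum 12) because each partial sum is locally too small. */
public class TopKCounterexample {
    static final Map<String, Integer> INSTANCE_1 =
            Map.of("a", 6, "b", 5, "c", 4, "d", 3);
    static final Map<String, Integer> INSTANCE_2 =
            Map.of("e", 10, "f", 9, "g", 8, "h", 7, "a", 6, "b", 5, "c", 4, "d", 3);

    static Map<String, Integer> topK(Map<String, Integer> m, int k) {
        Map<String, Integer> out = new LinkedHashMap<>();
        m.entrySet().stream()
         .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
         .limit(k)
         .forEach(e -> out.put(e.getKey(), e.getValue()));
        return out;
    }

    /** Correct answer: sum everything globally, then take the top 4. */
    public static Set<String> trueTop4() {
        Map<String, Integer> global = new HashMap<>(INSTANCE_1);
        INSTANCE_2.forEach((key, v) -> global.merge(key, v, Integer::sum));
        return topK(global, 4).keySet();
    }

    /** Flawed scheme: each instance trims to its local top-4 first. */
    public static Set<String> mergedLocalTop4() {
        Map<String, Integer> merged = new HashMap<>(topK(INSTANCE_1, 4));
        topK(INSTANCE_2, 4).forEach((key, v) -> merged.merge(key, v, Integer::sum));
        return topK(merged, 4).keySet();
    }

    public static void main(String[] args) {
        // the true top-4 contains "a"; the merged locals yield e, f, g, h
        System.out.println(trueTop4() + " vs " + mergedLocalTop4());
    }
}
```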


Re: Stateful computation

Gyula Fóra
Hey, 

I am not sure I get it: why aren't the results correct?

You don't instantly get the global top-k, but you are always updating it with the new local results.

Gyula


Re: Stateful computation

Aljoscha Krettek

Hi,
In the example the result is not correct because the values for a, b, c, and d are never forwarded from instance 2, even though they would modify the global top-k result. It works, though, if you partition by the key field (tuple field 0 in this case) before doing the summation and the local top-k. I think.

Best,
Aljoscha
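
The keyed fix can be simulated in plain Java (names hypothetical, not Flink API): hash-partitioning records by key before summing means every record for a given key reaches the same instance, so each local sum is complete and merging the local top-k lists recovers the true global top-k.

```java
import java.util.*;

/** Plain-Java simulation (names hypothetical) of the keyed fix: route
 *  every record to a partition by key hash before summing, so each
 *  local sum is complete and merging local top-k lists is safe. */
public class KeyedTopK {
    static Map<String, Integer> topK(Map<String, Integer> m, int k) {
        Map<String, Integer> out = new LinkedHashMap<>();
        m.entrySet().stream()
         .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
         .limit(k)
         .forEach(e -> out.put(e.getKey(), e.getValue()));
        return out;
    }

    /** All records of a key land in one of n partitions, are summed
     *  there, and the per-partition top-4 lists are merged at the end. */
    public static Set<String> keyedTop4(List<Map.Entry<String, Integer>> records, int n) {
        List<Map<String, Integer>> partitions = new ArrayList<>();
        for (int i = 0; i < n; i++) partitions.add(new HashMap<>());
        for (Map.Entry<String, Integer> r : records)
            partitions.get(Math.floorMod(r.getKey().hashCode(), n))
                      .merge(r.getKey(), r.getValue(), Integer::sum);
        Map<String, Integer> merged = new HashMap<>();
        for (Map<String, Integer> part : partitions)
            merged.putAll(topK(part, 4)); // keys are disjoint across partitions
        return topK(merged, 4).keySet();
    }

    /** The data from the counterexample earlier in the thread. */
    public static List<Map.Entry<String, Integer>> sampleRecords() {
        return List.of(
                Map.entry("a", 6), Map.entry("b", 5), Map.entry("c", 4), Map.entry("d", 3),
                Map.entry("e", 10), Map.entry("f", 9), Map.entry("g", 8), Map.entry("h", 7),
                Map.entry("a", 6), Map.entry("b", 5), Map.entry("c", 4), Map.entry("d", 3));
    }

    public static void main(String[] args) {
        // the keyed result now includes "a" (global sum 12)
        System.out.println(keyedTop4(sampleRecords(), 2));
    }
}
```

The correctness argument: any element of the true global top-k has at most k-1 larger elements globally, hence at most k-1 in its own partition, so it always survives its partition's local top-k.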


Re: Stateful computation

Gyula Fóra
Hi,

Okay, then I understood correctly.

My point was something different. I never said that the approach I suggested will produce results identical to the continuous DOP-1 top-k; that is impossible to parallelize.

What I suggested is to apply batch (or window) updates that periodically give you the "current" top-k (so some updates will be overwritten before being sent to the output). Whether this is feasible depends on the application, but it should probably be fine.

Cheers,
Gyula
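
The periodic-update scheme from this last message can be sketched in plain Java (names hypothetical): a parallelism-1 sink keeps the latest snapshot per instance and recomputes the "current" global top-k on every arrival. As discussed above, this is an approximation; keys trimmed from every local snapshot can never enter the global result.

```java
import java.util.*;

/** Plain-Java sketch (names hypothetical) of periodic batch updates:
 *  each parallel instance periodically emits a snapshot of its local
 *  top-k, and a parallelism-1 sink keeps the latest snapshot per
 *  instance, recomputing the "current" global top-k on each arrival. */
public class PeriodicTopK {
    private final int k;
    private final Map<Integer, Map<String, Integer>> latest = new HashMap<>();

    public PeriodicTopK(int k) { this.k = k; }

    /** Called when instance `id` emits a new local top-k snapshot;
     *  returns the refreshed global top-k. */
    public Map<String, Integer> onSnapshot(int id, Map<String, Integer> localTopK) {
        latest.put(id, localTopK); // overwrite that instance's previous batch
        Map<String, Integer> union = new HashMap<>();
        for (Map<String, Integer> snap : latest.values())
            snap.forEach((key, v) -> union.merge(key, v, Integer::sum));
        Map<String, Integer> out = new LinkedHashMap<>();
        union.entrySet().stream()
             .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
             .limit(k)
             .forEach(e -> out.put(e.getKey(), e.getValue()));
        return out;
    }

    public static void main(String[] args) {
        PeriodicTopK sink = new PeriodicTopK(2);
        sink.onSnapshot(0, Map.of("a", 6, "b", 5));
        System.out.println(sink.onSnapshot(1, Map.of("e", 10, "f", 9)));
    }
}
```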
