your advice please regarding state


avilevi
Hi,
I am very new to Flink, so please be gentle :)

The challenge:
I have a road sensor that should scan billions of cars per day. For starters, I want to recognise whether each car that passes by is new or not. New cars (never seen before by that sensor) will be placed on a different Kafka topic than the others (two topics in total, one for new cars and one for old).
This is under the assumption that the state will contain billions of unique car ids.

Suggested Solutions
My question is which approach is better.
Both approaches use RocksDB.

1. Use ValueState and split the stream like this:
  val domainsSrc = env
    .addSource(consumer)
    .keyBy(car => car.id)
    .map(...)
and check whether the state value is null to recognise new cars. If the car is new, I will update the state.
How will the persistent data be sharded among the nodes in the cluster (let's say that I have 10 nodes)?

2. Use MapState and partition the stream into groups by some arbitrary factor, e.g.
val domainsSrc = env
    .addSource(consumer)
    .keyBy { car =>
        val h = car.id.hashCode % partitionFactor
        math.abs(h)
    }
    .map(...)
and check mapState.keys.contains(car.id); if the id is not there, add it to the state.
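
For reference, a small sketch of the bucket key used in approach #2, in plain Scala with no Flink dependency. The object and method names are made up for illustration. It shows that taking the remainder first and the absolute value second, as in the snippet above, always yields a non-negative bucket, that Math.floorMod does the same without abs, and that the reverse ordering can go negative for a hash of Int.MinValue.

```scala
// Hypothetical helper names; only the arithmetic mirrors the post.
object BucketKey {
  // As in the post: remainder first, then abs. The remainder lies in
  // (-partitionFactor, partitionFactor), so abs is always safe here.
  def absAfterMod(hash: Int, partitionFactor: Int): Int =
    math.abs(hash % partitionFactor)

  // Alternative without abs: floorMod returns a value in [0, partitionFactor).
  def viaFloorMod(hash: Int, partitionFactor: Int): Int =
    Math.floorMod(hash, partitionFactor)

  // Ordering to avoid: math.abs(Int.MinValue) is still Int.MinValue,
  // so the result can be negative.
  def absBeforeMod(hash: Int, partitionFactor: Int): Int =
    math.abs(hash) % partitionFactor
}
```

The two safe variants can assign the same negative hash to different buckets, so a job should pick one and keep it for as long as the state lives.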

Which approach is better?

Thanks in advance,
Avi

Re: your advice please regarding state

Jamie Grier-2
Hi Avi,

The typical approach would be as you've described in #1.  #2 is not necessary -- #1 is already doing basically exactly that.
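
A minimal sketch of what #1 could look like, assuming the Flink 1.x Scala DataStream API. Car, NewCarDetector and the "seen-before" tag are hypothetical names that do not come from this thread. The sketch uses a KeyedProcessFunction with a side output rather than the map(...) from the question, so that new and previously seen cars end up in two separate streams.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical event type; the thread only says that a car has an id.
case class Car(id: String)

object NewCarDetector {
  // Side output for cars that were seen before; the main output carries new cars.
  val seenBefore: OutputTag[Car] = OutputTag[Car]("seen-before")
}

class NewCarDetector extends KeyedProcessFunction[String, Car, Car] {
  // Keyed state: Flink scopes this value to the car id of the current record.
  @transient private var seen: ValueState[java.lang.Boolean] = _

  override def open(parameters: Configuration): Unit = {
    seen = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Boolean]("seen", classOf[java.lang.Boolean]))
  }

  override def processElement(
      car: Car,
      ctx: KeyedProcessFunction[String, Car, Car]#Context,
      out: Collector[Car]): Unit = {
    if (seen.value() == null) {
      // No state for this id yet: first sighting.
      seen.update(true)
      out.collect(car)
    } else {
      ctx.output(NewCarDetector.seenBefore, car)
    }
  }
}
```

Wired into the job from the question, this would be roughly env.addSource(consumer).keyBy(_.id).process(new NewCarDetector), with getSideOutput(NewCarDetector.seenBefore) on the result giving the stream of old cars. Each of the two streams could then get its own Kafka sink.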

-Jamie


On Wed, Nov 21, 2018 at 3:36 AM Avi Levi <[hidden email]> wrote:
[...]

Re: your advice please regarding state

avilevi
Thanks a lot!  got it :)

On Wed, Nov 21, 2018 at 11:40 PM Jamie Grier <[hidden email]> wrote:
[...]

Re: your advice please regarding state

yinhua.dai
In general, approach #1 is fine, but you may have to use a hash-based key
selector if your data is heavily skewed.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: your advice please regarding state

Fabian Hueske-2
Hi Avi,

I'd definitely go for approach #1.
Flink will hash-partition the records across all nodes. This is basically the same as a distributed key-value store sharding keys.
I would not try to fine-tune the partitioning. You should try to use as many keys as possible to ensure an even distribution of keys. This will also allow you to scale your application to more tasks later.
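
To make the sharding a bit more concrete, here is a simplified model in plain Scala with no Flink dependency. It assumes the usual two-step scheme, where a key is hashed into one of a fixed number of key groups and contiguous ranges of key groups are assigned to parallel subtasks. The value 128 for the number of key groups and the use of Scala's MurmurHash3 in place of Flink's internal hash are assumptions of this sketch, so the exact assignments will differ from a real job.

```scala
import scala.util.hashing.MurmurHash3

// Simplified model of hash partitioning via key groups; names are illustrative.
object KeyGroupModel {
  // Assumed number of key groups (what Flink calls the max parallelism).
  val maxParallelism = 128

  // Key -> key group. MurmurHash3 stands in for Flink's own hash function.
  def keyGroup(key: Any): Int =
    Math.floorMod(MurmurHash3.stringHash(key.toString), maxParallelism)

  // Key group -> subtask index: each subtask owns a contiguous range of key groups.
  def subtask(key: Any, parallelism: Int): Int =
    keyGroup(key) * parallelism / maxParallelism

  // Number of distinct subtasks that receive at least one of the given keys.
  def busySubtasks(keys: Iterable[Any], parallelism: Int): Int =
    keys.map(subtask(_, parallelism)).toSet.size
}

object KeyGroupDemo extends App {
  val carIds = (1 to 100000).map(i => s"car-$i")
  // Approach #1: one key per car id.
  println(KeyGroupModel.busySubtasks(carIds, 10))
  // Approach #2 with partitionFactor = 4: only the keys 0, 1, 2, 3 exist.
  println(KeyGroupModel.busySubtasks(0 until 4, 10))
}
```

In this model, four bucket keys can occupy at most four of the ten subtasks no matter how many cars pass by, while per-car keys have enough distinct values to reach every subtask. That is the reason for using as many keys as possible.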

Best, Fabian

On Tue, Nov 27, 2018 at 5:21 AM yinhua.dai <[hidden email]> wrote:
[...]

Re: your advice please regarding state

avilevi
Thank you very much. got it.

On Tue, Nov 27, 2018 at 12:53 PM Fabian Hueske <[hidden email]> wrote:
[...]