Question regarding state cleaning using timer

Vijay Bhaskar
Hi,
In the following example from the Flink documentation:
object ExampleCountWindowAverage extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  env.fromCollection(List(
    (1L, 3L),
    (1L, 5L),
    (1L, 7L),
    (1L, 4L),
    (1L, 2L)
  )).keyBy(_._1)
    .flatMap(new CountWindowAverage())
    .print()
  // the printed output will be (1,4) and (1,5)

  env.execute("ExampleManagedState")
}

There is only one state here because there is only one key. CountWindowAverage has a single state descriptor, new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)]), which is named "average". To make this generic, should I change the class to

CountWindowAverage(keyName: String), so that new ValueStateDescriptor[(Long, Long)](keyName, createTypeInformation[(Long, Long)]) is created? And how do I configure TTL for it? Inside this class?
The example at https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl shows a standalone ValueStateDescriptor. How can I use TTL inside CountWindowAverage() per key?

Regards
Bhaskar

Re: Question regarding state cleaning using timer

Hequn Cheng
Hi Bhaskar,

You don't need to change anything to handle multiple keys; Flink does that for you. ValueState is keyed state. You can think of keyed state as operator state that has been partitioned, or sharded, with exactly one state partition per key.
TTL can be used in the same way: you enable it on the state descriptor, and it then applies to the state of every key.
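
For illustration, here is a minimal sketch of CountWindowAverage with TTL enabled on its descriptor. It follows the managed-state and TTL examples in the Flink documentation of that time (around Flink 1.6); newer releases changed parts of this API, so treat the exact calls as version-dependent. The 60-second TTL is an arbitrary value chosen for this sketch.

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{StateTtlConfig, ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {

  // Keyed state: Flink keeps one (count, sum) pair per key automatically.
  private var sum: ValueState[(Long, Long)] = _

  override def open(parameters: Configuration): Unit = {
    // Assumption: 60 seconds is an arbitrary TTL picked for this sketch.
    val ttlConfig = StateTtlConfig
      .newBuilder(Time.seconds(60))
      .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
      .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
      .build

    val descriptor = new ValueStateDescriptor[(Long, Long)](
      "average", createTypeInformation[(Long, Long)])
    // The same TTL setting applies to the state of every key.
    descriptor.enableTimeToLive(ttlConfig)
    sum = getRuntimeContext.getState(descriptor)
  }

  override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
    // value is null if the key has no state yet or the state has expired.
    val current = Option(sum.value).getOrElse((0L, 0L))
    val newSum = (current._1 + 1, current._2 + input._2)
    sum.update(newSum)

    // Emit the average for every second element of a key, then reset.
    if (newSum._1 >= 2) {
      out.collect((input._1, newSum._2 / newSum._1))
      sum.clear()
    }
  }
}
```

The descriptor name "average" identifies the state within the operator, not the key, so it does not need to be parameterized per key.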

Best, Hequn


On Fri, Sep 14, 2018 at 10:29 PM [hidden email] <[hidden email]> wrote:

Re: Question regarding state cleaning using timer

Vijay Bhaskar
Thanks Hequn. But I want to give each partitioned key its own random TTL. How can I achieve that?

Regards
Bhaskar

On Mon, Sep 17, 2018 at 7:30 AM Hequn Cheng <[hidden email]> wrote:

Re: Question regarding state cleaning using timer

Kostas Kloudas
Hi Bhaskar,

If you want a different TTL per key, you should use timers with a process function,
as shown in [1]. That presentation is old, though: what it calls RichProcessFunction is now KeyedProcessFunction.
Also, please have a look at the training material in [2] and the process function documentation in [3].
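
As a rough sketch of that approach (not taken from the presentation), the function below keeps the same running average as CountWindowAverage, but stores an expiry timestamp per key and clears the key's state from a processing-time timer. The class name CountAverageWithPerKeyTtl, the helper PerKeyTtl.ttlMillisFor, and its rule for deriving a TTL from the key are all hypothetical; the API calls assume a Flink version of that era (1.5 or later) with the Scala DataStream API.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object PerKeyTtl {
  // Hypothetical rule: 60 s base plus up to 9 s depending on the key.
  // Replace with whatever per-key TTL your application needs.
  def ttlMillisFor(key: Long): Long = 60000L + math.abs(key % 10) * 1000L
}

class CountAverageWithPerKeyTtl
    extends KeyedProcessFunction[Long, (Long, Long), (Long, Long)] {

  private var sum: ValueState[(Long, Long)] = _
  // Processing-time timestamp at which the current key's state should expire.
  private var expiresAt: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    sum = getRuntimeContext.getState(new ValueStateDescriptor[(Long, Long)](
      "average", createTypeInformation[(Long, Long)]))
    expiresAt = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("expiresAt", classOf[java.lang.Long]))
  }

  override def processElement(
      input: (Long, Long),
      ctx: KeyedProcessFunction[Long, (Long, Long), (Long, Long)]#Context,
      out: Collector[(Long, Long)]): Unit = {
    val current = Option(sum.value).getOrElse((0L, 0L))
    val newSum = (current._1 + 1, current._2 + input._2)

    if (newSum._1 >= 2) {
      // Emit the average and reset; a pending timer becomes a no-op.
      out.collect((input._1, newSum._2 / newSum._1))
      sum.clear()
      expiresAt.clear()
    } else {
      sum.update(newSum)
      // Push the expiry forward using this key's own TTL.
      val expiry = ctx.timerService().currentProcessingTime() +
        PerKeyTtl.ttlMillisFor(ctx.getCurrentKey)
      expiresAt.update(expiry)
      ctx.timerService().registerProcessingTimeTimer(expiry)
    }
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[Long, (Long, Long), (Long, Long)]#OnTimerContext,
      out: Collector[(Long, Long)]): Unit = {
    // Ignore stale timers: only the most recently stored expiry counts.
    val expiry = expiresAt.value
    if (expiry != null && timestamp >= expiry.longValue) {
      sum.clear()
      expiresAt.clear()
    }
  }
}
```

It would replace the flatMap step in the example, for instance as .keyBy(_._1).process(new CountAverageWithPerKeyTtl). Storing the expiry in state and comparing it in onTimer avoids having to delete earlier timers when a key is updated again.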

Cheers,
Kostas

[1] https://www.slideshare.net/dataArtisans/apache-flink-training-datastream-api-processfunction

On Sep 17, 2018, at 8:50 AM, Vijay Bhaskar <[hidden email]> wrote:


Re: Question regarding state cleaning using timer

Vijay Bhaskar
Thanks Kostas! 

Regards
Bhaskar

On Mon, Sep 17, 2018 at 9:05 PM Kostas Kloudas <[hidden email]> wrote: