Handling large state (incremental snapshot?)

Handling large state (incremental snapshot?)

Hironori Ogibayashi
Hello,

I am trying to implement a windowed distinct count on a stream. In this
case, the state has to hold all distinct values in the window, so it can
become large.

In my test, when the state size reaches about 400 MB, checkpointing takes
about 40 seconds and consumes most of the TaskManager's CPU.
Is there any good way to handle this situation?

The Flink documentation mentions incremental snapshots, and I am interested in them,
but I could not find how to enable them. (Not implemented yet?)
https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/stream_checkpointing.html

Regards,
Hironori

Re: Handling large state (incremental snapshot?)

Aljoscha Krettek
Hi,
I guess you are using the FsStateBackend, is that correct? You could try using the RocksDB state backend: https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state_backends.html#the-rocksdbstatebackend

With this, throughput will be lower, but the overhead per checkpoint could also be lower. In addition, with this backend most of the file copying needed for a checkpoint is done while data processing keeps running (asynchronous snapshots).
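Roughly, switching the backend looks like this (a sketch only; the HDFS path and the checkpointing interval below are placeholders, not taken from your setup):

---
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Sketch: keep working state in local RocksDB instances and write checkpoints
// to a durable filesystem. URI and interval are illustrative placeholders.
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60000) // checkpoint every 60s
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:8020/flink/checkpoints"))
---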

As for incremental snapshots: I'm afraid this feature is not implemented yet, but we're working on it.

Cheers,
Aljoscha


Re: Handling large state (incremental snapshot?)

Hironori Ogibayashi
Aljoscha,

Thank you for your quick response.
Yes, I am using the FsStateBackend, so I will try the RocksDB backend.

Regards,
Hironori


Re: Handling large state (incremental snapshot?)

Hironori Ogibayashi
I tried RocksDB, but the result was almost the same.

I used the following code and put 2.6 million distinct records into Kafka.
After processing all records, the state on HDFS was about 250 MB and each
checkpoint took almost 5 seconds. Processing throughput was about
8,000 msg/sec with the FsStateBackend and about 9,000 msg/sec with the
RocksDBStateBackend.

---
    env.setStateBackend(new RocksDBStateBackend("hdfs://<hdfs_host>:8020/apps/flink/checkpoints"))

    val stream = env
      .addSource(new FlinkKafkaConsumer09[String]("kafka.json2", new SimpleStringSchema(), properties))
      .map(parseJson(_))
      .timeWindowAll(Time.of(10, TimeUnit.DAYS))
      .trigger(MyContinuousProcessingTimeTrigger.of(Time.seconds(5)))
      // count distinct values in the window
      .fold(Set[String]()) { (r, i) => r + i }
      .map { x => (System.currentTimeMillis(), x.size) }
      .addSink(new ElasticsearchSink(config, transports, new IndexRequestBuilder[Tuple2[Long, Int]] {
        override def createIndexRequest(element: Tuple2[Long, Int], ctx: RuntimeContext): IndexRequest = {
          val json = new HashMap[String, AnyRef]
          json.put("@timestamp", new Timestamp(element._1))
          json.put("count", element._2: java.lang.Integer)
          Requests.indexRequest.index("dummy3").`type`("my-type").source(json)
        }
      }))
---

I guess this is because I used a non-keyed stream, so I had a single state
record with one big value (all distinct values).
Copying a 250 MB (or larger) file to HDFS on every checkpoint will be heavy,
so I will try storing the distinct values in an external datastore (e.g. Redis),
roughly along the lines of the sketch below.
Also, when incremental snapshots are implemented, I would like to try them.
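This is only a sketch of the idea; it assumes the Jedis client, and the host and key name are placeholders:

---
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import redis.clients.jedis.Jedis

// Sketch: keep the distinct set in Redis and only ship the current
// cardinality downstream. Host, port, and key name are placeholders.
class RedisDistinctCount extends RichMapFunction[String, (Long, Long)] {
  @transient private var jedis: Jedis = _

  override def open(parameters: Configuration): Unit = {
    jedis = new Jedis("redis-host", 6379)
  }

  override def map(value: String): (Long, Long) = {
    jedis.sadd("distinct-values", value)                          // add the value to a Redis set
    (System.currentTimeMillis(), jedis.scard("distinct-values"))  // read back the current distinct count
  }

  override def close(): Unit = {
    if (jedis != null) jedis.close()
  }
}
---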

Regards,
Hironori


Re: Handling large state (incremental snapshot?)

Aljoscha Krettek
Ah yes, you're right. With the non-keyed stream it doesn't make a big difference because it's only one big state value.

The throughput still seems quite low. Have you ever tried looking at the "back pressure" tab in the Flink dashboard? For this I would suggest disabling chaining, so that every operator runs in an isolated task:

env.disableOperatorChaining();


Re: Handling large state (incremental snapshot?)

Shannon Carey
In reply to this post by Hironori Ogibayashi
HyperLogLog is worth a mention, but only if you don't mind some inaccuracy.
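For example, the Set-based fold earlier in the thread could be swapped for a HyperLogLog accumulator, roughly like the sketch below (it assumes the stream-lib HyperLogLog class as an extra dependency, the same source/trigger helpers as the code above, and an arbitrary 1% relative-error target):

---
import com.clearspring.analytics.stream.cardinality.HyperLogLog

// Sketch: accumulate a HyperLogLog instead of a Set[String], so the window
// state stays small regardless of how many distinct values arrive.
val approxCounts = env
  .addSource(new FlinkKafkaConsumer09[String]("kafka.json2", new SimpleStringSchema(), properties))
  .map(parseJson(_))
  .timeWindowAll(Time.of(10, TimeUnit.DAYS))
  .trigger(MyContinuousProcessingTimeTrigger.of(Time.seconds(5)))
  .fold(new HyperLogLog(0.01)) { (hll, value) => hll.offer(value); hll } // ~1% relative error
  .map { hll => (System.currentTimeMillis(), hll.cardinality()) }
---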


Re: Handling large state (incremental snapshot?)

Hironori Ogibayashi
In reply to this post by Aljoscha Krettek
Thank you for your suggestions.

Regarding throughput, there was actually a bottleneck in the process that puts
logs into Kafka. When I added more processes, the throughput increased.

Also, HyperLogLog seems like a good solution in this case. I will try it.

Regards,
Hironori
