HBase reads and back pressure

Christophe Salperwyck
Hi,

I am writing a program to read time series from HBase and do some daily aggregations (Flink streaming). For now I am just computing some averages, so nothing very demanding, but my HBase reads get slower and slower (I have a few billion points to read). The back pressure is almost all the time close to 1.

I use custom timestamps:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

so I implemented a custom extractor based on:
AscendingTimestampExtractor

At the beginning I get 5M reads/s, after 15 min just 1M reads/s, and then it gets worse and worse. Even when I cancel the job, data is still being written to HBase (I wrote a sink similar to the example, with a cache of hundreds of HBase Puts to be a bit more efficient).

When I don't add a sink, it seems to stay at 1M reads/s.

Do you have an idea why?

Here is a bit of code if needed:
final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS.keyBy(0)
        .assignTimestampsAndWatermarks(new xxxxAscendingTimestampExtractor())
        .keyBy(0)
        .timeWindow(Time.days(1));

final SingleOutputStreamOperator<Put> puts = ws.apply(new WindowFunction<ANA, Put, Tuple, TimeWindow>() {

    @Override
    public void apply(final Tuple key, final TimeWindow window, final Iterable<ANA> input,
            final Collector<Put> out) throws Exception {
        // aggregate all values of the window into one statistics object
        final SummaryStatistics summaryStatistics = new SummaryStatistics();
        for (final ANA ana : input) {
            summaryStatistics.addValue(ana.getValue());
        }
        // one Put per key and window, using the sensor id and the window start time
        final Put put = buildPut((String) key.getField(0), window.getStart(), summaryStatistics);
        out.collect(put);
    }
});

And here is how I started Flink on YARN:
flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2 -Dtaskmanager.network.numberOfBuffers=4096

Thanks for any feedback!

Christophe

Re: HBase reads and back pressure

Fabian Hueske-2
Hi Christophe,

Where does the backpressure appear? In front of the sink operator or before the window operator?

In any case, I think you can improve your WindowFunction by converting parts of it into a FoldFunction<ANA, SummaryStatistics>.
The FoldFunction would take care of the statistics computation, and the WindowFunction would only assemble the result record, including extracting the start time of the window.

Then you could do:

ws.apply(new SummaryStatistics(), new YourFoldFunction(), new YourWindowFunction());

This is more efficient because the FoldFunction is applied eagerly whenever a new element is added to a window. Hence, the window only holds a single value (the SummaryStatistics) instead of all elements added to the window. The WindowFunction, in contrast, is only called when the window is finally evaluated.
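
A minimal sketch of how this could look, assuming your ANA type (note that with this apply() variant the input and output types of the WindowFunction are both tied to the fold accumulator type):

final SingleOutputStreamOperator<SummaryStatistics> stats = ws.apply(
        new SummaryStatistics(),                // initial accumulator value
        new FoldFunction<ANA, SummaryStatistics>() {

            @Override
            public SummaryStatistics fold(final SummaryStatistics acc, final ANA value) {
                acc.addValue(value.getValue()); // incremental update, one element at a time
                return acc;
            }
        },
        new WindowFunction<SummaryStatistics, SummaryStatistics, Tuple, TimeWindow>() {

            @Override
            public void apply(final Tuple key, final TimeWindow window,
                    final Iterable<SummaryStatistics> input,
                    final Collector<SummaryStatistics> out) {
                // the iterable holds exactly one element: the folded accumulator
                for (final SummaryStatistics s : input) {
                    out.collect(s);
                }
            }
        });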

Hope this helps,
Fabian

Re: HBase reads and back pressure

Christophe Salperwyck
Hi Fabian,

Thanks for the help, I will try that. The backpressure was on the source (HBase).

Christophe

Re: HBase reads and back pressure

Fabian Hueske-2
OK, this indicates that the operator following the source is a bottleneck.

If that's the WindowOperator, it makes sense to try the refactoring of the WindowFunction.
Alternatively, you can try running that operator with a higher parallelism.
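
For example, a sketch (YourWindowFunction stands for the WindowFunction from your code, and 10 is an arbitrary value):

// setParallelism() configures only the operator it is called on, here the
// window operator, independently of the job-wide default parallelism
final SingleOutputStreamOperator<Put> puts = ws
        .apply(new YourWindowFunction())
        .setParallelism(10);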

Re: HBase reads and back pressure

Christophe Salperwyck
Thanks for the feedback and sorry that I can't try all this straight away.

Is there a way to have a different function than:
WindowFunction<SummaryStatistics, SummaryStatistics, Tuple, TimeWindow>()

I would like to return an HBase Put and not a SummaryStatistics. So something like this:
WindowFunction<SummaryStatistics, Put, Tuple, TimeWindow>()

Christophe

Re: HBase reads and back pressure

Maximilian Michels
Hi Christophe,

A fold function has two inputs: the state and a record to update the state with. So you can update the SummaryStatistics (state) with each Put (input).
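
For reference, the interface has roughly this shape (the accumulator is the state, the value is the incoming record):

public interface FoldFunction<O, T> extends Function, Serializable {
    // combine the current state with one input record and return the new state
    T fold(T accumulator, O value) throws Exception;
}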

Cheers,
Max

Re: HBase reads and back pressure

Aljoscha Krettek
In reply to this post by Christophe Salperwyck
Hi,
I'm afraid this is currently a shortcoming in the API. There is this open Jira issue to track it: https://issues.apache.org/jira/browse/FLINK-3869. We can't fix it before Flink 2.0, though, because we have to keep the API stable on the Flink 1.x release line.

Cheers,
Aljoscha

Re: HBase reads and back pressure

Christophe Salperwyck
In reply to this post by Maximilian Michels
Hi Max,

In fact the Put would be the output of my WindowFunction. I saw that Aljoscha replied; it seems I will need to create another intermediate class to handle that, but that's fine.
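
Something like this is what I have in mind (just a sketch; the row key layout and column names are illustrative):

// serves both as the fold accumulator and as the window output,
// and builds the HBase Put at the end
public class Aggregate implements Serializable {
    private final SummaryStatistics stats = new SummaryStatistics();
    private String m;   // sensor id, set by the WindowFunction
    private long time;  // window start, set by the WindowFunction

    public void addValue(final double v) { stats.addValue(v); }
    public void setM(final String m) { this.m = m; }
    public void setTime(final long time) { this.time = time; }

    public Put buildPut() {
        final Put put = new Put(Bytes.toBytes(m + "_" + time));
        put.addColumn(Bytes.toBytes("d"), Bytes.toBytes("avg"), Bytes.toBytes(stats.getMean()));
        return put;
    }
}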

Thx for help!
Christophe

Re: HBase reads and back pressure

Christophe Salperwyck
In reply to this post by Aljoscha Krettek
Hi,
I voted for this issue and I agree this would be nice to have.

Thx!
Christophe

Re: HBase reads and back pressure

Maximilian Michels
Thanks!

Re: HBase reads and back pressure

Christophe Salperwyck
To continue, I implemented the suggested ws.apply(new SummaryStatistics(), new YourFoldFunction(), new YourWindowFunction()) variant.

It works fine when there is no sink, but when I add an HBase sink it seems that the sink somehow blocks the flow. The sink writes very little data into HBase, and when I limit my input to just a few sensors it works well. Any idea?

final SingleOutputStreamOperator<Aggregate> aggregates = ws.apply(
        new Aggregate(),
        new FoldFunction<ANA, Aggregate>() {

            @Override
            public Aggregate fold(final Aggregate accumulator, final ANA value) throws Exception {
                // update the running statistics with each incoming value
                accumulator.addValue(value.getValue());
                return accumulator;
            }
        },
        new WindowFunction<Aggregate, Aggregate, Tuple, TimeWindow>() {

            @Override
            public void apply(final Tuple key, final TimeWindow window, final Iterable<Aggregate> input,
                    final Collector<Aggregate> out) throws Exception {
                // the iterable holds the single folded accumulator; attach key and window start
                for (final Aggregate aggregate : input) {
                    aggregate.setM((String) key.getField(0));
                    aggregate.setTime(window.getStart());
                    out.collect(aggregate);
                }
            }
        });

aggregates
        .setParallelism(10)
        .writeUsingOutputFormat(new OutputFormat<Aggregate>() {

            private static final long serialVersionUID = 1L;

            HBaseConnect hBaseConnect;
            Table table;
            final int flushSize = 100;
            List<Put> puts = new ArrayList<>();

            @Override
            public void writeRecord(final Aggregate record) throws IOException {
                puts.add(record.buildPut());
                if (puts.size() >= flushSize) {
                    table.put(puts);
                    puts.clear(); // reset the buffer so old Puts are not kept and written again on close()
                }
            }

            @Override
            public void open(final int taskNumber, final int numTasks) throws IOException {
                hBaseConnect = new HBaseConnect();
                table = hBaseConnect.getHTable("PERF_TEST");
            }

            @Override
            public void configure(final Configuration parameters) {
                // nothing to configure
            }

            @Override
            public void close() throws IOException {
                // flush the remaining buffered Puts
                table.put(puts);
                table.close();
                hBaseConnect.close();
            }
        });

Re: HBase reads and back pressure

Fabian Hueske-2
Do the backpressure metrics indicate that the sink function is blocking?

Re: HBase reads and back pressure

Christophe Salperwyck
I would need to restart it to be sure (and once it starts to get stuck, the web interface doesn't show the backpressure anymore), but it seems so. I wrote the output to a text file instead and it took 5 hours to complete:
aggregates.writeAsText("hdfs:///user/christophe/flinkHBase");

What is weird is that I get as many output lines as input rows if I don't limit the scan (14B rows). If I limit the scan (150M rows), I get one line per hour as expected. I am investigating a bit more right now.

Thanks again!
Christophe
