Hi,
I am writing a program that reads timeseries from HBase and does some daily aggregations (Flink streaming). For now I am just computing some averages, so nothing very demanding, but my HBase reads get slower and slower (I have a few billion points to read). The back pressure is almost always close to 1.

I use custom timestamps:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

so I implemented a custom extractor based on AscendingTimestampExtractor.

At the beginning I get 5M reads/s, after 15 min only 1M reads/s, and then it gets worse and worse. Even when I cancel the job, data is still being written into HBase (my sink is similar to the example, with a cache of 100s of HBase Puts to be a bit more efficient).

When I don't add a sink, it seems to stay at 1M reads/s.

Do you have an idea why?

Here is a bit of code if needed:

final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS.keyBy(0)
    .assignTimestampsAndWatermarks(new xxxxAscendingTimestampExtractor())
    .keyBy(0)
    .timeWindow(Time.days(1));

final SingleOutputStreamOperator<Put> puts = ws.apply(new WindowFunction<ANA, Put, Tuple, TimeWindow>() {

    @Override
    public void apply(final Tuple key, final TimeWindow window, final Iterable<ANA> input,
            final Collector<Put> out) throws Exception {

        final SummaryStatistics summaryStatistics = new SummaryStatistics();
        for (final ANA ana : input) {
            summaryStatistics.addValue(ana.getValue());
        }
        final Put put = buildPut((String) key.getField(0), window.getStart(), summaryStatistics);
        out.collect(put);
    }
});

And here is how I started Flink on YARN:

flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2 -Dtaskmanager.network.numberOfBuffers=4096

Thanks for any feedback!

Christophe
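For reference, a minimal sketch of what such an extractor could look like, assuming a hypothetical getTimestamp() accessor on ANA returning epoch milliseconds (in Flink 1.0.x the callback also receives the previously extracted timestamp; later 1.x versions use a single-argument variant):

public class ANAAscendingTimestampExtractor extends AscendingTimestampExtractor<ANA> {

    private static final long serialVersionUID = 1L;

    @Override
    public long extractAscendingTimestamp(final ANA element, final long currentTimestamp) {
        // timestamps are assumed to be monotonically increasing per parallel source instance
        return element.getTimestamp();
    }
}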
Hi Christophe,

where does the backpressure appear? In front of the sink operator or before the window operator?

In any case, I think you can improve your WindowFunction if you convert parts of it into a FoldFunction<ANA, SummaryStatistics>. The FoldFunction would take care of the statistics computation and the WindowFunction would only assemble the result record, including extracting the start time of the window.

Then you could do:

ws.apply(new SummaryStatistics(), new YourFoldFunction(), new YourWindowFunction());

This is more efficient because the FoldFunction is eagerly applied whenever a new element is added to a window. Hence, the window only holds a single value (SummaryStatistics) instead of all elements added to the window. In contrast, the WindowFunction is called when the window is finally evaluated.

Hope this helps,
Fabian

2016-06-09 14:53 GMT+02:00 Christophe Salperwyck <[hidden email]>:
Hi Fabian,

Thanks for the help, I will try that. The backpressure was on the source (HBase).

Christophe

2016-06-09 16:38 GMT+02:00 Fabian Hueske <[hidden email]>:
OK, this indicates that the operator following the source is a bottleneck.

If that's the WindowOperator, it makes sense to try the refactoring of the WindowFunction. Alternatively, you can try to run that operator with a higher parallelism.

2016-06-09 17:39 GMT+02:00 Christophe Salperwyck <[hidden email]>:
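As a sketch, raising the parallelism of the window operator alone could look like this (MyWindowFunction is a placeholder for the WindowFunction shown earlier; 40 would use all 20 x 2 slots from the yarn-session command above):

final SingleOutputStreamOperator<Put> puts = ws
        .apply(new MyWindowFunction())  // window function as before
        .setParallelism(40);            // more parallel window subtasks than the default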
Thanks for the feedback, and sorry that I can't try all this straight away.

Is there a way to have a different function than:

WindowFunction<SummaryStatistics, SummaryStatistics, Tuple, TimeWindow>()

I would like to return an HBase Put and not a SummaryStatistics. So something like this:

WindowFunction<SummaryStatistics, Put, Tuple, TimeWindow>()

Christophe

2016-06-09 17:47 GMT+02:00 Fabian Hueske <[hidden email]>:
Hi Christophe,
A fold function has two inputs: the state and a record to update the state with. So you can update the SummaryStatistics (state) with each Put (input).

Cheers,
Max

On Mon, Jun 13, 2016 at 11:04 AM, Christophe Salperwyck <[hidden email]> wrote:
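Concretely, a fold function matching this description might look like the following sketch, using the SummaryStatistics accumulator suggested earlier in the thread:

final FoldFunction<ANA, SummaryStatistics> foldFunction = new FoldFunction<ANA, SummaryStatistics>() {

    private static final long serialVersionUID = 1L;

    @Override
    public SummaryStatistics fold(final SummaryStatistics accumulator, final ANA value) throws Exception {
        // eagerly merge each incoming record into the running window state
        accumulator.addValue(value.getValue());
        return accumulator;
    }
};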
Hi,

I'm afraid this is currently a shortcoming in the API. There is an open JIRA issue to track it: https://issues.apache.org/jira/browse/FLINK-3869. We can't fix it before Flink 2.0, though, because we have to keep the API stable on the Flink 1.x release line.

Cheers,
Aljoscha

On Mon, 13 Jun 2016 at 11:04 Christophe Salperwyck <[hidden email]> wrote:
Hi Max,

In fact the Put would be the output of my WindowFunction. I saw that Aljoscha replied; it seems I will need to create another intermediate class to handle that, but that is fine.

Thanks for the help!

Christophe

2016-06-13 12:25 GMT+02:00 Maximilian Michels <[hidden email]>:
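A minimal sketch of what such an intermediate class could look like, consistent with the Aggregate type used in the code later in this thread (the row key and column layout in buildPut() are assumptions, not the actual schema):

import java.io.Serializable;

import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class Aggregate implements Serializable {

    private static final long serialVersionUID = 1L;

    private final SummaryStatistics stats = new SummaryStatistics();
    private String m;   // measurement/sensor identifier (the window key)
    private long time;  // window start timestamp

    public void addValue(final double value) {
        stats.addValue(value);
    }

    public void setM(final String m) {
        this.m = m;
    }

    public void setTime(final long time) {
        this.time = time;
    }

    public Put buildPut() {
        // hypothetical schema: rowkey = <measurement>_<windowStart>, one column per statistic
        final Put put = new Put(Bytes.toBytes(m + "_" + time));
        put.addColumn(Bytes.toBytes("d"), Bytes.toBytes("avg"), Bytes.toBytes(stats.getMean()));
        put.addColumn(Bytes.toBytes("d"), Bytes.toBytes("cnt"), Bytes.toBytes(stats.getN()));
        return put;
    }
}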
Hi,

I voted for this issue, and I agree this would be nice to have.

Thanks!

Christophe

2016-06-13 12:26 GMT+02:00 Aljoscha Krettek <[hidden email]>:
Thanks!
On Mon, Jun 13, 2016 at 12:34 PM, Christophe Salperwyck <[hidden email]> wrote:
To continue, I implemented the

ws.apply(new SummaryStatistics(), new YourFoldFunction(), new YourWindowFunction());

pattern. It works fine when there is no sink, but when I add an HBase sink it seems that the sink somehow blocks the flow. The sink writes very little data into HBase, and when I limit my input to just a few sensors, it works well. Any idea?

final SingleOutputStreamOperator<Aggregate> aggregates = ws.apply(
    new Aggregate(),
    new FoldFunction<ANA, Aggregate>() {
        @Override
        public Aggregate fold(final Aggregate accumulator, final ANA value) throws Exception {
            accumulator.addValue(value.getValue());
            return accumulator;
        }
    },
    new WindowFunction<Aggregate, Aggregate, Tuple, TimeWindow>() {
        @Override
        public void apply(final Tuple key, final TimeWindow window, final Iterable<Aggregate> input,
                final Collector<Aggregate> out) throws Exception {
            for (final Aggregate aggregate : input) {
                aggregate.setM((String) key.getField(0));
                aggregate.setTime(window.getStart());
                out.collect(aggregate);
            }
        }
    });

aggregates
    .setParallelism(10)
    .writeUsingOutputFormat(new OutputFormat<Aggregate>() {

        private static final long serialVersionUID = 1L;

        HBaseConnect hBaseConnect;
        Table table;
        final int flushSize = 100;
        List<Put> puts = new ArrayList<>();

        @Override
        public void configure(final Configuration parameters) {
        }

        @Override
        public void open(final int taskNumber, final int numTasks) throws IOException {
            hBaseConnect = new HBaseConnect();
            table = hBaseConnect.getHTable("PERF_TEST");
        }

        @Override
        public void writeRecord(final Aggregate record) throws IOException {
            puts.add(record.buildPut());
            if (puts.size() == flushSize) {
                table.put(puts);
                // reset the buffer after flushing, otherwise the same Puts
                // are written again on the next flush and the list grows forever
                puts.clear();
            }
        }

        @Override
        public void close() throws IOException {
            // flush the last buffered inserts
            table.put(puts);
            table.close();
            hBaseConnect.close();
        }
    });

2016-06-13 13:47 GMT+02:00 Maximilian Michels <[hidden email]>:
Do the backpressure metrics indicate that the sink function is blocking?

2016-06-13 16:58 GMT+02:00 Christophe Salperwyck <[hidden email]>:
I would need to restart it to be sure (and once it starts to be stuck, the web interface doesn't report backpressure anymore), but it seems so.

I used a text file as the output instead, and it took 5 h to complete:

aggregates.writeAsText("hdfs:///user/christophe/flinkHBase");

What is weird is that I get as many output lines as input rows when I don't limit the scan (14B rows). If I limit the scan (150M rows), I get one line per hour as expected. I am investigating a bit more right now.

Thanks again!
Christophe

2016-06-13 18:50 GMT+02:00 Fabian Hueske <[hidden email]>: