Hi all,
I am trying to read data from HBase and use the window functions of Flink streaming. I can read my data using the ExecutionEnvironment but not using the StreamExecutionEnvironment.

Is that a known issue?

Are the input splits used in the streaming environment?

Here is a sample of my code:

    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    @SuppressWarnings("serial")
    final DataStreamSource<ANA> anaDS = env.createInput(new TableInputFormat<ANA>() {
        ...
    });

    final WindowedStream<ANA, Tuple, TimeWindow> ws = anaDS
        .assignTimestampsAndWatermarks(new xxxxAssignerWithPunctuatedWatermarks())
        .keyBy(0)
        .timeWindow(Time.days(30), Time.days(30));

    ws.sum(2).printToErr();
    env.execute();

The error I get is:

    Caused by: java.io.IOException: No table result scanner provided!
        at org.apache.flink.addons.hbase.TableInputFormat.nextRecord(TableInputFormat.java:103)

It seems that the first "Result" is not read (the result scanner is not set up) before this function is called.

I built a "StreamingTableInputFormat" as a temporary workaround, but let me know if there is something I did wrong.

Thanks for everything, Flink is great!

Cheers,
Christophe
From the code, it looks like the open method of the TableInputFormat is never called. What are you doing differently in the StreamingTableInputFormat?

– Ufuk

(in reply to Christophe Salperwyck, Mon, Jun 6, 2016 at 1:49 PM)
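Ufuk's diagnosis rests on the input format lifecycle: a split has to be opened before records can be read from it. The following is a minimal, self-contained sketch of that contract that does not use Flink at all. GuardedFormat, InputFormatLifecycleSketch, readAll and readWithoutOpen are hypothetical names, and each split is modeled as a plain List<String> rather than an HBase region. readAll follows roughly the open, then reachedEnd/nextRecord, then close order that a batch job applies to each split, while readWithoutOpen skips open() and reproduces the exception message from the stack trace above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified stand-in for an input format (NOT Flink's InputFormat API).
class GuardedFormat {
    private Iterator<String> scanner; // plays the role of TableInputFormat's result scanner

    void open(List<String> split) {
        scanner = split.iterator(); // the scanner only exists after open(split)
    }

    boolean reachedEnd() {
        return scanner == null || !scanner.hasNext();
    }

    String nextRecord() throws IOException {
        if (scanner == null) {
            // Mirrors the guard that produced the error in this thread.
            throw new IOException("No table result scanner provided!");
        }
        return scanner.hasNext() ? scanner.next() : null;
    }

    void close() {
        scanner = null;
    }
}

class InputFormatLifecycleSketch {
    // Batch-style driver: open each split before reading from it, then close it.
    static List<String> readAll(GuardedFormat format, List<List<String>> splits) {
        List<String> out = new ArrayList<>();
        try {
            for (List<String> split : splits) {
                format.open(split);
                while (!format.reachedEnd()) {
                    String record = format.nextRecord();
                    if (record != null) {
                        out.add(record);
                    }
                }
                format.close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out;
    }

    // Calling nextRecord() without open() hits the guard and returns its message.
    static String readWithoutOpen(GuardedFormat format) {
        try {
            format.nextRecord();
            return "no error";
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        List<List<String>> splits = List.of(List.of("a", "b"), List.of("c"));
        System.out.println(readAll(new GuardedFormat(), splits));  // [a, b, c]
        System.out.println(readWithoutOpen(new GuardedFormat()));  // No table result scanner provided!
    }
}
```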
I just did that:

    public T nextRecord(final T reuse) throws IOException {
        if (this.rs == null) {
            // throw new IOException("No table result scanner provided!");
            // workaround: report "no record" so the caller opens a split instead of failing
            return null;
        }
        ...

because in the class FileSourceFunction we have:

    @Override
    public void run(SourceContext<OUT> ctx) throws Exception {
        while (isRunning) {
            OUT nextElement = serializer.createInstance();
            nextElement = format.nextRecord(nextElement);
            if (nextElement == null && splitIterator.hasNext()) {
                format.open(splitIterator.next());
                continue;
            } else if (nextElement == null) {
                break;
            }
            ctx.collect(nextElement);
        }
    }

(I had to copy TableInputSplit as its constructor is not visible...)

(in reply to Ufuk Celebi, 2016-06-06 16:07 GMT+02:00)
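The effect of the workaround on the run() loop quoted above can be checked with a small, self-contained model that does not use Flink. SketchTableFormat, StreamingLoopSketch and returnNullWhenUnopened are hypothetical names; the loop is a simplified copy of the quoted FileSourceFunction.run() (no isRunning flag, serializer or SourceContext), and it assumes, as Ufuk suspected, that nothing opens the format before the loop starts. Without the workaround, the very first nextRecord() call throws the exception from the stack trace; with it, the null result makes the loop open the first split and then read every record.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for an HBase-backed format (NOT Flink's TableInputFormat).
class SketchTableFormat {
    private final boolean returnNullWhenUnopened; // true = the workaround from this thread
    private Iterator<String> scanner;             // plays the role of the result scanner

    SketchTableFormat(boolean returnNullWhenUnopened) {
        this.returnNullWhenUnopened = returnNullWhenUnopened;
    }

    void open(List<String> split) {
        scanner = split.iterator();
    }

    String nextRecord() throws IOException {
        if (scanner == null) {
            if (returnNullWhenUnopened) {
                return null; // workaround: let the caller open the next split
            }
            throw new IOException("No table result scanner provided!");
        }
        return scanner.hasNext() ? scanner.next() : null; // null also means "split exhausted"
    }
}

class StreamingLoopSketch {
    // Simplified model of the quoted run() loop: nextRecord() is called first,
    // and a null result is the signal to open the next split (or to stop).
    static List<String> run(SketchTableFormat format, List<List<String>> splits) {
        Iterator<List<String>> splitIterator = splits.iterator();
        List<String> collected = new ArrayList<>();
        try {
            while (true) {
                String next = format.nextRecord();
                if (next == null && splitIterator.hasNext()) {
                    format.open(splitIterator.next());
                    continue;
                } else if (next == null) {
                    break;
                }
                collected.add(next);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return collected;
    }

    public static void main(String[] args) {
        List<List<String>> splits = List.of(List.of("r1", "r2"), List.of("r3"));
        System.out.println(run(new SketchTableFormat(true), splits)); // [r1, r2, r3]
        try {
            run(new SketchTableFormat(false), splits);
        } catch (UncheckedIOException e) {
            System.out.println(e.getCause().getMessage()); // No table result scanner provided!
        }
    }
}
```

One consequence of this design: in the model, an unopened format and an exhausted split both return null, so a genuine "open() was never called" bug would no longer surface as an exception.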