HBase Input Format for streaming

Christophe Salperwyck
Hi all,

I am trying to read data from HBase and use the window functions of Flink streaming. I can read my data using the ExecutionEnvironment but not using the StreamExecutionEnvironment.

Is that a known issue? 

Are input splits used in the streaming environment?

Here is a sample of my code:

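import org.apache.flink.addons.hbase.TableInputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Read the HBase table through the TableInputFormat
@SuppressWarnings("serial")
final DataStreamSource<ANA> anaDS = env.createInput(new TableInputFormat<ANA>() {
    ...
});

// 30-day windows; size == slide, so this behaves like a tumbling window
final WindowedStream<ANA, Tuple, TimeWindow> ws = anaDS
    .assignTimestampsAndWatermarks(new xxxxAssignerWithPunctuatedWatermarks())
    .keyBy(0)
    .timeWindow(Time.days(30), Time.days(30));

ws.sum(2).printToErr();
env.execute();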

The error I get is:
Caused by: java.io.IOException: No table result scanner provided!
    at org.apache.flink.addons.hbase.TableInputFormat.nextRecord(TableInputFormat.java:103)

It seems the first "Result" is never read (the result scanner is not set up) before nextRecord() is called.

I built a "StreamingTableInputFormat" as a temporary workaround, but please let me know if I did something wrong.

Thanks for everything, Flink is great!

Cheers,
Christophe

Re: HBase Input Format for streaming

Ufuk Celebi
From the code it looks like the open method of the TableInputFormat is
never called. What are you doing differently in the
StreamingTableInputFormat?

– Ufuk


On Mon, Jun 6, 2016 at 1:49 PM, Christophe Salperwyck wrote:
> [...]
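Ufuk's diagnosis can be illustrated outside Flink with a minimal sketch. It assumes a simplified, hypothetical format (LifecycleDemo, ToyFormat and readAll() are illustrative names, not Flink APIs) whose scanner, like the rs field in TableInputFormat, only exists after open(split) has been called. readAll() drives it roughly the way a batch reader drives an InputFormat: open each split, read until reachedEnd(), then close. Calling nextRecord() without open() reproduces the same error message. Requires Java 9+ for List.of.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class LifecycleDemo {

    // Hypothetical toy format: the scanner is only created in open(),
    // similar to how TableInputFormat only creates its result scanner when a split is opened.
    static class ToyFormat {
        private Iterator<Integer> scanner; // null until open() is called

        void open(List<Integer> split) {
            this.scanner = split.iterator();
        }

        boolean reachedEnd() {
            return scanner != null && !scanner.hasNext();
        }

        Integer nextRecord() throws IOException {
            if (scanner == null) {
                throw new IOException("No table result scanner provided!");
            }
            return scanner.hasNext() ? scanner.next() : null;
        }

        void close() {
            scanner = null;
        }
    }

    // Batch-style driver: always open a split before reading from it, close it afterwards.
    static List<Integer> readAll(ToyFormat format, List<List<Integer>> splits) throws IOException {
        List<Integer> out = new ArrayList<>();
        for (List<Integer> split : splits) {
            format.open(split);
            while (!format.reachedEnd()) {
                Integer record = format.nextRecord();
                if (record != null) {
                    out.add(record);
                }
            }
            format.close();
        }
        return out;
    }

    public static void main(String[] args) throws IOException {
        List<List<Integer>> splits = List.of(List.of(1, 2), List.of(3));
        System.out.println(readAll(new ToyFormat(), splits)); // [1, 2, 3]

        try {
            new ToyFormat().nextRecord(); // open() was never called
        } catch (IOException e) {
            System.out.println("Error: " + e.getMessage()); // Error: No table result scanner provided!
        }
    }
}
```

The first call succeeds because every split is opened before it is read; the second fails exactly like the stack trace above because the read happens on a format that was never opened.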

Re: HBase Input Format for streaming

Christophe Salperwyck
I just changed nextRecord() like this:

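public T nextRecord(final T reuse) throws IOException {
    if (this.rs == null) {
        // Original behaviour:
        // throw new IOException("No table result scanner provided!");
        // Workaround: report "no record" so the caller opens the next split (or stops)
        return null;
    }
    ...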

because in the class FileSourceFunction we have:

@Override
public void run(SourceContext<OUT> ctx) throws Exception {
    while (isRunning) {
        OUT nextElement = serializer.createInstance();
        nextElement = format.nextRecord(nextElement);
        if (nextElement == null && splitIterator.hasNext()) {
            format.open(splitIterator.next());
            continue;
        } else if (nextElement == null) {
            break;
        }
        ctx.collect(nextElement);
    }
}

(I had to copy TableInputSplit because its constructor is not visible.)


2016-06-06 16:07 GMT+02:00 Ufuk Celebi:
> [...]
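To see why returning null sidesteps the exception, the quoted run() loop can be replayed with a toy format. This is a hypothetical, self-contained sketch (StreamingLoopDemo, PatchedFormat and run() are illustrative names, not Flink classes; isRunning, the serializer and SourceContext are omitted). It assumes the format has not been opened before the loop starts, which is consistent with the stack trace: the first nextRecord() call then returns null, and the loop reacts by opening the next split instead of failing. Requires Java 9+ for List.of.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class StreamingLoopDemo {

    // Hypothetical toy format with the workaround applied: nextRecord() returns null
    // instead of throwing when no scanner has been opened yet.
    static class PatchedFormat {
        private Iterator<String> scanner; // null until open() is called

        void open(List<String> split) {
            this.scanner = split.iterator();
        }

        String nextRecord() {
            if (scanner == null) {
                return null; // originally: throw new IOException("No table result scanner provided!")
            }
            return scanner.hasNext() ? scanner.next() : null;
        }
    }

    // Same control flow as the quoted run() loop: read first, and only open
    // the next split after nextRecord() has returned null.
    static List<String> run(PatchedFormat format, Iterator<List<String>> splitIterator) {
        List<String> collected = new ArrayList<>();
        while (true) {
            String next = format.nextRecord();
            if (next == null && splitIterator.hasNext()) {
                format.open(splitIterator.next());
                continue;
            } else if (next == null) {
                break;
            }
            collected.add(next);
        }
        return collected;
    }

    public static void main(String[] args) {
        List<List<String>> splits = List.of(List.of("a", "b"), List.of("c"));
        System.out.println(run(new PatchedFormat(), splits.iterator()));                  // [a, b, c]
        System.out.println(run(new PatchedFormat(), List.<List<String>>of().iterator())); // []
    }
}
```

If no split is left for a source instance, the loop simply exits without emitting anything, which is what the second call shows. With the original throwing check, the very first iteration would fail instead.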