Flink add source with Scala


hai

Hello: 

   Is there an example or best-practice code for writing a Flink source in Scala? I found one example in the official code, HBaseWriteStreamExample:

DataStream<String> dataStream = env.addSource(new SourceFunction<String>() {
    private static final long serialVersionUID = 1L;

    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<String> out) throws Exception {
        while (isRunning) {
            out.collect(String.valueOf(Math.floor(Math.random() * 100)));
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
});

  My question is how to do this the Scala way. Do I need to implement the same SourceFunction class, or can I use a functional style with Scala's functional programming?


Many Thanks.


Re: Flink add source with Scala

Chesnay Schepler
There is no separate Scala SourceFunction interface or similar convenience interfaces, so you'll have to work against the Java version.
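For illustration, working against the Java interface from Scala could look roughly like the sketch below. It is a direct port of the snippet from the question, not an official example. The names RandomNumberSource, RandomNumberJob and the job name are made up for this sketch, and it assumes the flink-streaming-scala dependency (Flink 1.x) is on the classpath.

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._

// Hypothetical name: a Scala class implementing the Java SourceFunction interface.
class RandomNumberSource extends SourceFunction[String] {

  // @volatile because cancel() is called from a different thread than run().
  @volatile private var isRunning = true

  override def run(ctx: SourceContext[String]): Unit = {
    // Emit random numbers (as strings such as "42.0") until the source is cancelled.
    while (isRunning) {
      ctx.collect(Math.floor(Math.random() * 100).toString)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

// Hypothetical driver object, only here to show how the source is attached.
object RandomNumberJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // The wildcard import above provides the implicit TypeInformation[String].
    val stream: DataStream[String] = env.addSource(new RandomNumberSource)
    stream.print()
    env.execute("random-number-source")
  }
}
```

A named class is used instead of an anonymous one so that the source can be reused and tested on its own. The serialVersionUID field from the Java version is left out here; if needed it can be added with the @SerialVersionUID(1L) annotation on the class.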

On 12/04/2019 09:07, hai wrote:

   Is there an example or best-practice code for writing a Flink source in Scala? [...]