Better way to read several stream sources



Hung
Hi Flink users,

What would be the better way to read multiple stream sources?

I have a FooSource that implements SourceFunction and reads a single source, and I would like to read several FooSources. FooSource basically reads data as a stream via an HTTP call.

Option 1:
Use a loop to create several data streams and union them.

It looks like:
List<DataStream<JSONObject>> streams = new ArrayList<>();

// One Flink source per source name, each rebalanced
for (String source : sourceList) {
    streams.add(env.addSource(new FooSource<>(source,
            new JSONSchema(), properties)).rebalance());
}

// Fold all streams into a single union
Iterator<DataStream<JSONObject>> streamsIt = streams.iterator();
DataStream<JSONObject> currentStream = streamsIt.next();
while (streamsIt.hasNext()) {
    currentStream = currentStream.union(streamsIt.next());
}

Option 2:
Implement a single SourceFunction that reads many FooSources.

The implementation in FooSources looks like:

@Override
public void open(Configuration parameters) throws Exception {
    fooSourceList = new ArrayList<>();
    LOG.info("Opened");
    for (String sourceName : sourceNames) {
        FooSource fooSource = new FooSource(properties, sourceName);
        fooSource.open(parameters);
        fooSourceList.add(fooSource);
        LOG.info("Read source: " + sourceName);
    }
}

@Override
public void run(final SourceContext<String> ctx) throws Exception {
    LOG.info("Processing");
    // This won't work, since each fooSource.run(ctx) blocks on its stream and
    // the loop never reaches the next source. Would a parallel for-loop
    // (one thread per source) be fine performance-wise?
    for (FooSource fooSource : fooSourceList) {
        fooSource.run(ctx);
    }
}
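The comment in run() asks whether a parallel loop would work. A minimal plain-Java sketch of that idea, assuming each source can be modelled as a blocking task that pushes records into a shared emitter: every source gets its own thread, and emission is serialized through one lock. In a real Flink source the emitter would presumably be ctx.collect(...) called while holding ctx.getCheckpointLock(), and cancel() would also have to stop every thread. FakeSource, readAll and ConcurrentSourcesSketch are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

public class ConcurrentSourcesSketch {

    // Hypothetical stand-in for FooSource: emits a fixed number of records, then returns.
    static final class FakeSource {
        private final String name;
        private final int records;

        FakeSource(String name, int records) {
            this.name = name;
            this.records = records;
        }

        // Blocking read loop, analogous to FooSource.run(ctx).
        void run(Consumer<String> emit) {
            for (int i = 0; i < records; i++) {
                emit.accept(name + "-" + i);
            }
        }
    }

    // Runs every source on its own thread and waits until all of them return.
    static List<String> readAll(List<FakeSource> sources) {
        List<String> out = new ArrayList<>();
        if (sources.isEmpty()) {
            return out;
        }
        Object lock = new Object(); // plays the role of ctx.getCheckpointLock()
        ExecutorService pool = Executors.newFixedThreadPool(sources.size());
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (FakeSource source : sources) {
                // One thread per source, so one blocking source cannot starve the others.
                futures.add(pool.submit(() -> source.run(record -> {
                    synchronized (lock) { // only one thread emits at a time
                        out.add(record);
                    }
                })));
            }
            for (Future<?> f : futures) {
                f.get(); // waits for the source and rethrows its failure, if any
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        } finally {
            pool.shutdownNow();
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> records = readAll(List.of(new FakeSource("a", 3), new FakeSource("b", 2)));
        Collections.sort(records); // thread interleaving is arbitrary, so sort before printing
        System.out.println(records); // [a-0, a-1, a-2, b-0, b-1]
    }
}
```

This keeps everything in one source instance with parallelism 1, so all sources compete for a single subtask; the parallel-source approach suggested in the reply spreads them across subtasks instead.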

Best,

Sendoh

Re: Better way to read several stream sources

Stephan Ewen
Why don't you define one source and make it parallel?

You can implement RichParallelSourceFunction and, during execution, use its runtime context to check which parallel subtask the respective source instance is.
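A minimal sketch of that assignment, assuming the source names are known up front and each parallel instance keeps only the names whose position modulo the parallelism equals its own subtask index. In Flink the two numbers would come from getRuntimeContext().getIndexOfThisSubtask() and getRuntimeContext().getNumberOfParallelSubtasks() inside open(); here they are plain parameters so the logic runs without Flink. The round-robin rule and the names SourceAssignment and sourcesForSubtask are illustrative choices, not something the API prescribes.

```java
import java.util.ArrayList;
import java.util.List;

public class SourceAssignment {

    // Returns the source names that one parallel subtask should open.
    // Round-robin: the name at position i goes to subtask (i % parallelism).
    static List<String> sourcesForSubtask(List<String> sourceNames, int subtaskIndex, int parallelism) {
        if (parallelism <= 0 || subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("invalid subtask index or parallelism");
        }
        List<String> assigned = new ArrayList<>();
        for (int i = 0; i < sourceNames.size(); i++) {
            if (i % parallelism == subtaskIndex) {
                assigned.add(sourceNames.get(i));
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        List<String> names = List.of("s0", "s1", "s2", "s3", "s4");
        int parallelism = 4;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            System.out.println(subtask + " -> " + sourcesForSubtask(names, subtask, parallelism));
        }
        // 0 -> [s0, s4]
        // 1 -> [s1]
        // 2 -> [s2]
        // 3 -> [s3]
    }
}
```

Every source is opened by exactly one subtask. With fewer sources than subtasks, some subtasks receive an empty list and read nothing.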

(In reply to Sendoh, Mon, Jan 23, 2017 at 6:55 PM)


Re: Better way to read several stream sources

Hung
Hi Stephan,

Thank you for answering my question.

I tried option 2 with your suggestion (one SourceFunction), and it gives me correct results when reading several sources, whereas with ParallelSourceFunction the data is duplicated four times (the same as my number of threads).

What causes the difference? I suspect I don't understand SourceFunction and ParallelSourceFunction correctly.

Best,

Sendoh

Re: Better way to read several stream sources

Hung
Found the reason.
With ParallelSourceFunction, my overridden open() is called 4 times (once per parallel instance), compared to only once with SourceFunction. My open() sets up the connections to the sources, which determines how many sources each instance reads, so every instance was reading all of them.
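The arithmetic behind the fourfold duplication can be sketched in plain Java, assuming (as in the Option 2 code) that each instance's open() connects to every name in sourceNames: with parallelism p and n sources, p × n connections are opened, so each source is read p times. DuplicationSketch and readersPerSource are hypothetical names used only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DuplicationSketch {

    // Simulates `parallelism` source instances whose open() each connects to
    // every name in sourceNames, and counts how many readers each source gets.
    static Map<String, Integer> readersPerSource(List<String> sourceNames, int parallelism) {
        Map<String, Integer> readers = new LinkedHashMap<>();
        for (int instance = 0; instance < parallelism; instance++) {
            // open() is called once per parallel instance
            for (String name : sourceNames) {
                readers.merge(name, 1, Integer::sum);
            }
        }
        return readers;
    }

    public static void main(String[] args) {
        List<String> names = List.of("s0", "s1", "s2");
        // A plain (non-parallel) SourceFunction runs with parallelism 1.
        System.out.println(readersPerSource(names, 1)); // {s0=1, s1=1, s2=1}
        // A ParallelSourceFunction running with parallelism 4.
        System.out.println(readersPerSource(names, 4)); // {s0=4, s1=4, s2=4}
    }
}
```

Filtering the names by subtask index in open() brings every count back to 1 while still spreading the sources over the parallel instances.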

Cheers,

Sendoh