Hi Flink users,
Can I ask what the better way to read multiple stream sources would be? I have a FooSource, which implements SourceFunction and reads one source, and I would like to read several FooSources. FooSource basically reads data as a stream via HTTP calls.

Option 1: Use a loop to read several data streams and union them. It looks like:

    List<DataStream<JSONObject>> streams = new ArrayList<>();
    Iterator<String> sourceIter = sourceList.iterator();
    while (sourceIter.hasNext()) {
        String source = sourceIter.next();
        // One source (and one stream) per entry in sourceList.
        streams.add(env.addSource(new FooSource<>(source, new JSONSchema(), properties)).rebalance());
    }

    // Fold all streams into a single stream with union().
    Iterator<DataStream<JSONObject>> streamsIt = streams.iterator();
    DataStream<JSONObject> currentStream = streamsIt.next();
    while (streamsIt.hasNext()) {
        DataStream<JSONObject> nextStream = streamsIt.next();
        currentStream = currentStream.union(nextStream);
    }

Option 2: Implement a SourceFunction that reads many FooSources. The implementation in FooSources looks like:

    @Override
    public void open(Configuration parameters) throws Exception {
        fooSourceList = new ArrayList<>();
        LOG.info("Opened");
        for (String sourceName : sourceNames) {
            FooSource fooSource = new FooSource(properties, sourceName);
            fooSource.open(parameters);
            fooSourceList.add(fooSource);
            LOG.info("Read source: " + sourceName);
        }
    }

    @Override
    public void run(final SourceContext<String> ctx) throws Exception {
        LOG.info("Processing");
        // This won't work as written. Would a parallel for-loop be fine, performance-wise?
        for (FooSource fooSource : fooSourceList) {
            fooSource.run(ctx);
        }
    }

Best,
Sendoh
Why don't you define one source and make it parallel? You can implement RichParallelSourceFunction and use it to check, during execution, which parallel subtask the current source instance is running as.

On Mon, Jan 23, 2017 at 6:55 PM, Sendoh <[hidden email]> wrote:
> Hi Flink users,
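The suggestion above, one parallel source whose instances each check their own subtask index, can be sketched roughly as follows. This is plain Java without any Flink dependency, and SubtaskSourceAssignment and forSubtask are hypothetical names used only for illustration. The assumption is that, inside a RichParallelSourceFunction, the two integers would come from getRuntimeContext().getIndexOfThisSubtask() and getRuntimeContext().getNumberOfParallelSubtasks(), and that a simple round-robin split of the source names is acceptable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not part of Flink): splits source names across parallel subtasks. */
final class SubtaskSourceAssignment {

    private SubtaskSourceAssignment() {}

    /**
     * Returns the source names that one subtask should read.
     * The source at position i is owned by subtask (i % parallelism),
     * so every source name is read by exactly one subtask.
     */
    static List<String> forSubtask(List<String> sourceNames, int subtaskIndex, int parallelism) {
        if (parallelism <= 0 || subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException(
                "subtaskIndex must be in [0, parallelism) and parallelism must be positive");
        }
        List<String> mine = new ArrayList<>();
        for (int i = 0; i < sourceNames.size(); i++) {
            if (i % parallelism == subtaskIndex) {
                mine.add(sourceNames.get(i));
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        // Assumed example values: five source names, parallelism of 4.
        List<String> names = Arrays.asList("a", "b", "c", "d", "e");
        for (int subtask = 0; subtask < 4; subtask++) {
            System.out.println("subtask " + subtask + " reads " + forSubtask(names, subtask, 4));
        }
    }
}
```

With a split like this, open() in each parallel instance would only connect to the sources returned for its own subtask index, instead of to the full list.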
Hi Stephan,
Thank you for answering my question. I tried option 2 with your suggestion (one SourceFunction), and it gives me correct results when reading several sources, whereas with ParallelSourceFunction I get 4-fold redundancy (the same as my number of threads). Can I ask what causes the difference? I think I don't understand SourceFunction and ParallelSourceFunction correctly.

Best,
Sendoh
Found the reason.
With ParallelSourceFunction, my overridden open() is called 4 times, whereas with SourceFunction open() is called only once. My overridden open() sets up the connections to the sources, which determines how many sources are going to be read.

Cheers,
Sendoh
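The arithmetic behind that observation can be illustrated with a small simulation. This is only a sketch in plain Java, not Flink code: OpenCallSimulation and both method names are hypothetical, and the model assumes that open() runs once per parallel subtask and opens one connection per source it is given.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical simulation (not Flink code) of how many source connections get opened. */
final class OpenCallSimulation {

    private OpenCallSimulation() {}

    /** Every subtask's open() connects to all sources, so each source is read once per subtask. */
    static int connectionsWhenEachSubtaskOpensAll(List<String> sourceNames, int parallelism) {
        int connections = 0;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            connections += sourceNames.size();
        }
        return connections;
    }

    /** Each subtask's open() connects only to the sources at positions i where i % parallelism == subtask. */
    static int connectionsWhenSourcesAreSplit(List<String> sourceNames, int parallelism) {
        int connections = 0;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            for (int i = 0; i < sourceNames.size(); i++) {
                if (i % parallelism == subtask) {
                    connections++;
                }
            }
        }
        return connections;
    }

    public static void main(String[] args) {
        // Assumed example values: three source names, parallelism of 4.
        List<String> names = Arrays.asList("s1", "s2", "s3");
        System.out.println("open all in every subtask: " + connectionsWhenEachSubtaskOpensAll(names, 4));
        System.out.println("split by subtask index:    " + connectionsWhenSourcesAreSplit(names, 4));
    }
}
```

Under this model, opening every source in every subtask multiplies the number of connections by the parallelism, which matches the 4-fold redundancy reported earlier in the thread, while splitting the list by subtask index keeps one connection per source.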