Collect output of transformations on a custom source in real time

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Collect output of transformations on a custom source in real time

Ahmed Nader
Hello,
I have defined a custom source function for an infinite stream source, where in my overwritten run method I have a while true loop to keep listening for the input. I want to apply some transformations on the resulting datastream from my source and collect the output so far of these transformations in a collection. 
However when i leave my source running in an infinite loop, nothing is really executed. 
Here are some parts of my code to clarify more:

my custom source class:
public class FeedSource implements SourceFunction<Object>

The run method in this class has a while(boolean variable == true) 

Then I call my source and apply filter on it:
datastream = env.addSource(new FeedSource()).filter(); 

then execute:
env.execute();

I want then to collect my datastream in a collection:
Iterator iter = DataStreamUtils.collect(datastream);

So is it possible to first of all apply filter on my stream that way? And then If I'm able to do so, is it possible to keep updating my collection with the content in my datastream so far?

I hope I was able to make my question clear enough.
Thanks,
Ahmed


Reply | Threaded
Open this post in threaded view
|

Re: Collect output of transformations on a custom source in real time

Stephan Ewen
Hi!

I am not sure I understand the problem exactly, but one problem I see in your code is that you call "execute()" on and then "DataStreamUtils.collect(datastream);"

The first call to "env.execute()" will start the program (source and filter) and the results will simply go nowhere.
Then you call "DataStreamUtils.collect(datastream);", which internally calls "execute" again.

In short: remote the first call to "env.execute()", that should do the trick.

Stephan


On Thu, May 26, 2016 at 5:09 PM, Ahmed Nader <[hidden email]> wrote:
Hello,
I have defined a custom source function for an infinite stream source, where in my overwritten run method I have a while true loop to keep listening for the input. I want to apply some transformations on the resulting datastream from my source and collect the output so far of these transformations in a collection. 
However when i leave my source running in an infinite loop, nothing is really executed. 
Here are some parts of my code to clarify more:

my custom source class:
public class FeedSource implements SourceFunction<Object>

The run method in this class has a while(boolean variable == true) 

Then I call my source and apply filter on it:
datastream = env.addSource(new FeedSource()).filter(); 

then execute:
env.execute();

I want then to collect my datastream in a collection:
Iterator iter = DataStreamUtils.collect(datastream);

So is it possible to first of all apply filter on my stream that way? And then If I'm able to do so, is it possible to keep updating my collection with the content in my datastream so far?

I hope I was able to make my question clear enough.
Thanks,
Ahmed