Re: Flink Data Stream Union

Posted by flinkuser on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Flink-Data-Stream-Union-tp3169p3195.html

Here is the strange behavior.

The code below works on one machine but not on another. It ran fine on my laptop all day yesterday, but today it does not work on my desktop.

Can anyone please tell me what the issue is?


public static void main(String[] args) throws Exception {
    try {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // One socket source per host, each filtered and run with parallelism 1.
        DataStream<String> msgDataStream1 = env.addSource(new SocketSource(hostName1, port, '\n', -1))
                .filter(new MessageFilter()).setParallelism(1);
        DataStream<String> msgDataStream2 = env.addSource(new SocketSource(hostName2, port, '\n', -1))
                .filter(new MessageFilter()).setParallelism(1);

        // Union the two streams and attach the print sink before starting the job.
        unionMessageStreams(msgDataStream1, msgDataStream2);

        env.execute("Stock stream");
    } catch (Exception e) {
        System.err.println("Exception  = > " + e.getMessage());
        e.printStackTrace();
    }
}

private static void unionMessageStreams(DataStream<String> msgDataStream1, DataStream<String> msgDataStream2) {
    try {
        DataStream<String> ds = msgDataStream1.union(msgDataStream2);
        ds.print();
    } catch (Exception e) {
        System.err.println("Exception in union Message Streams () = > " + e.getMessage());
    }
}

Thanks


On Tue, Oct 20, 2015 at 2:37 PM, Gayu <[hidden email]> wrote:
Thanks very much for the comments.

I will try to correct the code and look into it further.
I am currently using Flink version 0.9.1. I will get the SNAPSHOT version and try it out.



On Tue, Oct 20, 2015 at 12:50 PM, Stephan Ewen [via Apache Flink User Mailing List archive.] <[hidden email]> wrote:
Hi!

Two comments:

(1) The iterate() statement is probably wrong, as noticed by Anwar.

(2) Which version of Flink are you using? In 0.9.x, the Union operator is not lock-safe; in 0.10 it should work well. The 0.10 release is coming up shortly, and you can already try the 0.10-SNAPSHOT version.
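
As a rough mental model of what a working union should deliver, here is a small plain-Java sketch. It is not Flink code and makes no claim about how Flink implements union. Two producer threads write into one shared thread-safe queue, and the merged output contains every element of both inputs. The class name UnionModel and the sample values are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Plain-Java model of a two-input union. Hypothetical helper, not part of Flink. */
final class UnionModel {

    /**
     * Merges two inputs: one producer thread per input writes into a shared
     * thread-safe FIFO queue, which is drained after both producers finish.
     */
    static List<String> union(List<String> first, List<String> second) {
        BlockingQueue<String> merged = new LinkedBlockingQueue<>();

        Thread producer1 = new Thread(() -> first.forEach(merged::add));
        Thread producer2 = new Thread(() -> second.forEach(merged::add));
        producer1.start();
        producer2.start();

        try {
            // Wait until both inputs have been fully written to the queue.
            producer1.join();
            producer2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while merging", e);
        }

        List<String> out = new ArrayList<>();
        merged.drainTo(out);
        return out;
    }

    public static void main(String[] args) {
        // All five elements are printed; how a* and b* interleave can differ between runs.
        System.out.println(union(List.of("a1", "a2", "a3"), List.of("b1", "b2")));
    }
}
```

In this model, elements from the same input keep their relative order, while the interleaving between the two inputs is not fixed. The symptom reported in this thread (only one stream or nothing printed) is different: elements are missing, not merely reordered.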

Greetings,
Stephan


On Mon, Oct 19, 2015 at 6:01 PM, Anwar Rizal <[hidden email]> wrote:
Do you really need to iterate?

On Mon, Oct 19, 2015 at 5:42 PM, flinkuser <[hidden email]> wrote:

Here is my code snippet, but I cannot get the union operator to work.

DataStream<String> msgDataStream1 = env.addSource(new SocketSource(hostName1, port, '\n', -1))
        .filter(new MessageFilter()).setParallelism(1);
DataStream<String> msgDataStream2 = env.addSource(new SocketSource(hostName2, port, '\n', -1))
        .filter(new MessageFilter()).setParallelism(1);

DataStream<String> stockStream = (msgDataStream1.union(msgDataStream2)).iterate();
stockStream.print();


The stockStream does not print the consolidated stream data. Sometimes
Stream1 is printed, sometimes nothing is printed.

Can you please help me figure out what is wrong here?











--
Gayu


