Re: Flink Data Stream Union

Posted by flinkuser on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Flink-Data-Stream-Union-tp3169p3215.html

I'm running this in the IDE.
I'm still using 0.9.1; I will try out 0.10-SNAPSHOT and let you know.

Thanks


On Wed, Oct 21, 2015 at 12:36 PM, Stephan Ewen [via Apache Flink User Mailing List archive.] <[hidden email]> wrote:
I think the most crucial question is still whether you are running 0.9.1 or 0.10-SNAPSHOT, because the 0.9.1 union has known issues...
If you are running 0.9.1 there is not much you can do except upgrade the version ;-)

On Wed, Oct 21, 2015 at 5:19 PM, Aljoscha Krettek <[hidden email]> wrote:
Hi,
first of all, am I correct to assume that
new SocketSource(hostName1, port, '\n', -1)
should be
new SocketTextStreamFunction(hostName1, port1, '\n', -1)

or are you using a custom built SocketSource for this?

If I replace it with SocketTextStreamFunction and execute it, the example runs and prints incoming Strings from both input sockets.
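
For reference, the behaviour described above (one output that carries the records of both inputs) can be sketched without Flink. This is a plain-Java illustration under stated assumptions, not Flink code: the class UnionSketch and its union and drain helpers are hypothetical names, and in-memory readers stand in for the two sockets. Order is preserved within each input, but the interleaving between inputs is not guaranteed.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration class; it does not use any Flink API.
class UnionSketch {

    // Reads newline-delimited records from both readers concurrently and
    // returns every record in the order it arrived in the shared queue.
    static List<String> union(BufferedReader first, BufferedReader second) {
        BlockingQueue<String> merged = new LinkedBlockingQueue<>();
        Thread t1 = new Thread(() -> drain(first, merged));
        Thread t2 = new Thread(() -> drain(second, merged));
        t1.start();
        t2.start();
        try {
            // Wait until both inputs are exhausted.
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while merging", e);
        }
        return new ArrayList<>(merged);
    }

    // Copies each line of one input into the shared queue.
    private static void drain(BufferedReader in, BlockingQueue<String> out) {
        try {
            String line;
            while ((line = in.readLine()) != null) {
                out.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        BufferedReader a = new BufferedReader(new StringReader("a1\na2\n"));
        BufferedReader b = new BufferedReader(new StringReader("b1\nb2\n"));
        // Prints all four records; the interleaving between inputs may vary.
        for (String record : union(a, b)) {
            System.out.println(record);
        }
    }
}
```

If only one input ever shows up in the output of the real job, that points at the source or the union step rather than at the downstream print.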

How are you executing the example? In the IDE or on a Flink cluster?

Cheers,
Aljoscha
> On 21 Oct 2015, at 15:02, flinkuser <[hidden email]> wrote:
>
> Here is the strange behavior.
>
> The code below works on one box but not on the other. I had it working on my
> laptop all day yesterday, but strangely it doesn't work on my desktop today.
>
> Can anyone please let me know what the issue is?
>
>
> public static void main(String[] args) throws Exception {
>               try {
>                       final StreamExecutionEnvironment env =
>                                       StreamExecutionEnvironment.getExecutionEnvironment();
>                       DataStream<String> msgDataStream1 = env
>                                       .addSource(new SocketSource(hostName1, port, '\n', -1))
>                                       .filter(new MessageFilter()).setParallelism(1);
>                       DataStream<String> msgDataStream2 = env
>                                       .addSource(new SocketSource(hostName2, port, '\n', -1))
>                                       .filter(new MessageFilter()).setParallelism(1);
>                       unionMessageStreams(msgDataStream1, msgDataStream2);
>
>                       env.execute("Stock stream");
>
>               } catch (Exception e) {
>                       System.err.println("Exception  = > " + e.getMessage());
>                       e.printStackTrace();
>               }
>       }
>
>       private static void unionMessageStreams(DataStream<String> msgDataStream1,
>                       DataStream<String> msgDataStream2) {
>               try {
>
>                       DataStream<String> ds = msgDataStream1.union(msgDataStream2);
>                       ds.print();
>               } catch (Exception e) {
>                       System.err.println("Exception in union Message Streams () = > "
>                                       + e.getMessage());
>               }
>       }
>
> Thanks

--
Gayu