Here is my code snippet but I find the union operator not workable. DataStream<String> msgDataStream1 = env.addSource((new SocketSource(hostName1,port,'\n',-1))).filter(new MessageFilter()).setParallelism(1); DataStream<String> msgDataStream2 = env.addSource((new SocketSource(hostName2,port,'\n',-1))).filter(new MessageFilter()).setParallelism(1); DataStream<String> stockStream = (msgDataStream1.union(msgDataStream2)).iterate(); stockStream.print(); The stockStream doesn’t print the consolidated stream data. Sometimes Stream1 is printed, sometimes none is printed. Can you please help me out, as of what is wrong here. |
Do you really need to iterate ? On Mon, Oct 19, 2015 at 5:42 PM, flinkuser <[hidden email]> wrote:
|
I need to manipulate the enitre set of data received from both the sockets. I even tried below, which doesnt seem to work. Just beginning with Flink, not sure what simple thing i'm missing on. DataStream<String> stockStream =msgDataStream1.union(msgDataStream2); stockStream.print(); On Mon, Oct 19, 2015 at 11:50 AM, Anwar Rizal [via Apache Flink User Mailing List archive.] <[hidden email]> wrote:
Gayu
|
In reply to this post by Anwar Rizal
Hi! Two comments: (1) The iterate() statement is probably wrong, as noticed by Anwar. (2) Which version of Flink are you using? In 0.9.x, the Union operator is not lock-safe, in 0.10, it should work well. The 0.10 release is coming up shortly, you can try the 0.10-SNAPSHOT version already. Greetings, Stephan On Mon, Oct 19, 2015 at 6:01 PM, Anwar Rizal <[hidden email]> wrote:
|
Thanks much for the comments. I will try to correct the code and check out more. Currently i am making use of 0.9.1 version of Flink. Will get the SNAPSHOT version and check out.. On Tue, Oct 20, 2015 at 12:50 PM, Stephan Ewen [via Apache Flink User Mailing List archive.] <[hidden email]> wrote:
Gayu
|
In reply to this post by Stephan Ewen
Here is the strange behavior. Below code works in one box but not in the other. I had it working in my laptop the whole of yesterday, but strangely today it doesnt work in my desktop. Can anyone please let me know what the issue is. public static void main(String[] args) throws Exception { try { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> msgDataStream1 = env.addSource((new SocketSource(hostName1, port, '\n', -1))).filter(new MessageFilter()).setParallelism(1); DataStream<String> msgDataStream2 = env.addSource((new SocketSource(hostName2, port, '\n', -1))).filter(new MessageFilter()).setParallelism(1); env.execute("Stock stream"); } catch (Exception e) { System.err.println("Exception = > " + e.getMessage()); e.printStackTrace(); } } private static void unionMessageStreams(DataStream<String> msgDataStream1, DataStream<String> msgDataStream2) { try { DataStream<String> ds = msgDataStream1.union(msgDataStream2); ds.print(); } catch (Exception e) { System.err.println("Exception in union Message Streams () = > " + e.getMessage()); } } Thanks On Tue, Oct 20, 2015 at 2:37 PM, Gayu <[hidden email]> wrote:
Gayu
|
Here is the strange behavior.
Below code works in one box but not in the other. I had it working in my laptop the whole of yesterday, but strangely today it doesnt work in my desktop. Can anyone please let me know what the issue is. public static void main(String[] args) throws Exception { try { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> msgDataStream1 = env.addSource((new SocketSource(hostName1, port, '\n', -1))).filter(new MessageFilter()).setParallelism(1); DataStream<String> msgDataStream2 = env.addSource((new SocketSource(hostName2, port, '\n', -1))).filter(new MessageFilter()).setParallelism(1); env.execute("Stock stream"); } catch (Exception e) { System.err.println("Exception = > " + e.getMessage()); e.printStackTrace(); } } private static void unionMessageStreams(DataStream<String> msgDataStream1, DataStream<String> msgDataStream2) { try { DataStream<String> ds = msgDataStream1.union(msgDataStream2); ds.print(); } catch (Exception e) { System.err.println("Exception in union Message Streams () = > " + e.getMessage()); } } Thanks |
The issue is i cannot get any data on msgDataStream2.print(), however msgDataStream1.print() produces some data. On Wed, Oct 21, 2015 at 9:02 AM, flinkuser [via Apache Flink User Mailing List archive.] <[hidden email]> wrote: Here is the strange behavior. Gayu
|
In reply to this post by flinkuser
Can it be that you forgot to call Cheers, On Wed, Oct 21, 2015 at 3:02 PM, flinkuser <[hidden email]> wrote: Here is the strange behavior. |
Sorry, i skipped to paste that portion of code, but i did call unionMessageStreams in my method.I could see the navigation passing there, but unfortunately no result in one of my desktops, whereas the laptop produces the output correctly. On Wed, Oct 21, 2015 at 9:18 AM, Till Rohrmann [via Apache Flink User Mailing List archive.] <[hidden email]> wrote:
Gayu
|
In reply to this post by Till Rohrmann
Hi Gayu,
could it be that no data ever arrives on the second input stream? Or that the filter filters out all messages? Also, in the example you posted you forgot to call unionMessageStreams(). Cheers, Aljoscha > On 21 Oct 2015, at 15:29, Till Rohrmann <[hidden email]> wrote: > > Can it be that you forgot to call unionMessageStreams in your main method? > > Cheers, > Till > > > On Wed, Oct 21, 2015 at 3:02 PM, flinkuser <[hidden email]> wrote: > Here is the strange behavior. > > Below code works in one box but not in the other. I had it working in my > laptop the whole of yesterday, but strangely today it doesnt work in my > desktop. > > Can anyone please let me know what the issue is. > > > public static void main(String[] args) throws Exception { > try { > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > DataStream<String> msgDataStream1 = env.addSource((new > SocketSource(hostName1, port, '\n', -1))).filter(new > MessageFilter()).setParallelism(1); > DataStream<String> msgDataStream2 = env.addSource((new > SocketSource(hostName2, port, '\n', -1))).filter(new > MessageFilter()).setParallelism(1); > > env.execute("Stock stream"); > > } catch (Exception e) { > System.err.println("Exception = > " + e.getMessage()); > e.printStackTrace(); > } > } > > private static void unionMessageStreams(DataStream<String> msgDataStream1, > DataStream<String> msgDataStream2) { > try { > > DataStream<String> ds = msgDataStream1.union(msgDataStream2); > ds.print(); > } catch (Exception e) { > System.err.println("Exception in union Message Streams () = > " + > e.getMessage()); > } > } > > Thanks > > > > -- > View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Data-Stream-Union-tp3169p3196.html > Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com. > |
The data does arrive in the second port and i am able to see that in the filter class received. It happens only on specific machine on which i run the code. Yes, i did forget to post here, but my program calls the unionMessageStreams() On Wed, Oct 21, 2015 at 9:39 AM, Aljoscha Krettek <[hidden email]> wrote: Hi Gayu, Gayu
|
So it is received in the filter but the print afterwards does not print?
> On 21 Oct 2015, at 15:49, Gayu <[hidden email]> wrote: > > The data does arrive in the second port and i am able to see that in the filter class received. > It happens only on specific machine on which i run the code. > > > Yes, i did forget to post here, but my program calls the unionMessageStreams() > > On Wed, Oct 21, 2015 at 9:39 AM, Aljoscha Krettek <[hidden email]> wrote: > Hi Gayu, > could it be that no data ever arrives on the second input stream? Or that the filter filters out all messages? > > Also, in the example you posted you forgot to call unionMessageStreams(). > > Cheers, > Aljoscha > > > On 21 Oct 2015, at 15:29, Till Rohrmann <[hidden email]> wrote: > > > > Can it be that you forgot to call unionMessageStreams in your main method? > > > > Cheers, > > Till > > > > > > On Wed, Oct 21, 2015 at 3:02 PM, flinkuser <[hidden email]> wrote: > > Here is the strange behavior. > > > > Below code works in one box but not in the other. I had it working in my > > laptop the whole of yesterday, but strangely today it doesnt work in my > > desktop. > > > > Can anyone please let me know what the issue is. > > > > > > public static void main(String[] args) throws Exception { > > try { > > final StreamExecutionEnvironment env = > > StreamExecutionEnvironment.getExecutionEnvironment(); > > DataStream<String> msgDataStream1 = env.addSource((new > > SocketSource(hostName1, port, '\n', -1))).filter(new > > MessageFilter()).setParallelism(1); > > DataStream<String> msgDataStream2 = env.addSource((new > > SocketSource(hostName2, port, '\n', -1))).filter(new > > MessageFilter()).setParallelism(1); > > > > env.execute("Stock stream"); > > > > } catch (Exception e) { > > System.err.println("Exception = > " + e.getMessage()); > > e.printStackTrace(); > > } > > } > > > > private static void unionMessageStreams(DataStream<String> msgDataStream1, > > DataStream<String> msgDataStream2) { > > try { > > > > DataStream<String> ds = msgDataStream1.union(msgDataStream2); > > ds.print(); > > } catch (Exception e) { > > System.err.println("Exception in union Message Streams () = > " + > > e.getMessage()); > > } > > } > > > > Thanks > > > > > > > > -- > > View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Data-Stream-Union-tp3169p3196.html > > Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com. > > > > > > > -- > Gayu |
Yes, exactly. On Wed, Oct 21, 2015 at 10:17 AM, Aljoscha Krettek <[hidden email]> wrote: So it is received in the filter but the print afterwards does not print? Gayu
|
So does the filter maybe filter out everything?
> On 21 Oct 2015, at 16:18, Gayu <[hidden email]> wrote: > > Yes, exactly. > > On Wed, Oct 21, 2015 at 10:17 AM, Aljoscha Krettek <[hidden email]> wrote: > So it is received in the filter but the print afterwards does not print? > > On 21 Oct 2015, at 15:49, Gayu <[hidden email]> wrote: > > > > The data does arrive in the second port and i am able to see that in the filter class received. > > It happens only on specific machine on which i run the code. > > > > > > Yes, i did forget to post here, but my program calls the unionMessageStreams() > > > > On Wed, Oct 21, 2015 at 9:39 AM, Aljoscha Krettek <[hidden email]> wrote: > > Hi Gayu, > > could it be that no data ever arrives on the second input stream? Or that the filter filters out all messages? > > > > Also, in the example you posted you forgot to call unionMessageStreams(). > > > > Cheers, > > Aljoscha > > > > > On 21 Oct 2015, at 15:29, Till Rohrmann <[hidden email]> wrote: > > > > > > Can it be that you forgot to call unionMessageStreams in your main method? > > > > > > Cheers, > > > Till > > > > > > > > > On Wed, Oct 21, 2015 at 3:02 PM, flinkuser <[hidden email]> wrote: > > > Here is the strange behavior. > > > > > > Below code works in one box but not in the other. I had it working in my > > > laptop the whole of yesterday, but strangely today it doesnt work in my > > > desktop. > > > > > > Can anyone please let me know what the issue is. > > > > > > > > > public static void main(String[] args) throws Exception { > > > try { > > > final StreamExecutionEnvironment env = > > > StreamExecutionEnvironment.getExecutionEnvironment(); > > > DataStream<String> msgDataStream1 = env.addSource((new > > > SocketSource(hostName1, port, '\n', -1))).filter(new > > > MessageFilter()).setParallelism(1); > > > DataStream<String> msgDataStream2 = env.addSource((new > > > SocketSource(hostName2, port, '\n', -1))).filter(new > > > MessageFilter()).setParallelism(1); > > > > > > env.execute("Stock stream"); > > > > > > } catch (Exception e) { > > > System.err.println("Exception = > " + e.getMessage()); > > > e.printStackTrace(); > > > } > > > } > > > > > > private static void unionMessageStreams(DataStream<String> msgDataStream1, > > > DataStream<String> msgDataStream2) { > > > try { > > > > > > DataStream<String> ds = msgDataStream1.union(msgDataStream2); > > > ds.print(); > > > } catch (Exception e) { > > > System.err.println("Exception in union Message Streams () = > " + > > > e.getMessage()); > > > } > > > } > > > > > > Thanks > > > > > > > > > > > > -- > > > View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Data-Stream-Union-tp3169p3196.html > > > Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com. > > > > > > > > > > > > > -- > > Gayu > > > > > -- > Gayu |
No, it doesn't i even tried removing the filter and return all the values as is received from the port. My doubt is, is there anything system or CPU specific that fails to attach the data to the datastream created. On Wed, Oct 21, 2015 at 10:39 AM, Aljoscha Krettek <[hidden email]> wrote: So does the filter maybe filter out everything? Gayu
|
Hmm, that is strange.
Could you maybe send the complete program so that I could have a look? > On 21 Oct 2015, at 16:43, Gayu <[hidden email]> wrote: > > No, it doesn't i even tried removing the filter and return all the values as is received from the port. > > My doubt is, is there anything system or CPU specific that fails to attach the data to the datastream created. > > On Wed, Oct 21, 2015 at 10:39 AM, Aljoscha Krettek <[hidden email]> wrote: > So does the filter maybe filter out everything? > > On 21 Oct 2015, at 16:18, Gayu <[hidden email]> wrote: > > > > Yes, exactly. > > > > On Wed, Oct 21, 2015 at 10:17 AM, Aljoscha Krettek <[hidden email]> wrote: > > So it is received in the filter but the print afterwards does not print? > > > On 21 Oct 2015, at 15:49, Gayu <[hidden email]> wrote: > > > > > > The data does arrive in the second port and i am able to see that in the filter class received. > > > It happens only on specific machine on which i run the code. > > > > > > > > > Yes, i did forget to post here, but my program calls the unionMessageStreams() > > > > > > On Wed, Oct 21, 2015 at 9:39 AM, Aljoscha Krettek <[hidden email]> wrote: > > > Hi Gayu, > > > could it be that no data ever arrives on the second input stream? Or that the filter filters out all messages? > > > > > > Also, in the example you posted you forgot to call unionMessageStreams(). > > > > > > Cheers, > > > Aljoscha > > > > > > > On 21 Oct 2015, at 15:29, Till Rohrmann <[hidden email]> wrote: > > > > > > > > Can it be that you forgot to call unionMessageStreams in your main method? > > > > > > > > Cheers, > > > > Till > > > > > > > > > > > > On Wed, Oct 21, 2015 at 3:02 PM, flinkuser <[hidden email]> wrote: > > > > Here is the strange behavior. > > > > > > > > Below code works in one box but not in the other. I had it working in my > > > > laptop the whole of yesterday, but strangely today it doesnt work in my > > > > desktop. > > > > > > > > Can anyone please let me know what the issue is. > > > > > > > > > > > > public static void main(String[] args) throws Exception { > > > > try { > > > > final StreamExecutionEnvironment env = > > > > StreamExecutionEnvironment.getExecutionEnvironment(); > > > > DataStream<String> msgDataStream1 = env.addSource((new > > > > SocketSource(hostName1, port, '\n', -1))).filter(new > > > > MessageFilter()).setParallelism(1); > > > > DataStream<String> msgDataStream2 = env.addSource((new > > > > SocketSource(hostName2, port, '\n', -1))).filter(new > > > > MessageFilter()).setParallelism(1); > > > > > > > > env.execute("Stock stream"); > > > > > > > > } catch (Exception e) { > > > > System.err.println("Exception = > " + e.getMessage()); > > > > e.printStackTrace(); > > > > } > > > > } > > > > > > > > private static void unionMessageStreams(DataStream<String> msgDataStream1, > > > > DataStream<String> msgDataStream2) { > > > > try { > > > > > > > > DataStream<String> ds = msgDataStream1.union(msgDataStream2); > > > > ds.print(); > > > > } catch (Exception e) { > > > > System.err.println("Exception in union Message Streams () = > " + > > > > e.getMessage()); > > > > } > > > > } > > > > > > > > Thanks > > > > > > > > > > > > > > > > -- > > > > View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Data-Stream-Union-tp3169p3196.html > > > > Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com. > > > > > > > > > > > > > > > > > > > -- > > > Gayu > > > > > > > > > > -- > > Gayu > > > > > -- > Gayu |
In reply to this post by flinkuser
Hi,
first of all, am I correct to assume that new SocketSource(hostName1, port, '\n', -1) should be new SocketTextStreamFunction(hostName1, port1, '\n', -1) or are you using a custom built SocketSource for this? If I replace it by SocketTextStreamFunction and execute it the example runs and prints incoming Strings from both input sockets. How are you executing the example? In the IDE or on a Flink cluster? Cheers, Aljoscha > On 21 Oct 2015, at 15:02, flinkuser <[hidden email]> wrote: > > Here is the strange behavior. > > Below code works in one box but not in the other. I had it working in my > laptop the whole of yesterday, but strangely today it doesnt work in my > desktop. > > Can anyone please let me know what the issue is. > > > public static void main(String[] args) throws Exception { > try { > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > DataStream<String> msgDataStream1 = env.addSource((new > SocketSource(hostName1, port, '\n', -1))).filter(new > MessageFilter()).setParallelism(1); > DataStream<String> msgDataStream2 = env.addSource((new > SocketSource(hostName2, port, '\n', -1))).filter(new > MessageFilter()).setParallelism(1); > > env.execute("Stock stream"); > > } catch (Exception e) { > System.err.println("Exception = > " + e.getMessage()); > e.printStackTrace(); > } > } > > private static void unionMessageStreams(DataStream<String> msgDataStream1, > DataStream<String> msgDataStream2) { > try { > > DataStream<String> ds = msgDataStream1.union(msgDataStream2); > ds.print(); > } catch (Exception e) { > System.err.println("Exception in union Message Streams () = > " + > e.getMessage()); > } > } > > Thanks > > > > -- > View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Data-Stream-Union-tp3169p3196.html > Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com. |
I think the most crucial question is still whether you are running 0.9.1 or 0.10-SNAPSHOT, because the 0.9.1 union has known issues... If you are running 0.9.1 there is not much you can do except upgrade the version ;-) On Wed, Oct 21, 2015 at 5:19 PM, Aljoscha Krettek <[hidden email]> wrote: Hi, |
I'm running this on IDE. I'm still using 0.9.1, will try out .10-SNAPSHOT and let you know. Thanks On Wed, Oct 21, 2015 at 12:36 PM, Stephan Ewen [via Apache Flink User Mailing List archive.] <[hidden email]> wrote:
Gayu
|
Free forum by Nabble | Edit this page |