Hi,
After performing a windowAll() on a DataStream[String], is there any method to collect and return an array with all Strings within a window (similar to .collect in Spark)?

I basically want to ship all strings in a window to a remote server through a socket, and want to use the same socket connection for all strings that I send. The method .addSink iterates over all records, but does the provided function run on the Flink client or on the server?

Thanks.
Hi Saiph,
you can use AllWindowFunction via .apply(...) to get a .collect method. From https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html:

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.Window;
    import org.apache.flink.util.Collector;

    // applying an AllWindowFunction on non-keyed window stream
    allWindowedStream.apply(new AllWindowFunction<Tuple2<String, Integer>, Integer, Window>() {
        public void apply(Window window,
                          Iterable<Tuple2<String, Integer>> values,
                          Collector<Integer> out) throws Exception {
            int sum = 0;
            // iterate over every element that fell into this window
            for (Tuple2<String, Integer> t : values) {
                sum += t.f1;
            }
            // emit one result per window
            out.collect(sum);
        }
    });

If you consume all those values via a sink, the sink will run on the cluster. You can use .writeToSocket(...) as a sink: https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html#data-sinks

-Matthias

On 01/18/2016 06:30 PM, Saiph Kappa wrote:
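For the original question (all Strings of a window as one array), the same .apply(...) pattern can emit the whole window at once instead of a sum. Below is a minimal plain-Java sketch, assuming the window's elements arrive as an Iterable<String> as in the example above. The Flink types (AllWindowFunction, Window, Collector) are replaced by a java.util.function.Consumer so it runs without Flink, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class WindowBatchSketch {

    // Hypothetical stand-in for the body of an AllWindowFunction's apply():
    // copy every String of one window into a single list and emit it once,
    // so a downstream sink receives one batch per window.
    public static void collectWindow(Iterable<String> values, Consumer<List<String>> out) {
        List<String> batch = new ArrayList<>();
        for (String v : values) {
            batch.add(v);
        }
        out.accept(batch);
    }

    public static void main(String[] args) {
        List<List<String>> emitted = new ArrayList<>();
        // Simulate one window containing three records.
        collectWindow(Arrays.asList("a", "b", "c"), emitted::add);
        System.out.println(emitted); // [[a, b, c]]
    }
}
```

In an actual job, the loop would presumably sit inside apply(window, values, out), with out.collect(batch) in place of out.accept(batch). Whether to emit a List, an array, or one joined String depends on what the downstream sink expects.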
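Hi Matthias,

Thanks for your response. The method .writeToSocket seems to be what I was looking for. Can you tell me what kind of serialization schema I should use, assuming my socket server receives strings? I have something like this in Scala:

    import java.net.ServerSocket
    import scala.io.BufferedSource

    val server = new ServerSocket(9999)
    while (true) {
      val s = server.accept()
      val in = new BufferedSource(s.getInputStream()).getLines()
      // prints only the first line received on each connection, then closes it
      println(in.next())
      s.close()
    }

Thanks

On Mon, Jan 18, 2016 at 8:46 PM, Matthias J. Sax wrote: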
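A note on the server loop in the message above: it reads only the first line of each connection before closing it. The goal stated at the top of the thread, however, is to send all strings over one connection, so every record after the first would be lost. Below is a minimal Java sketch of a server side that keeps reading until the sender closes the connection. It assumes each record arrives as a newline-terminated line, for example because the serialization schema appends '\n'. The class and method names are hypothetical, and port 0 (any free port) is used only so the self-test does not clash with 9999.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class LineServerSketch {

    // Read every newline-terminated record from one accepted connection until
    // the sender closes it (readLine() returns null at end of stream).
    public static List<String> readAllLines(Socket s) throws IOException {
        List<String> lines = new ArrayList<>();
        BufferedReader in = new BufferedReader(
                new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
        String line;
        while ((line = in.readLine()) != null) {
            lines.add(line);
        }
        return lines;
    }

    // Self-test: one client connection carries several records, and the
    // server side collects all of them, not just the first.
    public static List<String> roundTrip(String payload) throws Exception {
        try (ServerSocket server = new ServerSocket(0)) { // 0 = any free port
            int port = server.getLocalPort();
            Thread sender = new Thread(() -> {
                try (Socket c = new Socket(InetAddress.getLoopbackAddress(), port);
                     OutputStream out = c.getOutputStream()) {
                    out.write(payload.getBytes(StandardCharsets.UTF_8));
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
            sender.start();
            List<String> received;
            try (Socket s = server.accept()) {
                received = readAllLines(s);
            }
            sender.join();
            return received;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("first\nsecond\nthird\n")); // [first, second, third]
    }
}
```

In a long-running server you would keep the original while (true) { server.accept() ... } loop, and you would likely print each line inside the read loop rather than collecting them into a list, since a sink's connection may stay open for the lifetime of the job.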
There is SimpleStringSchema.
-Matthias

On 01/18/2016 11:21 PM, Saiph Kappa wrote:
When I use SimpleStringSchema I get the error: Type mismatch, expected: SerializationSchema[String, Array[Byte]], actual: SimpleStringSchema. I think SimpleStringSchema extends SerializationSchema[String], and therefore cannot be used as an argument of writeToSocket. Can you confirm this, please?

    s.writeToSocket(host, port.toInt, new SimpleStringSchema())

Thanks.

On Tue, Jan 19, 2016 at 10:34 AM, Matthias J. Sax wrote:
What type is your DataStream? It must be DataStream[String] to work with SimpleStringSchema. If you have a different type, just implement a customized SerializationSchema.

-Matthias

On 01/19/2016 11:26 AM, Saiph Kappa wrote:
It's DataStream[String]. So it seems that SimpleStringSchema cannot be used in writeToSocket regardless of the type of the DataStream. Right?

On Tue, Jan 19, 2016 at 1:32 PM, Matthias J. Sax wrote:
It should work.

Your error message indicates that your DataStream is of type [String, Array[Byte]] and not of type [String]:

> Type mismatch, expected: SerializationSchema[String, Array[Byte]], actual: SimpleStringSchema

Can you maybe share your code?

-Matthias

On 01/19/2016 01:57 PM, Saiph Kappa wrote:
I think this is a bug in the Scala API:

    def writeToSocket(hostname : scala.Predef.String, port : java.lang.Integer,
        schema : org.apache.flink.streaming.util.serialization.SerializationSchema[T, scala.Array[scala.Byte]])
        : org.apache.flink.streaming.api.datastream.DataStreamSink[T] = { /* compiled code */ }

On Tue, Jan 19, 2016 at 2:16 PM, Matthias J. Sax wrote:
Seems you are right. It works on the current 1.0-SNAPSHOT version, which has a different signature:

    def writeToSocket(
        hostname: String,
        port: Integer,
        schema: SerializationSchema[T]): DataStreamSink[T] = {
      javaStream.writeToSocket(hostname, port, schema)
    }

instead of the one in 0.10.1:

    def writeToSocket(
        hostname: String,
        port: Integer,
        schema: SerializationSchema[T, Array[Byte]]): DataStreamSink[T] = {
      javaStream.writeToSocket(hostname, port, schema)
    }

I guess you can still implement your own SerializationSchema for 0.10.1 to make it work.

-Matthias

On 01/19/2016 04:27 PM, Saiph Kappa wrote:
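Following the suggestion to implement your own SerializationSchema for 0.10.1, here is a minimal sketch. It assumes the two-parameter 0.10.1 shape quoted in this thread, SerializationSchema[T, Array[Byte]], and assumes the interface has a single serialize(element) method. To keep the block runnable without Flink, a local interface of that shape stands in for org.apache.flink.streaming.util.serialization.SerializationSchema, and the schema class name is hypothetical. Each record gets a trailing '\n' so that a line-oriented server can tell records apart on a single connection.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class NewlineStringSchemaSketch {

    // Local stand-in mirroring the two-parameter 0.10.1 shape quoted in the
    // thread (SerializationSchema[T, Array[Byte]]). Assumption: the real
    // interface exposes R serialize(T element). In a real job, implement
    // org.apache.flink.streaming.util.serialization.SerializationSchema<String, byte[]>
    // and delete this interface.
    public interface SerializationSchema<T, R> extends Serializable {
        R serialize(T element);
    }

    // Hypothetical schema: UTF-8 bytes of the record plus a trailing '\n',
    // so a line-reading server sees one line per record.
    public static class NewlineStringSchema implements SerializationSchema<String, byte[]> {
        private static final long serialVersionUID = 1L;

        @Override
        public byte[] serialize(String element) {
            return (element + "\n").getBytes(StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        byte[] bytes = new NewlineStringSchema().serialize("hello");
        System.out.println(bytes.length); // 6 ("hello" plus the newline)
    }
}
```

From the Scala job, the earlier call would then read s.writeToSocket(host, port.toInt, new NewlineStringSchema()), since a Java SerializationSchema<String, byte[]> is seen from Scala as SerializationSchema[String, Array[Byte]]. Per the 1.0-SNAPSHOT signature quoted above, the schema there takes a single type parameter instead, so this class would need adapting after an upgrade.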