Flink Stream: collect in an array all records within a window


Saiph Kappa
Hi,

After performing a windowAll() on a DataStream[String], is there any method to collect and return an array with all Strings within a window (similar to .collect in Spark).


I basically want to ship all strings in a window to a remote server through a socket, using the same socket connection for all strings that I send. The method .addSink iterates over all records, but does the provided function run on the Flink client or on the cluster?

Thanks.

Re: Flink Stream: collect in an array all records within a window

Matthias J. Sax-2
Hi Saiph,

you can use an AllWindowFunction via .apply(...). It receives all records of the window as an Iterable and provides a Collector with a .collect method:

From:
https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html

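> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
> import org.apache.flink.streaming.api.windowing.windows.Window;
> import org.apache.flink.util.Collector;
>
> // applying an AllWindowFunction on a non-keyed windowed stream
> allWindowedStream.apply(new AllWindowFunction<Tuple2<String, Integer>, Integer, Window>() {
>     @Override
>     public void apply(Window window,
>             Iterable<Tuple2<String, Integer>> values,
>             Collector<Integer> out) throws Exception {
>         // sum the second tuple field over every record in the window
>         int sum = 0;
>         for (Tuple2<String, Integer> t : values) {
>             sum += t.f1;
>         }
>         out.collect(sum); // emit one result per window
>     }
> });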
If you consume those values via a sink, the sink runs on the cluster,
not on the client. You can use .writeToSocket(...) as the sink:
https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html#data-sinks
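A minimal sketch of what such a window function can look like when the goal is simply to gather a window into one array, the closest analogue to the Spark .collect asked about in the first message. Assumptions: the stream is DataStream[String], and, so the block runs without Flink on the classpath, java.util.function.Consumer stands in for Flink's Collector and a plain List stands in for the window contents. In a real job the loop in the static apply method would go inside AllWindowFunction<String, String[], W>.apply(window, values, out); the class name WindowToArray is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class WindowToArray {
    // Body of a hypothetical AllWindowFunction<String, String[], W>:
    // gather every record of the window and emit them as one array.
    // 'out' is a stand-in for Flink's Collector<String[]>.
    static void apply(Iterable<String> values, Consumer<String[]> out) {
        List<String> buffer = new ArrayList<>();
        for (String value : values) {
            buffer.add(value);
        }
        out.accept(buffer.toArray(new String[0]));
    }

    public static void main(String[] args) {
        List<String[]> emitted = new ArrayList<>();
        apply(Arrays.asList("a", "b", "c"), emitted::add);
        System.out.println(Arrays.toString(emitted.get(0))); // prints [a, b, c]
    }
}
```

Emitting one array per window keeps each window's records together, so a downstream sink receives them as a single element.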

-Matthias


In reply to Saiph Kappa, 01/18/2016 06:30 PM.

Re: Flink Stream: collect in an array all records within a window

Saiph Kappa
Hi Matthias,

Thanks for your response. The method .writeToSocket seems to be what I was looking for. Can you tell me which serialization schema I should use, given that my socket server receives strings? I have something like this in Scala:
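import java.net.ServerSocket
import scala.io.BufferedSource

val server = new ServerSocket(9999)
while (true) {
    val s = server.accept()
    // one connection can carry many newline-terminated strings,
    // so keep reading until the sender closes it
    val in = new BufferedSource(s.getInputStream()).getLines()
    in.foreach(println)
    s.close()
}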
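Whatever sink is used, the receiving side has to match it. Assuming the sink keeps one connection open and writes every record over it (the behavior asked for in the first message), a server that reads a single line and then closes the socket would drop every record after the first; it has to keep reading until the connection closes. The loopback sketch below, in Java like the docs example, imitates that situation. SingleConnectionDemo, sendAll and receiveAll are hypothetical names, and the client only mimics a socket sink; it is not Flink's implementation.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SingleConnectionDemo {
    // Mimics a socket sink: one connection, every record written over it.
    static void sendAll(int port, List<String> records) throws IOException {
        try (Socket socket = new Socket(InetAddress.getLoopbackAddress(), port);
             OutputStream out = socket.getOutputStream()) {
            for (String record : records) {
                // newline delimiter so a line-based reader can split records
                out.write((record + "\n").getBytes(StandardCharsets.UTF_8));
            }
            out.flush();
        }
    }

    // Accepts one connection and reads every line until the peer closes it.
    static List<String> receiveAll(ServerSocket server) throws IOException {
        List<String> lines = new ArrayList<>();
        try (Socket s = server.accept();
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }

    public static void main(String[] args) throws Exception {
        try (ServerSocket server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            Thread client = new Thread(() -> {
                try {
                    sendAll(server.getLocalPort(), Arrays.asList("first", "second", "third"));
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
            client.start();
            List<String> received = receiveAll(server);
            client.join();
            System.out.println(received); // prints [first, second, third]
        }
    }
}
```

Port 0 lets the OS pick a free port, so the sketch does not collide with a server already listening on 9999.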

Thanks
 

In reply to Matthias J. Sax, Mon, Jan 18, 2016 at 8:46 PM.

Re: Flink Stream: collect in an array all records within a window

Matthias J. Sax-2
There is SimpleStringSchema.

-Matthias

In reply to Saiph Kappa, 01/18/2016 11:21 PM.

Re: Flink Stream: collect in an array all records within a window

Saiph Kappa
When I use SimpleStringSchema, I get the error "Type mismatch, expected: SerializationSchema[String, Array[Byte]], actual: SimpleStringSchema". I think SimpleStringSchema extends SerializationSchema[String] and therefore cannot be used as an argument to writeToSocket. Can you please confirm this?

s.writeToSocket(host, port.toInt, new SimpleStringSchema())

Thanks.

In reply to Matthias J. Sax, Tue, Jan 19, 2016 at 10:34 AM.

Re: Flink Stream: collect in an array all records within a window

Matthias J. Sax-2
What type is your DataStream? It must be DataStream[String] to work with
SimpleStringSchema.

If you have a different type, just implement a custom
SerializationSchema.
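A minimal sketch of such a custom schema for a non-String element type. Assumptions: the 0.10.x interface has the shape SerializationSchema<T, R> with a single R serialize(T element) method, and the receiver reads newline-delimited text. To keep the block runnable without Flink, SerializationSchemaLike is a hypothetical local stand-in for that interface and WordCount is a hypothetical element type; in a job you would implement Flink's SerializationSchema<WordCount, byte[]> instead.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class WordCountSchemaDemo {
    // Hypothetical stand-in mirroring the assumed 0.10.x interface shape.
    interface SerializationSchemaLike<T, R> extends Serializable {
        R serialize(T element);
    }

    // Hypothetical record type standing in for a non-String stream element.
    static final class WordCount {
        final String word;
        final int count;

        WordCount(String word, int count) {
            this.word = word;
            this.count = count;
        }
    }

    // Turns each element into one newline-terminated UTF-8 line "word,count".
    static final class WordCountSchema implements SerializationSchemaLike<WordCount, byte[]> {
        private static final long serialVersionUID = 1L;

        @Override
        public byte[] serialize(WordCount element) {
            String line = element.word + "," + element.count + "\n";
            return line.getBytes(StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        byte[] bytes = new WordCountSchema().serialize(new WordCount("flink", 3));
        System.out.print(new String(bytes, StandardCharsets.UTF_8)); // prints flink,3
    }
}
```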

-Matthias


In reply to Saiph Kappa, 01/19/2016 11:26 AM.

Re: Flink Stream: collect in an array all records within a window

Saiph Kappa
It's DataStream[String]. So it seems that SimpleStringSchema cannot be used in writeToSocket regardless of the type of the DataStream. Right?

In reply to Matthias J. Sax, Tue, Jan 19, 2016 at 1:32 PM.

Re: Flink Stream: collect in an array all records within a window

Matthias J. Sax-2
It should work.

Your error message indicates that your DataStream is of type
[String, Array[Byte]] and not of type [String].

> Type mismatch, expected: SerializationSchema[String, Array[Byte]], actual: SimpleStringSchema

Can you maybe share your code?

-Matthias

In reply to Saiph Kappa, 01/19/2016 01:57 PM.

Re: Flink Stream: collect in an array all records within a window

Saiph Kappa
I think this is a bug in the Scala API:

def writeToSocket(hostname : scala.Predef.String, port : java.lang.Integer, schema : org.apache.flink.streaming.util.serialization.SerializationSchema[T, scala.Array[scala.Byte]]) : org.apache.flink.streaming.api.datastream.DataStreamSink[T] = { /* compiled code */ }


In reply to Matthias J. Sax, Tue, Jan 19, 2016 at 2:16 PM.

Re: Flink Stream: collect in an array all records within a window

Matthias J. Sax-2
It seems you are right. It works on the current 1.0-SNAPSHOT version, which
has a different signature:

> def writeToSocket(
>       hostname: String,
>       port: Integer,
>       schema: SerializationSchema[T]): DataStreamSink[T] = {
>     javaStream.writeToSocket(hostname, port, schema)
>   }

instead of 0.10.1:

> def writeToSocket(
>       hostname: String,
>       port: Integer,
>       schema: SerializationSchema[T, Array[Byte]]): DataStreamSink[T] = {
>     javaStream.writeToSocket(hostname, port, schema)
>   }

I guess you can still make it work on 0.10.1 by implementing your own
SerializationSchema.
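For 0.10.1, a minimal sketch of a String schema that fits the SerializationSchema[T, Array[Byte]] parameter quoted above, written in Java like the docs example. Assumptions: the interface has a single R serialize(T element) method, and the socket server splits records on newlines, so the schema appends "\n" itself. NewlineStringSchema and SerializationSchemaLike are hypothetical names; the local SerializationSchemaLike interface only stands in for org.apache.flink.streaming.util.serialization.SerializationSchema so the block runs without Flink.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class NewlineStringSchemaDemo {
    // Hypothetical stand-in for the assumed 0.10.x SerializationSchema<T, R>.
    interface SerializationSchemaLike<T, R> extends Serializable {
        R serialize(T element);
    }

    // String -> UTF-8 bytes, one record per line, matching the
    // SerializationSchema<String, byte[]> shape the 0.10.1 signature expects.
    static final class NewlineStringSchema implements SerializationSchemaLike<String, byte[]> {
        private static final long serialVersionUID = 1L;

        @Override
        public byte[] serialize(String element) {
            return (element + "\n").getBytes(StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        byte[] bytes = new NewlineStringSchema().serialize("hello");
        System.out.println(bytes.length); // prints 6
    }
}
```

Once the class implements the real SerializationSchema<String, byte[]>, the Scala call would stay s.writeToSocket(host, port.toInt, new NewlineStringSchema()), with only the schema swapped.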


-Matthias


In reply to Saiph Kappa, 01/19/2016 04:27 PM.