Dear Emmanuel,

I'm Marton, one of the Flink Streaming developers - Robert forwarded your issue to me. Thanks for trying out our project.

1) Debugging: TaskManager logs are currently not forwarded to the UI, but you can find them on the taskmanager machines in the log folder of your Flink distribution. Making them accessible from the UI is on our agenda for the very near future.

2) Output to socket: Currently we do not have a preimplemented sink for sockets (although we offer a socket source, and sinks writing to Apache Kafka, Flume and RabbitMQ). You can easily implement a socket sink by extending the abstract RichSinkFunction class though. [1] To use it you simply say dataStream.addSink(new MySinkFunction()) - in that function you can bring up a socket or any other service: you would create the socket in the open() method and then write every value out to it in the invoke() method. I agree that this is a nice tool to have, so I have opened a JIRA ticket for it. [2]

3) Internal data format: Robert was kind enough to offer a more detailed answer on this issue. In general, streaming sinks support any file output that batch Flink supports, including Avro. You can use this functionality via dataStream.addSink(new FileSinkFunction<>(OutputFormat)).

Best,
Marton
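The pattern Marton describes (connect in open(), write each record in invoke()) can be sketched in plain Java. SocketSinkSketch below is a hypothetical stand-in that mirrors the RichSinkFunction lifecycle from the email but uses no Flink classes, so the round-trip demo runs as-is; the transient fields illustrate how the live socket stays out of serialization.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class SocketSinkSketch {
    private final String host;
    private final int port;
    // transient: the live socket is created on the worker in open(), never
    // serialized with the sink object -- only host and port travel with it
    private transient Socket socket;
    private transient PrintWriter writer;

    public SocketSinkSketch(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // Counterpart of open(Configuration) from AbstractRichFunction:
    // called once on the worker before the first record arrives.
    public void open() throws IOException {
        socket = new Socket(host, port);
        writer = new PrintWriter(socket.getOutputStream(), true);
    }

    // Counterpart of SinkFunction.invoke(IN): called once per record.
    public void invoke(String value) {
        writer.println(value);
    }

    public void close() throws IOException {
        writer.close();
        socket.close();
    }

    // Round-trip demo: a local server receives one record from the sink.
    public static String demo() {
        try (ServerSocket server = new ServerSocket(0)) {
            SocketSinkSketch sink =
                    new SocketSinkSketch("localhost", server.getLocalPort());
            sink.open();  // the kernel completes the handshake before accept()
            try (Socket client = server.accept();
                 BufferedReader in = new BufferedReader(
                         new InputStreamReader(client.getInputStream()))) {
                sink.invoke("hello from sink");
                String received = in.readLine();
                sink.close();
                return received;
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());  // prints: hello from sink
    }
}
```

In the real Flink sink the same open()/invoke()/close() bodies would simply live inside a class extending RichSinkFunction<String>.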
So, regarding your last question about the internal data format: internally (for sending data from one operator to another) Flink uses its own serialization framework, which is very efficient for Tuples, case classes, etc., so as a user you do not need to care much about it. If other components of your infrastructure (application servers, for example) rely on generated classes (created by protobuf or Avro, for instance), you can use these classes with Flink as well. If you are planning to write some of the data you're streaming to persistent storage (for example HDFS or a local file system), then I would recommend a format like Avro.

Regarding the incoming data: JSON is indeed a bit space-inefficient when represented as a string. If you are able to change the format of the incoming data, you can also write your own data source (implementing the SourceFunction interface).
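The suggestion to implement your own source for a more compact input format can be sketched as follows. The Collector and SourceFunction interfaces below are simplified stand-ins for illustration only (the real Flink interfaces live in the Flink libraries and their exact signatures vary by version), and the "id,value" line format of CsvLineSource is a hypothetical example of a leaner-than-JSON encoding.

```java
import java.util.ArrayList;
import java.util.List;

public class CustomSourceSketch {
    // Stand-in for Flink's Collector<T> (the real one ships with Flink).
    interface Collector<T> { void collect(T record); }

    // Stand-in for the SourceFunction contract described in the thread;
    // the actual Flink interface signature may differ by version.
    interface SourceFunction<T> { void run(Collector<T> out) throws Exception; }

    // A source that decodes a compact "id,value" line format instead of JSON.
    static class CsvLineSource implements SourceFunction<int[]> {
        private final String[] lines;
        CsvLineSource(String[] lines) { this.lines = lines; }

        @Override
        public void run(Collector<int[]> out) {
            for (String line : lines) {
                String[] parts = line.split(",");
                out.collect(new int[] { Integer.parseInt(parts[0]),
                                        Integer.parseInt(parts[1]) });
            }
        }
    }

    public static List<int[]> demo() {
        List<int[]> collected = new ArrayList<>();
        // The method reference serves as a minimal Collector implementation.
        new CsvLineSource(new String[] { "1,10", "2,20" }).run(collected::add);
        return collected;
    }

    public static void main(String[] args) {
        for (int[] r : demo()) System.out.println(r[0] + " -> " + r[1]);
    }
}
```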
Hi Marton,
Thanks for the info. I've been trying to implement a socket sink, but I'm running into 'Not Serializable' kinds of issues. I saw in the Spark docs that this is typically an issue: the socket should be created on the worker node, since it cannot be serialized and shipped from the supervisor. So I'm not sure how this would be implemented in Flink... My attempt (maybe very naive) looked like this:

public static final class SocketSink extends RichSinkFunction<String> {

Maybe I should just move to Kafka directly... ;/

Thanks for the help,
Emmanuel
Hi Emmanuel,

the open() method should be the right place for setting up the socket connection. It is called on the worker node before the first input arrives.

Best,
Fabian
I don't see an 'open()' function to override in the RichSinkFunction or the SinkFunction... so where is this open() function supposed to be?
It is in AbstractRichFunction [1]. RichSinkFunction extends AbstractRichFunction:

public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN>

Best,
Fabian
Thanks...

This is what I came up with (note that I only print every 100,000th record, or at least that is the intent, since otherwise I see quite a drop in performance). I hope this can help others too, although there is probably room for improvement.

Cheers.

package org.myorg.quickstart;
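Emmanuel's print-every-100,000th-record idea can be sketched without any Flink dependency. ThrottledPrintSink below is a hypothetical stand-in whose invoke() plays the role of SinkFunction.invoke(IN); the counter keeps slow console I/O off the per-record hot path.

```java
public class ThrottledPrintSink {
    private final int every;
    private long count = 0;
    private long printed = 0;  // for the demo: how many records were emitted

    public ThrottledPrintSink(int every) { this.every = every; }

    // Called once per record, like SinkFunction.invoke(IN); printing only
    // every N-th record avoids paying console-I/O cost on every element.
    public void invoke(String value) {
        count++;
        if (count % every == 0) {
            printed++;
            System.out.println(count + ": " + value);
        }
    }

    public long printedCount() { return printed; }

    public static void main(String[] args) {
        ThrottledPrintSink sink = new ThrottledPrintSink(100_000);
        for (int i = 0; i < 1_000_000; i++) sink.invoke("record-" + i);
        // 1,000,000 / 100,000 = 10 records actually printed
        System.out.println("emitted " + sink.printedCount() + " of 1000000");
    }
}
```

In a real job the same counting logic would sit inside the invoke() method of a class extending RichSinkFunction<String>.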