Flink writeAsCsv

Flink writeAsCsv

Radu Prodan
Hi all,

I am new to Flink. I wrote a simple program and I want it to write its output to a CSV file.

timeWindowAll(Time.of(3, TimeUnit.MINUTES))
    .apply(new Function1())
    .writeAsCsv("file:///user/someuser/Documents/somefile.csv");


When I change the sink to .print(), it works and outputs some results.

I want it to output the result of every window. However, with writeAsCsv it outputs nothing and the file is not created. Am I missing anything?


-best

Radu






Re: Flink writeAsCsv

Márton Balassi
Hey Radu,

As you are using the streaming API, I assume that you call env.execute() in both cases. Is that the case?

Do you see any errors appearing? My first guess would be that your data type is not a tuple type; in that case writeAsCsv does not work by default.

Best,

Marton

On Thu, Feb 4, 2016 at 11:36 AM, Radu Prodan <[hidden email]> wrote:

Re: Flink writeAsCsv

Radu Prodan
Hi Marton,

Thanks to your comment I managed to get it working. At least it outputs the results now. However, what I need is to output each window's result separately. Right now it writes the results of the parallel window instances (I think) and appends new results to them. For example, with a parallelism of 10 I get at most 10 files, and each file keeps growing as more windows are processed.
What I want is a separate file per window: after the n-th window is computed, write its result to a file and close that file.

-best
Radu

On Thu, Feb 4, 2016 at 11:42 AM Márton Balassi <[hidden email]> wrote:

RE: Flink writeAsCsv

Radu Tudoran

Hi Radu,

 

It is indeed interesting to know how each window could be recorded separately. I am not sure whether any of the existing mechanisms in Flink support this.

I think you need to create your own output sink. It is a bit tricky to pass the window sequence number to it (actually, I do not think such an index is kept, but you can create one yourself). Maybe an easier option is to manage the writing of the data yourself, in the window function or in a custom evictor. In the window function and in the evictor you have access to all the data, so you can create a specific file for each window that is triggered.
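
One way to "create one yourself" is to derive the index from a timestamp instead of counting windows. The plain-Java sketch below does not use any Flink API. It assumes tumbling windows of a fixed size that are aligned to the epoch, which is an assumption about the job and not something stated in this thread. The class and method names (WindowIndex, windowStart, windowIndex) are hypothetical.

```java
import java.util.concurrent.TimeUnit;

public class WindowIndex {

    /** Start of the epoch-aligned tumbling window that contains the timestamp. */
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /** Sequence number of that window, counted from the epoch (assumed alignment). */
    static long windowIndex(long timestampMillis, long sizeMillis) {
        return Math.floorDiv(timestampMillis, sizeMillis);
    }

    public static void main(String[] args) {
        long size = TimeUnit.MINUTES.toMillis(3); // same window length as in the question
        long now = System.currentTimeMillis();
        System.out.println("window start: " + windowStart(now, size));
        System.out.println("window index: " + windowIndex(now, size));
    }
}
```

Because the index depends only on the timestamp and the window size, every parallel instance computes the same value for the same window without sharing a counter.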

 

 

 

From: Radu Prodan [mailto:[hidden email]]
Sent: Thursday, February 04, 2016 11:58 AM

Re: Flink writeAsCsv

Fabian Hueske-2
You can get the end time of a window from the TimeWindow object which is passed to the AllWindowFunction. This is basically a window ID / index.
I would go for a custom output sink which writes records to files based on their timestamp.
IMO, this would be cleaner and easier than implementing the file output in the window function.
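
A minimal sketch of the file-handling part of such a sink, in plain Java without any Flink dependency. It assumes each record reaches the writer together with the end timestamp of its window (for example, attached in the window function from the TimeWindow object mentioned above). PerWindowCsvWriter, fileFor and write are hypothetical names; in a real job this logic would be called from a custom sink function.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class PerWindowCsvWriter {
    private final Path baseDir;

    public PerWindowCsvWriter(Path baseDir) {
        this.baseDir = baseDir;
    }

    /** File that collects all records of the window ending at windowEndMillis. */
    public Path fileFor(long windowEndMillis) {
        return baseDir.resolve("window-" + windowEndMillis + ".csv");
    }

    /**
     * Appends one row to the file of its window and closes the file again.
     * Fields are joined with commas as-is; no quoting or escaping is done.
     */
    public void write(long windowEndMillis, String... fields) throws IOException {
        Files.createDirectories(baseDir);
        try (BufferedWriter out = Files.newBufferedWriter(
                fileFor(windowEndMillis), StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
            out.write(String.join(",", fields));
            out.newLine();
        }
    }

    public static void main(String[] args) throws IOException {
        PerWindowCsvWriter writer =
                new PerWindowCsvWriter(Files.createTempDirectory("windows"));
        writer.write(180_000L, "a", "1"); // two records of the first window
        writer.write(180_000L, "b", "2");
        writer.write(360_000L, "c", "3"); // one record of the next window
        System.out.println(Files.readAllLines(writer.fileFor(180_000L)));
    }
}
```

Opening and closing the file for every record keeps the sketch short and leaves no file handle open once a window is complete. With parallelism greater than 1, several sink instances could append to the same file, so a real sink would probably add the subtask index to the file name or run with parallelism 1.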



2016-02-04 13:49 GMT+01:00 Radu Tudoran <[hidden email]>:
