output writer

output writer

Michele Bertoni
Hi everybody,
I have a question about the writer.
I have to save my dataset to different files according to a field of the tuples.

Let’s assume I have a groupId in the tuple: I need to store each group in a different file, with a custom name. Any idea how I can do that?


Thanks!
Michele

RE: output writer

Radu Tudoran
Hi,

My 2 cents, based on something similar that I have tried:
I created my own implementation of OutputFormat, where you define your own logic for what happens in the writeRecord function. This logic would distinguish between the IDs and write each record to the appropriate file.

Other solutions might exist.
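
A minimal sketch of this approach, in plain Java with no Flink dependency. The hypothetical class `PerGroupWriter` only mirrors the open / writeRecord / close lifecycle of an OutputFormat and keeps one writer per groupId. The class name, the `.txt` suffix, and the use of local files through `java.nio` are assumptions for illustration; a real format would implement Flink's OutputFormat interface and write through a file system client.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a custom OutputFormat: one output file per groupId. */
class PerGroupWriter {
    private final Path outputDir;
    // One open writer per groupId seen so far (unbounded in this sketch).
    private final Map<String, BufferedWriter> writers = new HashMap<>();

    PerGroupWriter(Path outputDir) {
        this.outputDir = outputDir;
    }

    /** Plays the role of open(): prepare the target directory. */
    void open() throws IOException {
        Files.createDirectories(outputDir);
    }

    /** Plays the role of writeRecord(): route the record by its groupId. */
    void writeRecord(String groupId, String payload) throws IOException {
        BufferedWriter writer = writers.get(groupId);
        if (writer == null) {
            // The file is named after the group (assumed naming scheme).
            writer = Files.newBufferedWriter(
                    outputDir.resolve(groupId + ".txt"), StandardCharsets.UTF_8);
            writers.put(groupId, writer);
        }
        writer.write(payload);
        writer.newLine();
    }

    /** Plays the role of close(): flush and close every file. */
    void close() throws IOException {
        for (BufferedWriter writer : writers.values()) {
            writer.close();
        }
        writers.clear();
    }
}
```

This version keeps every file open until close(), so it is only reasonable when the number of distinct groups per writer instance is small.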


Dr. Radu Tudoran
Research Engineer
IT R&D Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: [hidden email]
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN


Re: output writer

Fabian Hueske-2
Hi Michele, hi Radu

Flink does not have such an OutputFormat, but I agree, it would be a valuable addition.
Radu's approach looks like the way to go to implement this feature.

@Radu, is there a way to contribute your OutputFormat to Flink?

Cheers, Fabian


RE: output writer

Radu Tudoran

I will double-check and try to commit this in the next few days.

 


RE: output writer

Radu Tudoran
In reply to this post by Fabian Hueske-2

Re-hi,

I have double-checked, and there is in fact an OutputFormat interface in Flink that can be extended.

I believe that for specific formats like the one Michele mentioned, everyone can develop the appropriate format themselves.

On the other hand, I believe additional OutputFormats are something that could be contributed. We should identify a couple of common formats. The first one that comes to mind is something for writing to memory (e.g. a memory buffer).

 

 

 


Re: output writer

Michele Bertoni
Hi guys,
sorry for the late answer. I am still working on getting this done, but there is something I don’t understand.

I do have my own writeRecord function, but that function is not able to open a new output stream or anything else, so I don’t understand how to do that.

First, I think I should at least partition my data according to the output key (each key to one file).
Then I need to name the file exactly after that key,
but I don’t know how to go on.

Thanks,
Michele




Re: output writer

Fabian Hueske-2
Hi Michele,

You need to use a FileSystem client directly (e.g., Hadoop's) to create and write to files. Have a look at the FileOutputFormat [1], which does this for a single file per operator instance / partition. Instead of creating a single file, you need to create one file for each key. However, you want to avoid having too many files open at a time, and you also want to avoid creating too many files that contain only a few records. If you use HDFS, this is especially important, because HDFS is bad at handling many small files. Only recent versions of HDFS support appending to files. If you have an older version, you have to create a new file for a key whenever you do not have an open file handle for it.

There are multiple ways to control the number of open files and reduce the number of files:

1) You can partition the data (as you already suggested) to move all records with the same key to the same operator.
2) If you use the batch DataSet API, you can sort the data using sortPartition() such that each operator instance has only one file open at a time.
3) Instead of doing a full sort, you could also use combineGroup() to partially sort the data.
4) Have a pool of open file handles and an LRU kind of eviction policy to decide which file to close whenever you need to open a new one.

Implementing this is not trivial. You can also organize the files per key in folders. Have a look at the InitializeOnMaster and FinalizeOnMaster hooks, which are called once before a job is started and once after all instances of a task have finished.
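
Option 4 could look roughly like the sketch below. It is plain Java, not code against the Flink or Hadoop APIs: the hypothetical `LruFilePool` class, its method names, and the local-file access through `java.nio` are assumptions for illustration. An access-ordered `LinkedHashMap` tracks which handle was used least recently, and a file that was evicted earlier is reopened in append mode, which corresponds to the "recent HDFS" case described above.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical pool of open file handles with least-recently-used eviction. */
class LruFilePool {
    private final Path outputDir;
    private final int maxOpenFiles;
    // accessOrder = true: iteration goes from least to most recently used key.
    private final LinkedHashMap<String, BufferedWriter> open =
            new LinkedHashMap<>(16, 0.75f, true);
    private int evictions = 0;

    LruFilePool(Path outputDir, int maxOpenFiles) throws IOException {
        if (maxOpenFiles < 1) {
            throw new IllegalArgumentException("maxOpenFiles must be at least 1");
        }
        this.outputDir = outputDir;
        this.maxOpenFiles = maxOpenFiles;
        Files.createDirectories(outputDir);
    }

    void writeRecord(String key, String payload) throws IOException {
        BufferedWriter writer = open.get(key); // get() marks the key as recently used
        if (writer == null) {
            writer = openWriter(key);
        }
        writer.write(payload);
        writer.newLine();
    }

    private BufferedWriter openWriter(String key) throws IOException {
        if (open.size() >= maxOpenFiles) {
            // Close and drop the least recently used handle before opening a new one.
            Iterator<Map.Entry<String, BufferedWriter>> it = open.entrySet().iterator();
            it.next().getValue().close();
            it.remove();
            evictions++;
        }
        // APPEND so that a key evicted earlier continues its existing file.
        BufferedWriter writer = Files.newBufferedWriter(
                outputDir.resolve(key + ".txt"), StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        open.put(key, writer);
        return writer;
    }

    int openCount() { return open.size(); }

    int evictions() { return evictions; }

    void close() throws IOException {
        for (BufferedWriter writer : open.values()) {
            writer.close();
        }
        open.clear();
    }
}
```

Because files are opened in append mode, leftovers from an earlier run in the same directory would be extended rather than overwritten; cleaning the output directory first is left out of this sketch.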

Let me know if you need more information or if something is not clear.

Cheers, Fabian


Re: output writer

Michele Bertoni
Thanks! Your answer is really helpful.

Actually, I was just reading the FileOutputFormat, and my idea was to extend it and use the open function to open multiple streams,
so it should be a mix of 1 and 4.

But I have some questions:

What is a good number of files to have open at the same time? (I mean the size of the LRU on each node.)

And if I create the LRU in the fileoutputstream, how many of them will be created? One for each ‘degree of parallelism’, right?


Thanks,
Michele



Re: output writer

Fabian Hueske-2
I think you should not extend the FileOutputFormat, but implement a completely new OutputFormat. You can of course copy some of the FileOutputFormat code into your new format.

Regarding the number of open files, I would make this a parameter. I guess you can have at least 64 files open per operator, maybe even a lot more. If you can afford to partition and sort the data beforehand, I would go with that solution, because it will be much easier to implement (no LRU, just a single file at a time) and more robust.
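
The partition-and-sort variant can be sketched as follows, again in plain Java rather than against the Flink API. The hypothetical `SortedKeyWriter` assumes that the records reaching one writer instance are already sorted by key in ascending order, so it holds a single open file and switches files only when the key changes. The class name, the `.txt` suffix, and the local-file access are assumptions for illustration.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical writer for key-sorted input: only one file is open at a time. */
class SortedKeyWriter {
    private final Path outputDir;
    private String currentKey;      // key of the file that is currently open
    private BufferedWriter current; // the single open handle, or null

    SortedKeyWriter(Path outputDir) throws IOException {
        this.outputDir = outputDir;
        Files.createDirectories(outputDir);
    }

    void writeRecord(String key, String payload) throws IOException {
        if (!key.equals(currentKey)) {
            if (currentKey != null && key.compareTo(currentKey) < 0) {
                // A smaller key after a larger one means the input was not sorted.
                throw new IllegalStateException(
                        "input not sorted by key: " + key + " after " + currentKey);
            }
            close(); // the previous key is complete, so its file can be closed
            current = Files.newBufferedWriter(
                    outputDir.resolve(key + ".txt"), StandardCharsets.UTF_8);
            currentKey = key;
        }
        current.write(payload);
        current.newLine();
    }

    void close() throws IOException {
        if (current != null) {
            current.close();
            current = null;
        }
    }
}
```

The ordering check costs constant memory and turns a violated assumption into an immediate error instead of a silently truncated file.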

When executing your program, you'll have one OutputFormat instance for each degree of parallelism, i.e., that many LRUs spread over all machines. The number of LRUs per machine depends on the number of slots in your TaskManager configuration.


Re: output writer

Michele Bertoni
OK, I got -some of- the points :)

I will do some tests and let you know.

What scares me about using the sort is that in our program we may sort the data before outputting it.

If we don’t sort, there is no problem at all.

But if we do sort, then:
in one case, sorting is done inside the group of the key (i.e. one sorted set for each output key: groupby(field_1).sort(field_2) );
in a second case, we have subsets (i.e. groupby(field_1, field_2).sort(field_3) ). Is the order preserved in these cases?




On 8 Sep 2015, at 17:55, Fabian Hueske <[hidden email]> wrote:

I think you should not extend the FileOutputFormat but implement a completely new OutputFormat. You can of course copy some of the FileOutputFormat code into your new format.

Regarding the number of open files, I would make this a parameter. I guess you can have at least 64 files open per operator, maybe even a lot more. If you can afford to partition and sort the data beforehand, I would go with that solution because it will be much easier to implement (no LRU, just a single file at a time) and more robust.

When executing your program, you'll have one OutputFormat instance for each degree of parallelism, i.e., also that many LRUs spread over all machines. The number of LRUs per machine depends on the number of slots in your TaskManager configuration.

2015-09-08 17:39 GMT+02:00 Michele Bertoni <[hidden email]>:
Thanks! Your answer is really helpful.

Actually, I was just reading the FileOutputFormat, and my idea was to extend it and use the open function to open multiple streams,
so it should be a mix of 1 and 4.

But I have some questions:

What is a good number of files to have open at the same time? (I mean the size of the LRU in each node.)

And if I create the LRU in the FileOutputFormat, how many of them will be created? One for each 'degree of parallelism', right?


thanks
michele


On 8 Sep 2015, at 16:49, Fabian Hueske <[hidden email]> wrote:

Hi Michele,

You need to use a FileSystem client (e.g., Hadoop's) directly to create and write to files. Have a look at the FileOutputFormat [1], which does this for a single file per operator instance / partition. Instead of creating a single file, you need to create one file for each key. However, you want to avoid having too many files open at a time, and also avoid creating too many files that contain only a few records. If you use HDFS, this is especially important, because HDFS is bad at handling many small files. Only recent versions of HDFS support appending to files. If you have an older version, you have to create a new file for a key whenever you do not have an open file handle for it.

There are multiple ways to control the number of open files and reduce the number of files:

1) You can partition the data (as you already suggested) to move all records with the same key to the same operator.
2) If you use the batch DataSet API you can sort the data using sortPartition() such that each operator instance has only one file open at a time.
3) Instead of doing a full sort, you could also use combineGroup() to partially sort the data
4) Have a pool of open file handles and an LRU-style eviction policy to decide which file to close whenever you need to open a new one.
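
Option 4 could be sketched roughly as follows. This is a minimal illustration on the local file system using only the JDK, not Flink's or Hadoop's API. The class name LruWriterPool, the ".txt" suffix, and the use of append mode when a file is reopened are assumptions made for the example; on a file system without append support, a fresh file per reopen would be needed instead.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: keeps at most maxOpen writers open, one file per key. */
class LruWriterPool implements AutoCloseable {
    private final Path dir;
    private final LinkedHashMap<String, BufferedWriter> open;

    LruWriterPool(Path dir, int maxOpen) {
        this.dir = dir;
        // accessOrder = true keeps the least recently used entry first.
        this.open = new LinkedHashMap<String, BufferedWriter>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, BufferedWriter> eldest) {
                if (size() <= maxOpen) {
                    return false;
                }
                try {
                    eldest.getValue().close(); // flush and close the evicted file
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
                return true;
            }
        };
    }

    /** Writes one line to the file of the given key, opening it if necessary. */
    void write(String key, String line) throws IOException {
        BufferedWriter w = open.get(key); // get() marks the key as recently used
        if (w == null) {
            // Assumption: append is available, so an evicted file is continued.
            w = Files.newBufferedWriter(dir.resolve(key + ".txt"),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
            open.put(key, w); // may evict and close the least recently used writer
        }
        w.write(line);
        w.newLine();
    }

    int openCount() {
        return open.size();
    }

    @Override
    public void close() throws IOException {
        for (BufferedWriter w : open.values()) {
            w.close();
        }
        open.clear();
    }
}
```

With maxOpen = 2, writing to keys a, b, c in that order closes the file for a when c is opened; a later write to a reopens it in append mode and evicts b.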

Implementing this is not trivial. You can also organize the files per key in folders. Have a look at the InitializeOnMaster and FinalizeOnMaster hooks, which are called once before a job is started and once after all instances of a task have finished.
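
One way to organize files per key in folders, without relying on append, is to hand out a fresh part file each time a key has to be reopened. The sketch below only generates names. The class PartFileNamer and the "key/part-<subtask>-<sequence>" layout are hypothetical choices for illustration, not a Flink convention.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical naming scheme: one folder per key, one new part file per reopen. */
class PartFileNamer {
    private final int subtaskIndex;                       // index of the parallel writer instance
    private final Map<String, Integer> opened = new HashMap<>();

    PartFileNamer(int subtaskIndex) {
        this.subtaskIndex = subtaskIndex;
    }

    /** Returns a relative path that this instance has not handed out before for the key. */
    String nextPath(String key) {
        // merge() counts how often the key has been opened; the sequence starts at 0.
        int seq = opened.merge(key, 1, Integer::sum) - 1;
        return key + "/part-" + subtaskIndex + "-" + seq;
    }
}
```

Because the subtask index is part of the name, two parallel instances never produce the same path, and the sequence number keeps reopened keys from overwriting their earlier part files.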

Let me know if you need more information or if something is not clear.

Cheers, Fabian

2015-09-08 12:33 GMT+02:00 Michele Bertoni <[hidden email]>:
Hi guys,
sorry for the late answer. I am still working to get this done, but there is something I don't understand.

I do have my own writeRecord function, but that function is not able to open a new output stream or anything else, so I don't understand how to do that.

At first, I think I should at least partition my data according to the output key (each key to one file),
then I need to name the file exactly after that key,
but I don't know how to go on.

thanks
michele



On 30 Jul 2015, at 12:53, Radu Tudoran <[hidden email]> wrote:

Re-hi,
 
I have double-checked, and there actually is an OutputFormat interface in Flink which can be extended.
I believe that for specific formats of the kind Michele mentioned, everyone can develop the appropriate format themselves.
On the other hand, I believe that having more output formats is something that could be contributed. We should identify a couple of common formats. The first one that comes to my mind is something for writing to memory (e.g. a memory buffer).
 
 
 
Re: output writer

Fabian Hueske-2
I did not fully understand your last question, but I'll try to answer.

If you do a
myData.groupBy(key1, key2).sortGroup(key3).reduceGroup(myReduceFunction);
Flink will do the grouping and sorting in a single sort over three fields. So the result will be sorted on key1, key2, and key3 (given that your GroupReduce function does not change the values of key1, key2, and key3).
However, be aware that Flink might change the order of key1 and key2 (only grouping is required) and that your data is partitioned on key1 AND key2, i.e., identical key2 values might be spread over all partitions.
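
The effect of partitioning on the composite key can be illustrated with a small stand-alone simulation. This is plain Java and does not use Flink's actual hash function. The class CompositeKeyPartitioning and the modulo-based assignment are assumptions made only to show why records that share key1 but differ in key2 can land in different partitions.

```java
import java.util.Objects;

/** Hypothetical simulation of hash partitioning; not Flink's real partitioner. */
class CompositeKeyPartitioning {

    /** Partition index when hashing on key1 only. */
    static int byKey1(int key1, int parallelism) {
        return Math.floorMod(Integer.hashCode(key1), parallelism);
    }

    /** Partition index when hashing on the composite (key1, key2). */
    static int byKey1AndKey2(int key1, int key2, int parallelism) {
        return Math.floorMod(Objects.hash(key1, key2), parallelism);
    }
}
```

With a parallelism of 2, byKey1AndKey2(1, 1, 2) and byKey1AndKey2(1, 2, 2) return different indices, so the two records with key1 = 1 would be handled by different instances, whereas byKey1(1, 2) always returns the same index for key1 = 1.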



Re: output writer

Michele Bertoni
Yes, you understood it right!

But then, after that block, how can I partition the data according to key1 (the output key) and preserve the order of key3, if that is possible?


Re: output writer

Fabian Hueske-2
For your use case it would make more sense to partition and sort the data on the same key on which you want to partition the output files, i.e., partitioning on key1 and sorting on key3 might not help a lot.

Any order is destroyed if you have to partition the data.
What you can try to do is to enforce a certain partitioning before the groupBy which is reused for the grouping and reducing.

myData.partitionByHash(key1).groupBy(key1, key2).sortGroup(key3).reduceGroup(myReduceFunction);

If the GroupReduce function preserves key1, key2, and key3, your data should be partitioned on key1 and sorted on key1,key2,key3. Hence, you could use it directly to partition your output files on key1.
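
The intended result of that pipeline can be mimicked outside Flink as follows. This is a simulation under stated assumptions: rows are int arrays of the form {key1, key2, key3}, the modulo-based partition assignment stands in for whatever hash partitioner Flink uses, and the class name PartitionSortSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical simulation: hash-partition on key1, then sort each partition on all keys. */
class PartitionSortSketch {

    /** Rows are {key1, key2, key3}; returns one sorted list per partition. */
    static List<List<int[]>> partitionAndSort(List<int[]> rows, int parallelism) {
        List<List<int[]>> parts = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            parts.add(new ArrayList<>());
        }
        // Step 1: every row with the same key1 goes to the same partition.
        for (int[] r : rows) {
            parts.get(Math.floorMod(Integer.hashCode(r[0]), parallelism)).add(r);
        }
        // Step 2: a single sort over (key1, key2, key3) inside each partition.
        Comparator<int[]> byKeys = Comparator.<int[]>comparingInt(r -> r[0])
                .thenComparingInt(r -> r[1])
                .thenComparingInt(r -> r[2]);
        for (List<int[]> p : parts) {
            p.sort(byKeys);
        }
        return parts;
    }
}
```

In the result, all rows of one key1 value sit next to each other in exactly one partition, which is the property that lets a writer keep a single file open at a time.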

2015-09-08 18:44 GMT+02:00 Michele Bertoni <[hidden email]>:
yes you understood it right!

but then, after that block, how can I partition data according to key1 (the output key) and save the order of key3? if it is possible


Il giorno 08/set/2015, alle ore 18:39, Fabian Hueske <[hidden email]> ha scritto:

I did not fully understand you last question, but I'll try to answer.

If you do a
myData.groupBy(key1, key2).sortGroup(key3).reduceGroup(myReduceFunction);
Flink will do the grouping and sorting in a single sort over three fields. So the result will be sorted on key1, key2, and key3 (given that your GroupReduce function does not change the values of key1, key2, and key3).
However, be aware, that Flink might change the order of key1 and key2 (only grouping is required) and your data is partitioned on key1 AND key2, i.e., identical key2 values might be spread over all partitions.



2015-09-08 18:18 GMT+02:00 Michele Bertoni <[hidden email]>:
ok I got -some of- the points :)

I will do some tests and let you know

what scares me in using the sort is that in our program we may sort data before output them

if we don’t sort no problem at all

but if we sort then:
in one case sorting is done inside the group of the key (i.e. one sorted set for each output key: groupby(field_1).sort(field_2) )
in a second case we have subsets (i.e. groupby(field_1, field_2).sort(field_3) ) is the order preserved in this cases?




Il giorno 08/set/2015, alle ore 17:55, Fabian Hueske <[hidden email]> ha scritto:

I think you should not extend the FileOutputFormat but implement a completely new OutputFormat. You can of course copy some of the FileOutputFormat code to your new format.

Regarding the number of open files, I would make this a parameter. I guess you can have at least 64 files open per operator maybe even a lot more. If you can afford to partition and sort the data before, I would go with that solution because it will be much easier to implement (no LRU, just a single file at a time) and more robust.

When executing your program, you'll have one OutputFormat instance for each degree of parallelism, i.e., also that much LRUs spread over all machines. The number of LRU's per machine depends on the number of slots in your TaskManager configuration.

2015-09-08 17:39 GMT+02:00 Michele Bertoni <[hidden email]>:
Thanks! your answer is really helpful

actually I was just reading the FileOutputFormat and my idea was to extend it and use the open function to open multiple streams
so it should be a mix of 1 and 4

but i have some questions:

what is a good number of open files at the same time? (i mean, the size of the LRU in each node)

and if I create the LRU in the fileoutputstream, how many of them will be created? one for each ‘degree of parallelism’ right?


thanks
michele


Il giorno 08/set/2015, alle ore 16:49, Fabian Hueske <[hidden email]> ha scritto:

Hi Michele,

you need to directly use a FileSystem client (e.g., Hadoop's) to create and write to files. Have a look at the FileOutputFormat [1] which does this for a single file per operator instance / partition. Instead of creating a single file, you need to create one file for each key. However, you want to avoid to have too many files open at a time but also avoid to create too many files containing only a few records. If you use HDFS, this is especially important, because HDFS is bad at handling many small files. Only recent versions of HDFS support appending to files. If you have an older version you have to create a new file for a key if you do not have an open file handle for it.

There are multiple ways to control the number of open files and reduce the number of files:

1) You can partition the data (as you already suggested) to move all records with the same key to the same operator.
2) If you use the batch DataSet API you can sort the data using sortPartition() such that each operator instance has only one file open at a time.
3) Instead of doing a full sort, you could also use combineGroup() to partially sort the data
4) Have a pool of open file handles and an LRU kind of eviction policy to decide which file to close whenever you need open a new one.

Implementing this is not trivial. You can also organize the files per key in folders. Have a look at the InitializeOnMaster and FinalizeOnMaster hooks which are called once before a job is started and after all instance of a task finished.

Let me know, if you need more information or if something is not clear.

Cheers, Fabian

2015-09-08 12:33 GMT+02:00 Michele Bertoni <[hidden email]>:
Hi guys,
sorry for late answer but I am still working to get this done but I don’t understand something

I do have my own writeRecord function, but that function is not able to open new output stream or anything else so I don’t understand how to do that

at first I think I should at least partition my data according to the output key (each key to one file)
then I need to name the file exactly with that key
but I don’t know how to go on

thanks
michele



Il giorno 30/lug/2015, alle ore 12:53, Radu Tudoran <[hidden email]> ha scritto:

Re-hi,
 
I have double –checked and actually there is an OutputFormat interface in flink which can be extended.
I believe that for this kind of specific formats as mentioned by Michele, each can develop the appropriate format.

Re: output writer

Michele Bertoni
Hi, thanks Fabian,
last night I got rid of that problem in another way (I convinced my professor to add an "order" attribute, so the actual position in the output no longer matters)

now what I am going to do is this (if I understood you correctly yesterday):

first:
ds.groupBy(0).reduceGroup((v, out : Collector[GenomicRegionType]) => while(v.hasNext) out.collect(v.next))


then, in my custom output writer, writeRecord checks whether the currently open stream is the right one for the record:
if it is, it writes the record;
otherwise it closes that stream and opens a new one

since the records are grouped, only one stream per slot will be open at a time (I always use the highest degree of parallelism in this step) and each file will be opened only once (no append)


is that right?



thanks a lot
michele




Re: output writer

Fabian Hueske-2
Hi Michele,

If I understand correctly, you are using groupBy and reduceGroup to partition and group the data.
This works, but there is an even easier way:

ds.partitionByHash(0).sortPartition(0, Order.ASCENDING).output(yourOF);

This will partition and sort the data on field 0 without going through a user-defined GroupReduce function.
As you said, your OutputFormat just needs to check for new keys and open a new file in that case.
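
The "check for a new key, then switch files" step might look roughly like the sketch below. It is only an illustration under stated assumptions: the class KeyedFileWriter, its methods, and the ".txt" file naming are hypothetical, and it uses plain java.nio instead of Flink's OutputFormat interface and FileSystem client so that it runs without Flink on the classpath. The in-memory sort in the demo merely stands in for sortPartition.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/**
 * Hypothetical sketch: keeps exactly one file open and switches files when
 * the key changes. Assumes records arrive grouped (e.g. sorted) by key.
 */
class KeyedFileWriter implements AutoCloseable {
    private final Path outputDir;     // assumed target directory
    private String currentKey;        // key of the file that is currently open
    private BufferedWriter current;   // the single open stream
    private int filesOpened;          // number of files created so far

    KeyedFileWriter(Path outputDir) {
        this.outputDir = outputDir;
    }

    /** Writes one record, opening a new file whenever the key changes. */
    void writeRecord(String key, String line) {
        try {
            if (!key.equals(currentKey)) {
                close(); // close the previous key's file, if any
                // Creates or truncates the file: a key that reappears later
                // would overwrite its earlier output, hence the grouping assumption.
                current = Files.newBufferedWriter(
                        outputDir.resolve(key + ".txt"), StandardCharsets.UTF_8);
                currentKey = key;
                filesOpened++;
            }
            current.write(line);
            current.newLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    int filesOpened() {
        return filesOpened;
    }

    @Override
    public void close() {
        try {
            if (current != null) {
                current.close();
                current = null;
                currentKey = null;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

class KeyedFileWriterDemo {
    public static void main(String[] args) throws IOException {
        // Hypothetical records of the form {groupId, payload}.
        List<String[]> records = new ArrayList<>(Arrays.asList(
                new String[] {"g2", "x"},
                new String[] {"g1", "y"},
                new String[] {"g2", "z"}));
        // Stand-in for sorting the partition on the key: equal keys become adjacent.
        records.sort(Comparator.comparing((String[] r) -> r[0]));

        Path dir = Files.createTempDirectory("groups");
        try (KeyedFileWriter writer = new KeyedFileWriter(dir)) {
            for (String[] r : records) {
                writer.writeRecord(r[0], r[1]);
            }
            System.out.println("files opened: " + writer.filesOpened());
        }
    }
}
```

In an actual job the same check would presumably live in the writeRecord method of a class implementing Flink's OutputFormat, with the stream obtained from a FileSystem client rather than java.nio. Without the preceding partition and sort, a key could show up again after its file was closed, and this sketch would then overwrite that file.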

Cheers, Fabian
