Write each group to its own file

Write each group to its own file

rlazoti
Hi,

Is there a way to write each group to its own file using the DataSet API
(batch)?

For example, let's use the following class:

case class Product(name: String, category: String)

And the following DataSet:

val products = env.fromElements(
  Product("i7", "cpu"), Product("R5", "cpu"),
  Product("gtx1080", "gpu"), Product("vega64", "gpu"),
  Product("evo250gb", "ssd"))

So in this example my output should be these 3 files:

- cpu.csv
i7, cpu
R5, cpu

- gpu.csv
gtx1080, gpu
vega64, gpu

- ssd.csv
evo250gb, ssd


I tried the following code, but got
org.apache.flink.api.common.InvalidProgramException: Task not serializable.

products.groupBy("category").reduceGroup { group: Iterator[Product] =>
  val items = group.toSeq
  env.fromElements(items).writeAsCsv(s"${items.head.category}.csv")
  items
}

Any input is welcome.

Thanks!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Write each group to its own file

Piotr Nowojski
Hi,

There is no straightforward way to do that. First of all, the error you are getting is because you are trying to start a new application (env.fromElements(items)) inside your reduce function.

To do what you want, you have to hash-partition the products by category (instead of grouping and reducing) and after that either:

1. Sort the hash-partitioned products and implement a custom OutputFormat (perhaps based on FileOutputFormat) that starts a new file whenever the key value changes.

Or

2. Implement a custom OutputFormat (perhaps based on FileOutputFormat) that keeps multiple files open, one per category, and writes each record to the matching file (a rough sketch follows below).

Note that both options require hash-partitioning the products first. Option 1 is more CPU and memory intensive (the data has to be sorted); option 2 can exceed the maximum number of simultaneously open files if the number of categories is very high.
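
For option 2, a rough, untested sketch could look something like the code below. It assumes the Scala DataSet API and a local output directory; the names PerCategoryCsvOutputFormat and outputDir are made up for illustration and are not part of Flink's API:

import java.io.{BufferedWriter, File, FileWriter}
import scala.collection.mutable

import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.configuration.Configuration

class PerCategoryCsvOutputFormat(outputDir: String) extends OutputFormat[Product] {

  // One writer per category, created lazily on the first record of that category.
  @transient private var writers: mutable.Map[String, BufferedWriter] = _

  override def configure(parameters: Configuration): Unit = ()

  override def open(taskNumber: Int, numTasks: Int): Unit = {
    writers = mutable.Map.empty
    new File(outputDir).mkdirs()
  }

  override def writeRecord(record: Product): Unit = {
    val writer = writers.getOrElseUpdate(
      record.category,
      new BufferedWriter(new FileWriter(new File(outputDir, s"${record.category}.csv"))))
    writer.write(s"${record.name}, ${record.category}")
    writer.newLine()
  }

  // Close all per-category files when the task finishes.
  override def close(): Unit =
    if (writers != null) writers.values.foreach(_.close())
}

Usage, hash-partitioning first so that all records of a category end up in the same parallel task:

products
  .partitionByHash("category")
  .output(new PerCategoryCsvOutputFormat("/tmp/products"))

env.execute()

Note that this sketch writes plain local files, so on a cluster you would want to write to a shared filesystem (or base the format on FileOutputFormat) and possibly include the task number in the file names.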

Piotrek

Re: Write each group to its own file

rlazoti
Piotr,

I did as you suggested and it worked perfectly.
Thank you! :)

Best,
Rodrigo

Re: Write each group to its own file

Piotr Nowojski
You’re welcome :)
