Hello,
Is there a Hive (or Spark DataFrame) partitionBy equivalent in Flink? I'm looking to save output as CSV files partitioned by two columns (date and hour). The DataSet partitionBy API is more about partitioning the data on a column for further processing.
I'm thinking there is no direct API to do this. But what will be the best way of achieving this?
Srikanth
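[Editor's note: for context, this is roughly what the Spark DataFrame partitionBy behaviour referred to above looks like. A minimal sketch, assuming a recent Spark version; the input/output paths are placeholders, only the "date" and "hour" column names come from the question.]

  import org.apache.spark.sql.SparkSession

  object SparkPartitionByExample {
    def main(args: Array[String]): Unit = {
      // Spark, not Flink -- shown only to illustrate the behaviour being asked about.
      val spark = SparkSession.builder().appName("partitionBy-example").getOrCreate()

      // Placeholder input; any DataFrame with "date" and "hour" columns works.
      val df = spark.read.option("header", "true").csv("/path/to/input")

      // Produces Hive-style directories: .../date=2016-02-12/hour=10/part-*.csv
      df.write
        .partitionBy("date", "hour")
        .option("header", "true")
        .csv("/path/to/output")
    }
  }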
Hi Srikanth,
DataSet.partitionBy() will partition the data on the declared partition fields. If you append a DataSink with the same parallelism as the partition operator, the data will be written out with the defined partitioning.
2016-02-12 20:53 GMT+01:00 Srikanth <[hidden email]>:
Fabian,
Not sure if we are on the same page. If I do something like the code below, it will partition by field 0 and each task will write a separate part file in parallel.

  val sink = data1.join(data2).where(1).equalTo(0) { (l, r) => (l._3, r._3) }
    .partitionByHash(0)
    .writeAsCsv(pathBase + "output/test", rowDelimiter = "\n", fieldDelimiter = "\t", writeMode = WriteMode.OVERWRITE)

This will create the folder ./output/test/ containing part files 1, 2, 3, 4, ... (one per parallel task). What I was looking for is a Hive-style partitionBy that produces a folder structure like

  ./output/field0=1/file
  ./output/field0=2/file
  ./output/field0=3/file
  ./output/field0=4/file

assuming field0 is an Int with the distinct values 1, 2, 3 and 4.
Srikanth
On Mon, Feb 15, 2016 at 6:20 AM, Fabian Hueske <[hidden email]> wrote:
Yes, you're right, I did not understand your question correctly. Right now, Flink does not feature an output format that writes records to different output files depending on a key attribute. You would need to implement such an output format yourself and append it as follows:

  val data = ...
  data.partitionByHash(0) // partition to send all records with the same key to the same machine
    .output(new YourOutputFormat())

In case of many distinct keys, you would need to limit the number of open file handles. The OutputFormat will be easier to implement if you do a sortPartition(0, Order.ASCENDING) before the output format to sort the data by key.
Cheers, Fabian
2016-02-16 19:52 GMT+01:00 Srikanth <[hidden email]>:
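[Editor's note: a rough sketch of what such a custom output format could look like. This is illustrative code, not an official Flink API beyond the OutputFormat interface: the class name, tuple type, and field0=<value> path layout are assumptions, it writes to the local filesystem only, and it does not yet cap the number of open file handles as advised above.]

  import java.io.{File, PrintWriter}
  import scala.collection.mutable

  import org.apache.flink.api.common.io.OutputFormat
  import org.apache.flink.configuration.Configuration

  class KeyPartitionedCsvOutputFormat(basePath: String)
      extends OutputFormat[(Int, String, Double)] {

    @transient private var taskIndex: Int = 0
    @transient private var writers: mutable.Map[Int, PrintWriter] = _

    override def configure(parameters: Configuration): Unit = {}

    override def open(taskNumber: Int, numTasks: Int): Unit = {
      taskIndex = taskNumber
      writers = mutable.Map.empty
    }

    override def writeRecord(record: (Int, String, Double)): Unit = {
      val key = record._1
      // Lazily open one file per key, placed in a Hive-style field0=<key> directory.
      val writer = writers.getOrElseUpdate(key, {
        val dir = new File(s"$basePath/field0=$key")
        dir.mkdirs()
        new PrintWriter(new File(dir, s"part-$taskIndex")) // one part file per parallel task
      })
      writer.println(s"${record._2}\t${record._3}")
    }

    override def close(): Unit = {
      if (writers != null) {
        writers.values.foreach(_.close())
      }
    }
  }

Usage, following the outline above (Order is org.apache.flink.api.common.operators.Order):

  data.partitionByHash(0)
    .sortPartition(0, Order.ASCENDING) // optional; lets a smarter format keep only one file open at a time
    .output(new KeyPartitionedCsvOutputFormat(pathBase + "output"))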
Are there any plans to implement this kind of feature (the possibility to write data to specified partitions) in the near future?
Hi Kirsti,
I'm not aware of anybody working on this issue.
2016-05-23 16:56 GMT+02:00 KirstiLaurila <[hidden email]>:
Yeah, created this one https://issues.apache.org/jira/browse/FLINK-3961
Isn't this related to https://issues.apache.org/jira/browse/FLINK-2672? This can probably be achieved with a RollingSink [1] and a custom Bucketer.
Srikanth
On Tue, May 24, 2016 at 1:07 AM, KirstiLaurila <[hidden email]> wrote:
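[Editor's note: a minimal streaming-side sketch of the RollingSink usage suggested here, assuming the flink-connector-filesystem dependency; the source, paths, and batch size are placeholders. Note that the built-in DateTimeBucketer derives bucket paths from the current time, not from tuple fields, which is the limitation discussed below.]

  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.streaming.connectors.fs.{DateTimeBucketer, RollingSink}

  object RollingSinkSketch {
    def main(args: Array[String]): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment

      // Placeholder source of already-formatted CSV lines.
      val lines: DataStream[String] = env.socketTextStream("localhost", 9999)

      // Buckets look like /tmp/output/2016-05-24--13/..., i.e. time-based,
      // so this alone does not give Hive-style field0=<value> directories.
      val sink = new RollingSink[String]("/tmp/output")
        .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HH"))
        .setBatchSize(128L * 1024 * 1024) // start a new part file after ~128 MB

      lines.addSink(sink)
      env.execute("RollingSink sketch")
    }
  }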
RollingSink is part of the Flink Streaming API. Can it be used in Flink batch jobs, too?
As implied in FLINK-2672, RollingSink doesn't support dynamic bucket paths based on the tuple fields. The path must be given when creating the RollingSink instance, i.e. before deploying the job. Yes, a custom Bucketer can be provided, but with the current method signature the tuple is not passed to the Bucketer.
On Tue, May 24, 2016 at 4:45 PM, Srikanth <[hidden email]> wrote:
Maybe, I don't know, but that is with streaming. What about batch?
Hi,
the RollingSink can only be used with streaming. Adding support for dynamic paths based on element contents is certainly interesting. I imagine it can be tricky, though, to figure out when to close/flush the buckets.
Cheers, Aljoscha
On Wed, 25 May 2016 at 08:36 KirstiLaurila <[hidden email]> wrote: