Re: BucketingSink capabilities for DataSet API
Posted by
anuj.aj07 on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/BucketingSink-capabilities-for-DataSet-API-tp24107p32913.html
Thanks, Rafi. I will try with this but yes if partitioning is not possible then I also have to look some other solution.
Hi Anuj,
It's been a while since I wrote this (Flink 1.5.2). Could be a better/newer way, but this is what how I read & write Parquet with hadoop-compatibility:
// imports
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.parquet.avro.AvroParquetInputFormat;
// Creating Parquet input format
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
AvroParquetInputFormat<GenericRecord> parquetInputFormat = new AvroParquetInputFormat<>();
AvroParquetInputFormat.setInputDirRecursive(job, true);
AvroParquetInputFormat.setInputPaths(job, pathsToProcess);
HadoopInputFormat<Void, GenericRecord> inputFormat = HadoopInputs.createHadoopInput(parquetInputFormat, Void.class, GenericRecord.class, job);
// Creating Parquet output format
AvroParquetOutputFormat<GenericRecord> parquetOutputFormat = new AvroParquetOutputFormat<>();
AvroParquetOutputFormat.setSchema(job, new Schema.Parser().parse(SomeEvent.SCHEMA));
AvroParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
AvroParquetOutputFormat.setCompressOutput(job, true);
AvroParquetOutputFormat.setOutputPath(job, new Path(pathString));
HadoopOutputFormat<Void, GenericRecord> outputFormat = new HadoopOutputFormat<>(parquetOutputFormat, job);
DataSource<Tuple2<Void, GenericRecord>> inputFileSource = env.createInput(inputFormat);
// Start processing...
// Writing result as Parquet
resultDataSet.output(outputFormat);
Regarding writing partitioned data, as far as I know, there is no way to achieve that with the DataSet API with hadoop-compatibility.
You could implement this with reading from input files as stream and then using StreamingFileSink with a custom BucketAssigner [1].
The problem with that (which was not yet resolved AFAIK) is described here [2] in "Important Notice 2".
Sadly I say, that eventually, for this use-case I chose Spark to do the job...
Hope this helps.
Rafi
Hi Rafi,
I have a similar use case where I want to read parquet files in the dataset and want to perform some transformation and similarly want to write the result using year month day partitioned.
I am stuck at first step only where how to read and write Parquet files using hadoop-Compatability.
Please help me with this and also if u find the solution for how to write data in partitioned.
Thanks,
Anuj
On Thu, Oct 25, 2018 at 5:35 PM Andrey Zagrebin <
[hidden email]> wrote:
Hi Rafi,
At the moment I do not see any support of Parquet in DataSet API except HadoopOutputFormat, mentioned in stack overflow question. I have cc’ed Fabian and Aljoscha, maybe they could provide more information.
Best,
Andrey
Hi,
I'm writing a Batch job which reads Parquet, does some aggregations and writes back as Parquet files.
I would like the output to be partitioned by year, month, day by event time. Similarly to the functionality of the BucketingSink.
I was able to achieve the reading/writing to/from Parquet by using the hadoop-compatibility features.
I couldn't find a way to partition the data by year, month, day to create a folder hierarchy accordingly. Everything is written to a single directory.
Can anyone suggest a way to achieve this? Maybe there's a way to integrate the BucketingSink with the DataSet API? Another solution?
--
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
--
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07