Another option would be to use the new Hive connectors. Have you looked into them?
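
Roughly (untested, and all catalog/table/column names, the Hive conf dir and the Hive version below are just placeholders), a partitioned Parquet write through a Hive table could look like this with the Table API:

// imports
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

// Batch Table environment (blink planner)
EnvironmentSettings settings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);

// Register a Hive catalog; name, database, conf dir and Hive version are placeholders
HiveCatalog hive = new HiveCatalog("myhive", "default", "/opt/hive-conf", "2.3.4");
tEnv.registerCatalog("myhive", hive);
tEnv.useCatalog("myhive");

// Dynamic-partition insert into a Hive table stored as Parquet and partitioned
// by (y, mo, d); table and column names are illustrative only
tEnv.sqlUpdate(
        "INSERT INTO events_out " +
        "SELECT id, payload, YEAR(event_time), MONTH(event_time), DAYOFMONTH(event_time) " +
        "FROM events");
tEnv.execute("partitioned parquet write");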
> Hi Anuj,
>
> It's been a while since I wrote this (Flink 1.5.2). There could be a
> better/newer way, but this is how I read & write Parquet with
> hadoop-compatibility:
>
> // imports
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
> import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
> import org.apache.flink.api.java.operators.DataSource;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.hadoopcompatibility.HadoopInputs;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.parquet.avro.AvroParquetInputFormat;
> import org.apache.parquet.avro.AvroParquetOutputFormat;
> import org.apache.parquet.hadoop.metadata.CompressionCodecName;
>
> // Creating the Parquet input format
> // (env is the ExecutionEnvironment; pathsToProcess, pathString and
> // SomeEvent.SCHEMA come from the surrounding job)
> Configuration conf = new Configuration();
> Job job = Job.getInstance(conf);
> AvroParquetInputFormat<GenericRecord> parquetInputFormat = new AvroParquetInputFormat<>();
> AvroParquetInputFormat.setInputDirRecursive(job, true);
> AvroParquetInputFormat.setInputPaths(job, pathsToProcess);
> HadoopInputFormat<Void, GenericRecord> inputFormat =
>         HadoopInputs.createHadoopInput(parquetInputFormat, Void.class, GenericRecord.class, job);
>
> // Creating the Parquet output format
> AvroParquetOutputFormat<GenericRecord> parquetOutputFormat = new AvroParquetOutputFormat<>();
> AvroParquetOutputFormat.setSchema(job, new Schema.Parser().parse(SomeEvent.SCHEMA));
> AvroParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
> AvroParquetOutputFormat.setCompressOutput(job, true);
> AvroParquetOutputFormat.setOutputPath(job, new Path(pathString));
> HadoopOutputFormat<Void, GenericRecord> outputFormat =
>         new HadoopOutputFormat<>(parquetOutputFormat, job);
>
> DataSource<Tuple2<Void, GenericRecord>> inputFileSource = env.createInput(inputFormat);
>
> // Start processing...
>
> // Writing the result (a DataSet<Tuple2<Void, GenericRecord>>) as Parquet
> resultDataSet.output(outputFormat);
>
>
> Regarding writing partitioned data: as far as I know, there is no way to
> achieve that with the DataSet API and hadoop-compatibility.
>
> You could implement this by reading the input files as a stream and
> then using a StreamingFileSink with a custom BucketAssigner [1]; a rough
> sketch of such an assigner follows below.
> The problem with that approach (which was not yet resolved, AFAIK) is
> described in "Important Notice 2" here [2].
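>
> A rough, untested sketch of such an event-time bucket assigner (assuming
> Avro GenericRecords carrying an epoch-millis field named "timestamp" --
> the field name is just an example):
>
> import java.time.Instant;
> import java.time.ZoneOffset;
> import java.time.format.DateTimeFormatter;
>
> import org.apache.avro.generic.GenericRecord;
> import org.apache.flink.core.io.SimpleVersionedSerializer;
> import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
> import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
>
> // Derives a "year=YYYY/month=MM/day=DD" bucket path from an event-time field.
> public class EventTimeBucketAssigner implements BucketAssigner<GenericRecord, String> {
>
>     private static final DateTimeFormatter FORMATTER =
>             DateTimeFormatter.ofPattern("'year='yyyy'/month='MM'/day='dd").withZone(ZoneOffset.UTC);
>
>     @Override
>     public String getBucketId(GenericRecord record, BucketAssigner.Context context) {
>         // "timestamp" is an assumed epoch-millis field in the record
>         long eventTimeMillis = (long) record.get("timestamp");
>         return FORMATTER.format(Instant.ofEpochMilli(eventTimeMillis));
>     }
>
>     @Override
>     public SimpleVersionedSerializer<String> getSerializer() {
>         return SimpleVersionedStringSerializer.INSTANCE;
>     }
> }
>
> You would then plug it into the sink with something like
> StreamingFileSink.forBulkFormat(new Path(outputPath), ParquetAvroWriters.forGenericRecord(schema))
>         .withBucketAssigner(new EventTimeBucketAssigner()).build().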
>
> Sadly, I have to say that eventually, for this use case, I chose Spark to
> do the job...
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html#general
> Hope this helps.
>
> Rafi
>
>
> On Sat, Feb 15, 2020 at 5:03 PM aj <[hidden email]> wrote:
>
> Hi Rafi,
>
> I have a similar use case where I want to read Parquet files into a
> DataSet, perform some transformations, and similarly write the result
> partitioned by year, month and day.
>
> I am stuck at the first step: how to read and write Parquet files
> using hadoop-compatibility.
>
> Please help me with this, and also let me know if you find a solution
> for writing the data partitioned.
>
> Thanks,
> Anuj
>
> On Thu, Oct 25, 2018 at 5:35 PM Andrey Zagrebin <[hidden email]> wrote:
>
> Hi Rafi,
>
> At the moment I do not see any support for Parquet in the DataSet API
> except the HadoopOutputFormat mentioned in the Stack Overflow question.
> I have cc'ed Fabian and Aljoscha; maybe they can provide more
> information.
>
> Best,
> Andrey
>
>> On 25 Oct 2018, at 13:08, Rafi Aroch <[hidden email]> wrote:
>>
>> Hi,
>>
>> I'm writing a batch job which reads Parquet, does some
>> aggregations and writes the result back as Parquet files.
>> I would like the output to be partitioned by year, month and day
>> of the event time, similar to the functionality of the
>> BucketingSink.
>>
>> I was able to achieve the reading/writing to/from Parquet by
>> using the hadoop-compatibility features.
>> I couldn't find a way to partition the data by year, month,
>> day to create a folder hierarchy accordingly. Everything is
>> written to a single directory.
>>
>> I could find an unanswered question about this issue:
>> https://stackoverflow.com/questions/52204034/apache-flink-does-dataset-api-support-writing-output-to-individual-file-partit
>>
>> Can anyone suggest a way to achieve this? Maybe there's a way
>> to integrate the BucketingSink with the DataSet API? Another
>> solution?
>>
>> Rafi
>
>
>
> --
> Thanks & Regards,
> Anuj Jain
> Mob.: +91-8588817877
> Skype: anuj.jain07
> http://www.oracle.com/
> http://www.cse.iitm.ac.in/%7Eanujjain/