Bucketing/Rolling Sink: New timestamp appended to the part file name every time a new part file is rolled

Raja.Aravapalli (Aug 31, 2017)

Hi,

I have a Flink application that streams data into HDFS, and I am using the BucketingSink for that. Is it possible to rename the part files that are created in the base HDFS directory?

Right now I am using the code below to include a timestamp in the part-file name, but the problem is that the timestamp does not change when a new part file is rolled:

BucketingSink<String> hdfsSink = new BucketingSink<>(hdfsOutputPath);

hdfsSink.setBucketer(new BasePathBucketer<String>());
hdfsSink.setBatchSize(1024L * 1024 * hdfsOutputBatchSizeInMB); // roll after 'hdfsOutputBatchSizeInMB' MB; 1024L keeps the product a long
hdfsSink.setPartPrefix("PART-FILE-" + System.currentTimeMillis()); // evaluated once, when this line runs

Can someone suggest what code changes I could try so that every newly rolled part file gets a new timestamp?

Thanks a lot.

Regards,
Raja.


Re: Bucketing/Rolling Sink: New timestamp appended to the part file name every time a new part file is rolled

Piotr Nowojski (Sep 1, 2017)
Hi,

BucketingSink doesn’t support the feature you are requesting: you cannot specify a dynamically generated prefix/suffix.

Piotrek
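Piotr's point can be seen in plain Java: the argument passed to setPartPrefix is an ordinary String, so System.currentTimeMillis() runs once and every part file inherits that same value. The standalone sketch below models this without Flink. It assumes part files are named roughly <prefix>-<subtaskIndex>-<partCounter> (a reading of the 1.3-era BucketingSink naming, not a documented contract), and the class and method names are hypothetical. perRollNames shows the behaviour being asked for, which BucketingSink itself does not offer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

/**
 * Hypothetical model of part-file naming (not Flink code). Assumes names of the
 * form <prefix>-<subtaskIndex>-<partCounter>.
 */
public class PartNamingSketch {

    // Mirrors the question: the prefix String is built once, so its timestamp is frozen.
    static List<String> fixedPrefixNames(LongSupplier clock, int subtask, int rolls) {
        String prefix = "PART-FILE-" + clock.getAsLong(); // clock read exactly once
        List<String> names = new ArrayList<>();
        for (int counter = 0; counter < rolls; counter++) {
            names.add(prefix + "-" + subtask + "-" + counter); // only the counter changes
        }
        return names;
    }

    // What the question asks for: read the clock again at every roll.
    static List<String> perRollNames(LongSupplier clock, int subtask, int rolls) {
        List<String> names = new ArrayList<>();
        for (int counter = 0; counter < rolls; counter++) {
            names.add("PART-FILE-" + clock.getAsLong() + "-" + subtask + "-" + counter);
        }
        return names;
    }

    public static void main(String[] args) {
        long[] now = {1_000L};
        LongSupplier fakeClock = () -> now[0] += 500; // advances 500 ms per read
        System.out.println(fixedPrefixNames(fakeClock, 0, 3));
        // [PART-FILE-1500-0-0, PART-FILE-1500-0-1, PART-FILE-1500-0-2]
        System.out.println(perRollNames(fakeClock, 0, 3));
        // [PART-FILE-2000-0-0, PART-FILE-2500-0-1, PART-FILE-3000-0-2]
    }
}
```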


Re: Bucketing/Rolling Sink: New timestamp appended to the part file name every time a new part file is rolled

Aljoscha Krettek (Sep 1, 2017)
Hi Raja,

I think you can in fact do this by implementing a custom Bucketer. Have a look at BasePathBucketer and extend it so that the path it returns includes the timestamp. You should probably clamp the timestamp (round it down to a coarser interval) so that you don't get a new path every millisecond.

Best,
Aljoscha
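A minimal sketch of Aljoscha's suggestion, written as standalone Java so it runs without Flink. It assumes the real version would live in getBucketPath(Clock, Path, T) of a class extending BasePathBucketer; here Clock is a local stand-in interface, paths are plain Strings, and the class name ClampedTimestampBucketSketch is hypothetical. Note that this approach puts the timestamp in the bucket directory, not in the part-file name, so each interval gets its own directory of part files.

```java
/**
 * Standalone sketch (not Flink code) of a bucketer that appends a clamped
 * processing-time timestamp to the base path.
 */
public class ClampedTimestampBucketSketch {

    /** Local stand-in for Flink's Clock interface (assumption for this sketch). */
    interface Clock {
        long currentTimeMillis();
    }

    private final long intervalMillis;

    ClampedTimestampBucketSketch(long intervalMillis) {
        if (intervalMillis <= 0) {
            throw new IllegalArgumentException("intervalMillis must be positive");
        }
        this.intervalMillis = intervalMillis;
    }

    /** Rounds a timestamp down to the start of its interval ("clamping"). */
    long clamp(long timestampMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, intervalMillis);
    }

    /** Returns <basePath>/<clamped millis>, so all records in one interval share a bucket. */
    String bucketPath(Clock clock, String basePath) {
        return basePath + "/" + clamp(clock.currentTimeMillis());
    }

    public static void main(String[] args) {
        ClampedTimestampBucketSketch hourly = new ClampedTimestampBucketSketch(60 * 60 * 1000L);
        System.out.println(hourly.bucketPath(() -> 1_504_224_123_456L, "/data/out"));
        // /data/out/1504224000000
    }
}
```

Clamping to one hour means every record processed within the same hour lands in the same directory; a smaller interval gives finer-grained directories, but more of them.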


Re: Bucketing/Rolling Sink: New timestamp appended to the part file name every time a new part file is rolled

Raja.Aravapalli (Sep 1, 2017)

Thanks, Aljoscha, for the input.

I will try extending the “BasePathBucketer” class.

Regards,
Raja.

Re: Bucketing/Rolling Sink: New timestamp appended to the part file name every time a new part file is rolled

Felix Cheung
Yep, I was able to get this to work with a custom bucketer.

A custom bucketer can use the clock it is given (processing time) or a timestamp taken from the data itself (event time) to build the bucket path.
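The two options differ only in where the timestamp comes from. The standalone sketch below (not Flink code) contrasts them; the Event type, its timestampMillis field, and the method names are hypothetical. A record produced in one hour but processed in the next lands in different buckets depending on the choice:

```java
import java.util.function.LongSupplier;

/** Standalone sketch (not Flink code) of processing-time vs event-time bucketing. */
public class BucketTimeSourceSketch {

    /** Hypothetical record carrying its own event timestamp. */
    static final class Event {
        final long timestampMillis;
        final String payload;

        Event(long timestampMillis, String payload) {
            this.timestampMillis = timestampMillis;
            this.payload = payload;
        }
    }

    /** Rounds a timestamp down to the start of its interval. */
    static long clamp(long millis, long intervalMillis) {
        return millis - Math.floorMod(millis, intervalMillis);
    }

    /** Processing time: bucket by when the sink sees the record. */
    static String processingTimeBucket(LongSupplier clock, String basePath, long intervalMillis) {
        return basePath + "/" + clamp(clock.getAsLong(), intervalMillis);
    }

    /** Event time: bucket by the timestamp stored in the record itself. */
    static String eventTimeBucket(Event event, String basePath, long intervalMillis) {
        return basePath + "/" + clamp(event.timestampMillis, intervalMillis);
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        Event late = new Event(7_200_500L, "arrived late"); // produced in the hour starting at 7_200_000
        LongSupplier clock = () -> 10_800_100L;            // processed in the hour starting at 10_800_000
        System.out.println(processingTimeBucket(clock, "/out", hour)); // /out/10800000
        System.out.println(eventTimeBucket(late, "/out", hour));       // /out/7200000
    }
}
```

Event time keeps the output aligned with when the data was produced, even if it arrives late or is replayed; processing time needs nothing from the record, but the bucket depends on when the job happens to run.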

