Dynamic file name prefix - StreamingFileSink

Dynamic file name prefix - StreamingFileSink

Vijayendra Yadav
Hi Team,

I have tried to assign a dynamic file name prefix that contains datetime components.
The problem is that the job always uses the datetime from when it first started and never refreshes it afterwards.
How can I get the current datetime in the file name at sink time?

.withPartPrefix (ZonedDateTime.now.withZoneSameInstant(ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss-SSS")))


val config = OutputFileConfig
 .builder()
 .withPartPrefix("prefix")
 .withPartSuffix(".ext")
 .build()
            
val sink = StreamingFileSink
 .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
 .withBucketAssigner(new KeyBucketAssigner())
 .withRollingPolicy(OnCheckpointRollingPolicy.build())
 .withOutputFileConfig(config)
 .build()

Re: Dynamic file name prefix - StreamingFileSink

Ravi Bhushan Ratnakar
Hi Vijayendra,

OutputFileConfig provides a builder that creates immutable objects with a given 'prefix' and 'suffix'. The argument you pass to 'withPartPrefix' is evaluated only once, at the moment 'withPartPrefix' is called. If you want a dynamic 'prefix' or 'suffix', you could write your own custom implementation of 'OutputFileConfig' that accepts a function for the 'prefix' or 'suffix'. I am attaching a sample implementation. Please make sure that the function you pass is serializable.
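
The difference between passing a value and passing a function can be shown without Flink at all. The following is only a minimal sketch in plain Scala; the object name `EagerVsDeferredPrefix` and the helper `utcNow` are made up for illustration and are not part of any Flink API.

```scala
import java.time.{ZoneId, ZonedDateTime}
import java.time.format.DateTimeFormatter

// Hypothetical demo object, not part of Flink.
object EagerVsDeferredPrefix {
  private val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss-SSS")

  // Formats the current UTC time; every call reads the clock again.
  def utcNow(): String =
    ZonedDateTime.now(ZoneId.of("UTC")).format(formatter)

  def main(args: Array[String]): Unit = {
    // Eager: the expression is evaluated once, here, and only the
    // resulting String is stored (this is what withPartPrefix receives).
    val eagerPrefix: String = utcNow()

    // Deferred: the function value is stored and evaluated on each call.
    val deferredPrefix: () => String = () => utcNow()

    val first = deferredPrefix()
    Thread.sleep(50) // let the clock advance between the two calls
    val second = deferredPrefix()

    println(s"eager:    $eagerPrefix, then $eagerPrefix")
    println(s"deferred: $first, then $second")
  }
}
```

The eager value stays the same no matter how often it is read, while the deferred one is recomputed on every call, which is why the custom config stores a function rather than a String.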

Use it like this:

val outputFileConfig:OutputFileConfig = new DynamicOutputFileConfig()
.withPartPrefixFunction(()=>ZonedDateTime.now.withZoneSameInstant(ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss-SSS")))
.withPartSuffixFunction(()=> ".ext")

Regards,
Ravi

Attachment: DynamicOutputFileConfig.scala (1K)
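
The attached file is not reproduced in the thread. The following is only a rough sketch of what such a class might look like, reconstructed from the usage shown above and from the two `override def` lines quoted in the compiler output later in the thread. The default constructor arguments and the builder-style method bodies are assumptions. It assumes Flink 1.10.1 or newer, where `getPartPrefix` and `getPartSuffix` can be overridden, and Scala 2.12 or newer, where function literals are serializable.

```scala
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig

// Hypothetical reconstruction; the real attachment may differ.
// The default prefix "part" and empty suffix are assumed values.
class DynamicOutputFileConfig(partPrefix: String = "part", partSuffix: String = "")
    extends OutputFileConfig(partPrefix, partSuffix) {

  // Optional functions; when set, they replace the fixed prefix/suffix.
  // They are serialized with the sink, so they must be serializable.
  private var partPrefixFunction: () => String = _
  private var partSuffixFunction: () => String = _

  def withPartPrefixFunction(f: () => String): DynamicOutputFileConfig = {
    partPrefixFunction = f
    this
  }

  def withPartSuffixFunction(f: () => String): DynamicOutputFileConfig = {
    partSuffixFunction = f
    this
  }

  // Evaluated each time the sink asks the config for a prefix,
  // so the function is invoked again instead of reusing a fixed String.
  override def getPartPrefix: String =
    if (partPrefixFunction == null) partPrefix else partPrefixFunction.apply()

  override def getPartSuffix: String =
    if (partSuffixFunction == null) partSuffix else partSuffixFunction.apply()
}
```

An instance built this way can be passed to `withOutputFileConfig` in place of the config produced by `OutputFileConfig.builder()`.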

Re: Dynamic file name prefix - StreamingFileSink

Vijayendra Yadav
Thanks, Ravi. I got the following error:
[ERROR] DynamicOutputFileConfig.scala:21: error: method getPartPrefix overrides nothing
[ERROR]   override def getPartPrefix: String = if(partPrefixFunction == null) partPrefix else partPrefixFunction.apply()
[ERROR]                ^
[ERROR] DynamicOutputFileConfig.scala:26: error: method getPartSuffix overrides nothing
[ERROR]   override def getPartSuffix: String = if(partSuffixFunction == null) partSuffix else partSuffixFunction.apply()

Re: Dynamic file name prefix - StreamingFileSink

Piotr Nowojski-4
Hi Yadav,

What Flink version are you using? The `getPartPrefix` and `getPartSuffix` methods were not public before 1.10.1/1.11.0, which might be causing this problem for you. Other than that, if you are already using Flink 1.10.1 (or newer), please double-check which class you are extending. The example provided by Ravi seems to work for me.

Piotrek


Re: Dynamic file name prefix - StreamingFileSink

Vijayendra Yadav
Hi Piotrek,

That is correct, I was still on 1.10. I am upgrading to 1.11.

Regards,
Vijay


Re: Dynamic file name prefix - StreamingFileSink

Piotr Nowojski-4
Great! Please let us know if it solves the issue or not.

Best,
Piotrek