Flink 1.7.1 and RollingFileSink

Vishal Santoshi
Can StreamingFileSink be used in place of the BucketingSink (https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/filesystem_sink.html)? It looks like it could.

Take this code, for example:

// KafkaRecord and KafkaRecordBucketAssigner are application classes, not part of Flink.
StreamingFileSink
    .forRowFormat(new Path(PATH), new SimpleStringEncoder<KafkaRecord>())
    .withBucketAssigner(new KafkaRecordBucketAssigner(DEFAULT_FORMAT_STRING, ZONE_ID))
    .withRollingPolicy(new RollingPolicy<KafkaRecord, String>() {
        // Never roll just because a checkpoint is being taken.
        @Override
        public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) throws IOException {
            return false;
        }

        // Roll once the part file grows beyond 1 GiB.
        @Override
        public boolean shouldRollOnEvent(PartFileInfo<String> partFileState,
                                         KafkaRecord element) throws IOException {
            return partFileState.getSize() > 1024L * 1024L * 1024L;
        }

        // Roll after 10 minutes without writes, or once the part file is older than 2 hours.
        @Override
        public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState,
                                                  long currentTime) throws IOException {
            return currentTime - partFileState.getLastUpdateTime() > 10L * 60 * 1000
                    || currentTime - partFileState.getCreationTime() > 120L * 60 * 1000;
        }
    })
    .build();
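
To make the thresholds in this policy concrete, the same three decisions can be restated in plain Java with no Flink dependency. This is a sketch, not Flink code: PartInfo is a hypothetical stand-in for Flink's PartFileInfo that holds only the fields the policy reads, and KafkaRollingRules is likewise an illustrative name.

```java
import java.util.concurrent.TimeUnit;

/** Plain-Java restatement of the three decisions made by the custom RollingPolicy above. */
final class KafkaRollingRules {
    static final long MAX_PART_BYTES = 1024L * 1024L * 1024L;          // 1 GiB
    static final long INACTIVITY_MS = TimeUnit.MINUTES.toMillis(10);   // 10 minutes without writes
    static final long MAX_AGE_MS = TimeUnit.MINUTES.toMillis(120);     // 2 hours since creation

    /** Same as shouldRollOnCheckpoint above: never roll because of a checkpoint. */
    static boolean rollOnCheckpoint(PartInfo part) {
        return false;
    }

    /** Same as shouldRollOnEvent above: roll once the part grows beyond 1 GiB. */
    static boolean rollOnEvent(PartInfo part) {
        return part.sizeBytes > MAX_PART_BYTES;
    }

    /** Same as shouldRollOnProcessingTime above: roll when idle too long or too old. */
    static boolean rollOnProcessingTime(PartInfo part, long nowMs) {
        return nowMs - part.lastUpdateTimeMs > INACTIVITY_MS
                || nowMs - part.creationTimeMs > MAX_AGE_MS;
    }

    public static void main(String[] args) {
        long minute = TimeUnit.MINUTES.toMillis(1);
        // A 512 MiB part created at t=0 and last written at t=5 min.
        PartInfo part = new PartInfo(512L * 1024 * 1024, 0L, 5 * minute);
        System.out.println(rollOnProcessingTime(part, 12 * minute)); // false: idle 7 min, age 12 min
        System.out.println(rollOnProcessingTime(part, 16 * minute)); // true: idle 11 min > 10 min
        System.out.println(rollOnEvent(part));                       // false: 512 MiB < 1 GiB
    }
}

/** Hypothetical stand-in for Flink's PartFileInfo, holding only the fields used above. */
final class PartInfo {
    final long sizeBytes;
    final long creationTimeMs;
    final long lastUpdateTimeMs;

    PartInfo(long sizeBytes, long creationTimeMs, long lastUpdateTimeMs) {
        this.sizeBytes = sizeBytes;
        this.creationTimeMs = creationTimeMs;
        this.lastUpdateTimeMs = lastUpdateTimeMs;
    }
}
```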

There are a few things I see, and am not sure I follow, about the new StreamingFileSink compared with the BucketingSink:

1. I never see the in-progress file move to the pending state (i.e. get renamed as pending), as it did with the BucketingSink. I would have assumed it would become pending and then be finalized on checkpoint, for exactly-once semantics?

2. I see dangling in-progress files at the end of the day. Given that withBucketCheckInterval defaults to 1 minute, I would have assumed that shouldRollOnProcessingTime should kick in?

3. The in-progress files are named like .part-4-45.inprogress.3ed08b67-2b56-4d31-a233-b1c146cfcf14. What is that additional suffix?
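
For question 3, the name can be split into its parts with a small parser. This sketch assumes the layout visible in the example, `.part-<subtask index>-<part counter>.inprogress.<UUID>`, and that the trailing UUID is what distinguishes one in-progress (staging) file from another. Neither assumption is confirmed in this thread, and InProgressName is a hypothetical helper. The version digit of the example suffix is 4, which in RFC 4122 marks a randomly generated UUID.

```java
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Parses names of the assumed form ".part-<subtask>-<counter>.inprogress.<uuid>". */
final class InProgressName {
    private static final String HEX = "[0-9a-fA-F]";
    private static final Pattern LAYOUT = Pattern.compile(
            "\\.part-(\\d+)-(\\d+)\\.inprogress\\.("
                    + HEX + "{8}-" + HEX + "{4}-" + HEX + "{4}-" + HEX + "{4}-" + HEX + "{12})");

    final int subtask;   // index of the parallel sink instance (assumed meaning)
    final long counter;  // running part-file counter of that instance (assumed meaning)
    final UUID suffix;   // the extra suffix asked about

    private InProgressName(int subtask, long counter, UUID suffix) {
        this.subtask = subtask;
        this.counter = counter;
        this.suffix = suffix;
    }

    /** Returns null when the name does not follow the assumed layout. */
    static InProgressName parse(String fileName) {
        Matcher m = LAYOUT.matcher(fileName);
        if (!m.matches()) {
            return null;
        }
        return new InProgressName(
                Integer.parseInt(m.group(1)),
                Long.parseLong(m.group(2)),
                UUID.fromString(m.group(3)));
    }

    public static void main(String[] args) {
        InProgressName n = parse(".part-4-45.inprogress.3ed08b67-2b56-4d31-a233-b1c146cfcf14");
        // Prints: subtask=4 counter=45 uuidVersion=4
        System.out.println("subtask=" + n.subtask + " counter=" + n.counter
                + " uuidVersion=" + n.suffix.version());
    }
}
```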



I have the following setup on the env:
env.enableCheckpointing(10 * 60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.setRestartStrategy(fixedDelayRestart(4, org.apache.flink.api.common.time.Time.minutes(1)));
StateBackend stateBackEnd = new MemoryStateBackend();
env.setStateBackend(stateBackEnd);

Regards.

 



Re: Flink 1.7.1 and RollingFileSink

Timothy Victor
I think OnCheckpointRollingPolicy is the only rolling policy that can be used to ensure exactly-once.
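
A toy model (not Flink code) of the distinction above. A policy that rolls at every checkpoint, as OnCheckpointRollingPolicy is understood to do, never leaves bytes in an open part file across a checkpoint. A policy whose shouldRollOnCheckpoint returns false can instead keep the same in-progress file open across many checkpoints. The timeline encoding (positive numbers are writes, 0 is a checkpoint) and all names here are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.LongPredicate;

final class CheckpointRollModel {
    /**
     * Replays a timeline in which a positive entry is a write of that many bytes and 0
     * marks a checkpoint. rollOnCheckpoint receives the open part's size at each
     * checkpoint, loosely like RollingPolicy#shouldRollOnCheckpoint receives PartFileInfo.
     * Returns the largest number of bytes that stayed open across a checkpoint.
     */
    static long maxBytesCarriedOverCheckpoint(List<Long> timeline, LongPredicate rollOnCheckpoint) {
        long open = 0;
        long maxCarried = 0;
        for (long entry : timeline) {
            if (entry > 0) {
                open += entry;                            // write appended to the open part file
            } else if (rollOnCheckpoint.test(open)) {
                open = 0;                                 // part closed at the checkpoint
            } else {
                maxCarried = Math.max(maxCarried, open);  // part stays open past the checkpoint
            }
        }
        return maxCarried;
    }

    public static void main(String[] args) {
        List<Long> timeline = Arrays.asList(100L, 0L, 50L, 0L, 25L, 0L);
        // Roll at every checkpoint (OnCheckpointRollingPolicy-like).
        System.out.println(maxBytesCarriedOverCheckpoint(timeline, size -> true));   // 0
        // Never roll at a checkpoint (the custom policy in the first post).
        System.out.println(maxBytesCarriedOverCheckpoint(timeline, size -> false));  // 175
    }
}
```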

Tim

In reply to Vishal Santoshi (Sun, Feb 10, 2019, 9:13 AM).

Re: Flink 1.7.1 and RollingFileSink

Vishal Santoshi
Thanks for the quick reply. 

I am confused. If this were a more full-featured BucketingSink, I would imagine that, based on shouldRollOnEvent and shouldRollOnProcessingTime, an in-progress file could move into the pending phase, and on checkpoint the pending part file would be finalized. For exactly-once, the length of every in-progress file would be snapshotted into the checkpoint; if the job resumed from that checkpoint, the length would be used to truncate the file (if truncate is supported) or written out as a valid-length file (if it is not), indicating which part of the file (finalized on resume) is valid. I had also always assumed (and there is no documentation saying otherwise) that shouldRollOnCheckpoint would behave like the other two methods, except that it performs the roll and the finalize in a single step on a checkpoint.
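
The BucketingSink-style lifecycle described above can be sketched as a toy state machine: in-progress, then pending on a roll, then finished once a checkpoint that recorded it as pending completes, with the snapshotted length marking the valid prefix on restore. This illustrates the behaviour as described in this post, not Flink's implementation. All names are hypothetical, and checkpoint IDs are ignored: each completion is assumed to confirm the most recent snapshot.

```java
/**
 * Toy model of one part file under the BucketingSink-style lifecycle described above.
 * Not Flink code: names are hypothetical and checkpoint IDs are ignored.
 */
final class PartFileLifecycle {
    enum State { IN_PROGRESS, PENDING, FINISHED }

    State state = State.IN_PROGRESS;
    long length = 0;                    // bytes written to the part so far
    long checkpointedLength = 0;        // length recorded by the most recent snapshot
    boolean pendingInSnapshot = false;  // whether that snapshot saw the part as pending

    void write(long bytes) {
        if (state != State.IN_PROGRESS) {
            throw new IllegalStateException("part file is already closed");
        }
        length += bytes;
    }

    /** A size- or time-based roll closes the part and marks it pending. */
    void roll() {
        if (state == State.IN_PROGRESS) {
            state = State.PENDING;
        }
    }

    /** A snapshot records the current length and whether the part is already pending. */
    void snapshot() {
        checkpointedLength = length;
        pendingInSnapshot = (state == State.PENDING);
    }

    /** Completing the checkpoint finalizes the part only if the snapshot saw it as pending. */
    void checkpointComplete() {
        if (state == State.PENDING && pendingInSnapshot) {
            state = State.FINISHED;
        }
    }

    /**
     * On restore, only the first checkpointedLength bytes belong to the restored state;
     * the rest would be truncated, or flagged through a valid-length file where truncate
     * is unavailable.
     */
    long validLengthOnRestore() {
        return checkpointedLength;
    }

    public static void main(String[] args) {
        PartFileLifecycle part = new PartFileLifecycle();
        part.write(100);
        part.snapshot();            // checkpoint 1 records 100 bytes
        part.write(40);             // written after checkpoint 1
        part.roll();                // now pending, but checkpoint 1 did not see it pending
        part.checkpointComplete();  // checkpoint 1 completes: part stays pending
        System.out.println(part.state + " valid=" + part.validLengthOnRestore()); // PENDING valid=100
        part.snapshot();            // checkpoint 2 records the pending part
        part.checkpointComplete();  // checkpoint 2 completes: part is finalized
        System.out.println(part.state + " valid=" + part.validLengthOnRestore()); // FINISHED valid=140
    }
}
```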
  

Am I better off using BucketingSink? When to use BucketingSink and when to use StreamingFileSink is not at all clear, even though on the surface StreamingFileSink certainly looks like a better version of BucketingSink (or not).

Regards.



In reply to Timothy Victor (Sun, Feb 10, 2019, 12:09 PM).

Re: Flink 1.7.1 and RollingFileSink

Vishal Santoshi
That said, in DefaultRollingPolicy the shouldRollOnCheckpoint() check seems to be on the file size (mimicking the check in shouldRollOnEvent()).

I guess the questions are:

Is the call to shouldRollOnCheckpoint() made by the checkpointing thread?

Are the calls to the other two methods, shouldRollOnEvent() and shouldRollOnProcessingTime(), made on the execution thread, i.e. inline?
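
These questions could be answered empirically. Wrap the policy in a decorator that records Thread.currentThread().getName() on every call, then log what it sees. The sketch below shows the pattern with a hypothetical plain-Java RollDecisions interface standing in for Flink's RollingPolicy, and simulates the callers with named threads. Wrapping the real interface would mean delegating its three methods the same way, and the wrapper would also need to be Serializable, as RollingPolicy is.

```java
import java.util.ArrayList;
import java.util.List;

/** Decorator that records "method@thread" for every call before delegating. */
final class ThreadRecordingDecisions implements RollDecisions {
    private final RollDecisions delegate;
    private final List<String> calls = new ArrayList<>();

    ThreadRecordingDecisions(RollDecisions delegate) {
        this.delegate = delegate;
    }

    private synchronized void record(String method) {
        calls.add(method + "@" + Thread.currentThread().getName());
    }

    /** Snapshot of the recorded entries, in call order. */
    synchronized List<String> calls() {
        return new ArrayList<>(calls);
    }

    @Override
    public boolean onCheckpoint() {
        record("onCheckpoint");
        return delegate.onCheckpoint();
    }

    @Override
    public boolean onEvent() {
        record("onEvent");
        return delegate.onEvent();
    }

    @Override
    public boolean onProcessingTime(long nowMs) {
        record("onProcessingTime");
        return delegate.onProcessingTime(nowMs);
    }

    /** Runs an action on a freshly named thread and waits for it to finish. */
    static void runOn(String threadName, Runnable action) {
        Thread t = new Thread(action, threadName);
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ThreadRecordingDecisions policy = new ThreadRecordingDecisions(RollDecisions.fixed(false));
        policy.onEvent();                                          // called from the current thread
        runOn("checkpoint-thread", policy::onCheckpoint);          // simulated checkpoint caller
        runOn("timer-thread", () -> policy.onProcessingTime(0L));  // simulated timer caller
        System.out.println(policy.calls());
    }
}

/** Hypothetical stand-in for the three RollingPolicy callbacks, reduced to plain values. */
interface RollDecisions {
    boolean onCheckpoint();
    boolean onEvent();
    boolean onProcessingTime(long nowMs);

    /** A policy that gives the same answer to every question. */
    static RollDecisions fixed(boolean answer) {
        return new RollDecisions() {
            @Override public boolean onCheckpoint() { return answer; }
            @Override public boolean onEvent() { return answer; }
            @Override public boolean onProcessingTime(long nowMs) { return answer; }
        };
    }
}
```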





In reply to Vishal Santoshi (Sun, Feb 10, 2019, 1:17 PM).

Re: Flink 1.7.1 and RollingFileSink

Timothy Victor
My apologies for misreading your use case. The constraint on the rolling policy applies only to bulk formats such as Parquet, as highlighted in the docs.

As for your questions, I'll have to defer to others more familiar with it. I mostly use bulk formats such as Avro and Parquet.

Tim


In reply to Vishal Santoshi (Sun, Feb 10, 2019, 12:40 PM).

Re: Flink 1.7.1 and RollingFileSink

Vishal Santoshi
You don't have to. Thank you for the input. 

In reply to Timothy Victor (Sun, Feb 10, 2019, 1:56 PM).

Re: Flink 1.7.1 and RollingFileSink

Vishal Santoshi
Anyone?

In reply to Vishal Santoshi (Sun, Feb 10, 2019, 2:07 PM).

Re: Flink 1.7.1 and RollingFileSink

Fabian Hueske
Hi Vishal,

Kostas (in CC) should be able to help here.

Best, Fabian

In reply to Vishal Santoshi (Mon, Feb 11, 2019, 00:05).

Re: Flink 1.7.1 and RollingFileSink

Vishal Santoshi
Thanks Fabian,

A few more questions:

1. On a standalone job cluster on k8s, I had env.getCheckpointConfig().setFailOnCheckpointingErrors(true) (the default). The job failed on a checkpoint, and I would have expected that under HA it would restore from the last checkpoint, but it did not. The UI showed that the job had restarted without a restore: the state was wiped out and the job was relaunched with no state.

2. The in-progress files from that failed instance were left behind, which is consistent with no state having been restored.

So there are a few questions:

1. On k8s with a standalone job cluster, has the scenario of the container failing (while the pod stays intact) and then restoring been tested? In this case the pod remained up and running, but the container it was executing was definitely relaunched from scratch.

2. Am I missing any configuration, given the setup below?
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30 * 60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
env.setRestartStrategy(fixedDelayRestart(4, org.apache.flink.api.common.time.Time.minutes(1)));
StateBackend stateBackEnd = new FsStateBackend(
        new org.apache.flink.core.fs.Path("........"));
env.setStateBackend(stateBackEnd);

3. What is the nature of the StreamingFileSink? How does it provide exactly-once semantics (or does it not)?

Any help will be appreciated. 

Regards.









On Mon, Feb 11, 2019 at 5:00 AM Fabian Hueske <[hidden email]> wrote:
Hi Vishal,

Kostas (in CC) should be able to help here.

Best, Fabian

Am Mo., 11. Feb. 2019 um 00:05 Uhr schrieb Vishal Santoshi <[hidden email]>:
Any one ?

On Sun, Feb 10, 2019 at 2:07 PM Vishal Santoshi <[hidden email]> wrote:
You don't have to. Thank you for the input. 

On Sun, Feb 10, 2019 at 1:56 PM Timothy Victor <[hidden email]> wrote:
My apologies for not seeing your use case properly.   The constraint on rolling policy is only applicable for bulk formats such as Parquet as highlighted in the docs.

As for your questions, I'll have to defer to others more familiar with it.   I mostly just use bulk formats such as avro and parquet.

Tim


On Sun, Feb 10, 2019, 12:40 PM Vishal Santoshi <[hidden email] wrote:
That said the in the DefaultRollingPolicy it seems the check is on the file size ( mimics the check shouldRollOnEVent()). 

I guess the questions are:

Is the call to shouldRollOnCheckpoint() made by the checkpointing thread?

Are the calls to the other two methods, shouldRollOnEvent() and shouldRollOnProcessingTime(), made inline on the execution thread?

On Sun, Feb 10, 2019 at 1:17 PM Vishal Santoshi <[hidden email]> wrote:
Thanks for the quick reply. 

I am confused. If this were a more full-featured BucketingSink, I would imagine that, based on shouldRollOnEvent and shouldRollOnProcessingTime, an in-progress file could move to the pending phase, and on checkpoint the pending part file would be finalized. For exactly-once, the length of any in-progress file would be snapshotted to the checkpoint and, if a resume from that checkpoint happened, used either to truncate the file (if truncate is supported) or to drop a valid-length file (if it is not), indicating which part of the finalized file (finalized on resume) is valid. I had also always assumed (and there is no doc saying otherwise) that shouldRollOnCheckpoint would behave like the other two, except that it performs the roll and the finalize in a single step on a checkpoint.
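The truncate-or-marker recovery described above can be sketched in plain Java. This is a toy model, not BucketingSink's actual code (which goes through Hadoop's file system API): `ValidLengthRecovery` and `recoverToValidLength` are hypothetical names, and the `_<name>.valid-length` marker naming is an assumption for illustration.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ValidLengthRecovery {

    /**
     * Restores a part file to the length recorded at the last checkpoint.
     * If truncation is supported, bytes written after the checkpoint are cut off;
     * otherwise a side file records how many leading bytes are valid.
     * Hypothetical helper, for illustration only.
     */
    static void recoverToValidLength(Path partFile, long validLength, boolean truncateSupported)
            throws IOException {
        if (truncateSupported) {
            try (FileChannel ch = FileChannel.open(partFile, StandardOpenOption.WRITE)) {
                ch.truncate(validLength); // drop everything written after the checkpoint
            }
        } else {
            // Assumed marker naming "_<name>.valid-length"; the file itself is left untouched.
            Path marker = partFile.resolveSibling("_" + partFile.getFileName() + ".valid-length");
            Files.write(marker, Long.toString(validLength).getBytes(StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("bucket");
        Path part = dir.resolve("part-0-0");
        Files.write(part, "0123456789".getBytes(StandardCharsets.UTF_8)); // 10 bytes on disk
        recoverToValidLength(part, 6, true); // the checkpoint had recorded 6 valid bytes
        System.out.println(Files.size(part)); // 6
    }
}
```

Readers of the marker-only variant would have to honour the recorded length themselves, which is why truncation is preferred when the file system supports it.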
  

Am I better off using BucketingSink? When to use BucketingSink and when to use RollingSink is not clear at all, even though on the surface RollingSink certainly looks like a better version of BucketingSink (or not).

Regards.



On Sun, Feb 10, 2019 at 12:09 PM Timothy Victor <[hidden email]> wrote:
I think the only rolling policy that can be used is CheckpointRollingPolicy to ensure exactly once.

Tim

On Sun, Feb 10, 2019, 9:13 AM Vishal Santoshi <[hidden email]> wrote:
Can StreamingFileSink be used instead of the BucketingSink (https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/filesystem_sink.html)? It looks like it could.

Take this code, for example:

StreamingFileSink
    .forRowFormat(new Path(PATH),
        new SimpleStringEncoder<KafkaRecord>())
    .withBucketAssigner(new KafkaRecordBucketAssigner(DEFAULT_FORMAT_STRING, ZONE_ID))
    .withRollingPolicy(new RollingPolicy<KafkaRecord, String>() {
        @Override
        public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) throws IOException {
            // never roll just because a checkpoint is taken
            return false;
        }

        @Override
        public boolean shouldRollOnEvent(PartFileInfo<String> partFileState,
                KafkaRecord element) throws IOException {
            // roll once the part file exceeds 1 GiB
            return partFileState.getSize() > 1024L * 1024 * 1024;
        }

        @Override
        public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState, long currentTime) throws IOException {
            // roll after 10 minutes without writes, or 2 hours after the file was created
            return currentTime - partFileState.getLastUpdateTime() > 10 * 60 * 1000L
                    || currentTime - partFileState.getCreationTime() > 120 * 60 * 1000L;
        }
    })
    .build();
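To sanity-check the thresholds in the policy above without a Flink runtime, they can be pulled out into plain functions. `RollThresholds` and its methods are hypothetical; they only mirror the arithmetic of `shouldRollOnEvent` and `shouldRollOnProcessingTime`.

```java
public class RollThresholds {
    static final long MAX_PART_SIZE_BYTES = 1024L * 1024 * 1024; // 1 GiB
    static final long INACTIVITY_MS = 10L * 60 * 1000;           // 10 minutes
    static final long MAX_AGE_MS = 120L * 60 * 1000;             // 2 hours

    /** Mirrors shouldRollOnEvent: roll once the part file exceeds 1 GiB. */
    static boolean rollOnSize(long sizeBytes) {
        return sizeBytes > MAX_PART_SIZE_BYTES;
    }

    /** Mirrors shouldRollOnProcessingTime: idle for too long, or open for too long. */
    static boolean rollOnTime(long now, long creationTime, long lastUpdateTime) {
        return now - lastUpdateTime > INACTIVITY_MS || now - creationTime > MAX_AGE_MS;
    }

    public static void main(String[] args) {
        System.out.println(rollOnSize(MAX_PART_SIZE_BYTES));              // false: equal is not "greater"
        System.out.println(rollOnTime(11 * 60_000L, 0L, 0L));             // true: idle for 11 minutes
        System.out.println(rollOnTime(119 * 60_000L, 0L, 118 * 60_000L)); // false: active and under 2 h old
    }
}
```

`1024 * 1024 * 1024` (2^30) still fits in an `int`, so the original constants are correct; writing `1024L` first simply makes the `long` arithmetic explicit and avoids the easily misread lowercase `l` suffix.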

There are a few things I see, and don't quite follow, about the new RollingFileSink vis-à-vis BucketingSink:

1. I never see the in-progress file go to the pending state (i.e., get renamed to .pending), as was the case in BucketingSink. I would assume that, for exactly-once semantics, it would become pending and then
   be finalized on checkpoint?

2. I see dangling in-progress files at the end of the day. Since withBucketCheckInterval defaults to 1 minute, I would assume shouldRollOnProcessingTime should kick in?
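Question 2 can be reasoned about with a toy timeline. This sketch assumes the periodic bucket check fires every 60 s from t = 0 (the 1-minute default mentioned above), that checkpoints complete instantly every 10 minutes from t = 0 (matching the env setup below), and it models only the 10-minute inactivity rule; `RollTimeline` and its methods are hypothetical, not Flink code. Under those assumptions a file last written at t = 0 is rolled at the 11-minute check but, still carrying its in-progress name, is only finalized when the 20-minute checkpoint completes.

```java
public class RollTimeline {
    static final long CHECK_INTERVAL_MS = 60_000L;       // assumed periodic bucket check interval
    static final long CHECKPOINT_INTERVAL_MS = 600_000L; // env.enableCheckpointing(10 * 60000)
    static final long INACTIVITY_MS = 600_000L;          // policy: roll after > 10 min without writes

    /** First timer tick at which the inactivity rule in shouldRollOnProcessingTime fires. */
    static long firstRollTick(long lastWriteMs) {
        long tick = CHECK_INTERVAL_MS;
        while (tick - lastWriteMs <= INACTIVITY_MS) {
            tick += CHECK_INTERVAL_MS; // nothing happens until the next periodic check
        }
        return tick;
    }

    /** First checkpoint strictly after the roll; its completion commits the pending file. */
    static long commitTime(long rollMs) {
        return (rollMs / CHECKPOINT_INTERVAL_MS + 1) * CHECKPOINT_INTERVAL_MS;
    }

    public static void main(String[] args) {
        long roll = firstRollTick(0L);
        System.out.println("rolled (pending, still .inprogress) at " + roll / 60_000 + " min");
        System.out.println("renamed to final name at " + commitTime(roll) / 60_000 + " min");
    }
}
```

The gap between the two times is why a file can look "dangling" for up to one checkpoint interval even when the rolling policy fired as expected.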
 
3. The in-progress files are named like .part-4-45.inprogress.3ed08b67-2b56-4d31-a233-b1c146cfcf14. What is that additional suffix?

I have the following setup on the env:
env.enableCheckpointing(10 * 60000); // checkpoint every 10 minutes
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.setRestartStrategy(fixedDelayRestart(4, org.apache.flink.api.common.time.Time.minutes(1)));
StateBackend stateBackEnd = new MemoryStateBackend();
env.setStateBackend(stateBackEnd);

Regards.

Re: fllink 1.7.1 and RollingFileSink

Kostas Kloudas-4
Hi Vishal,

For the StreamingFileSink vs Rolling/BucketingSink:
 - you can use the StreamingFileSink instead of the Rolling/BucketingSink. You can see the StreamingFileSink as an evolution of the previous two.

In the StreamingFileSink, files in the pending state are not renamed; they keep their "in-progress" name. This is why you no longer see .pending files.
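The lifecycle described above (rolled files become pending without being renamed, and only get their final name once a checkpoint covering them completes) can be modelled in plain Java. This is a toy sketch under stated assumptions: `PartFileLifecycle` and its methods are hypothetical, the hidden `.part-<subtask>-<counter>.inprogress.<uuid>` naming follows the file names reported earlier in this thread, and the real sink does this through Flink's `RecoverableWriter` rather than string bookkeeping.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;

public class PartFileLifecycle {
    private final int subtask;
    private int partCounter = 0;
    private String inProgress;                                  // open file, or null
    private final List<String> pending = new ArrayList<>();     // rolled, not yet in a checkpoint
    private final TreeMap<Long, List<String>> pendingPerCheckpoint = new TreeMap<>();
    final List<String> finished = new ArrayList<>();

    PartFileLifecycle(int subtask) {
        this.subtask = subtask;
    }

    /** Opens a new hidden in-progress file if none is open. */
    void write() {
        if (inProgress == null) {
            String finalName = "part-" + subtask + "-" + partCounter++;
            inProgress = "." + finalName + ".inprogress." + UUID.randomUUID();
        }
    }

    /** Rolling closes the file: it becomes pending but is NOT renamed. */
    void roll() {
        if (inProgress != null) {
            pending.add(inProgress);
            inProgress = null;
        }
    }

    /** A checkpoint records which pending files it covers. */
    void snapshot(long checkpointId) {
        pendingPerCheckpoint.put(checkpointId, new ArrayList<>(pending));
        pending.clear();
    }

    /** Only when a checkpoint completes are the files it covers renamed to their final names. */
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it =
                pendingPerCheckpoint.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            for (String name : it.next().getValue()) {
                finished.add(name.substring(1, name.indexOf(".inprogress.")));
            }
            it.remove();
        }
    }

    public static void main(String[] args) {
        PartFileLifecycle sink = new PartFileLifecycle(4);
        sink.write();
        sink.roll();                      // pending, still named .part-4-0.inprogress.<uuid>
        sink.snapshot(1);
        sink.write();
        sink.roll();                      // rolled after checkpoint 1 was taken
        sink.notifyCheckpointComplete(1);
        System.out.println(sink.finished); // [part-4-0]; part-4-1 waits for the next checkpoint
    }
}
```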

What Timothy said about bulk formats is correct: they only support the "onCheckpoint" rolling policy.

As for the second issue, about deployment, I would recommend opening a new thread so that people can tell from the title whether they can help.
It also helps the community when the title reflects the content of the topic: the mailing list is indexed by search engines, so if someone
has a similar question, the title will help them find the relevant thread.

Cheers,
Kostas


On Thu, Feb 14, 2019 at 12:09 PM Vishal Santoshi <[hidden email]> wrote:
Thanks Fabian,

More questions:

1. On a k8s standalone job I had env.getCheckpointConfig().setFailOnCheckpointingErrors(true) (the default). The job failed on a checkpoint, and I would have expected that under HA the job would restore from the last checkpoint, but it did not: the UI showed the job had restarted without a restore. The state was wiped out and the job was relaunched with no state.

--

Kostas Kloudas | Software Engineer



Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen

Re: fllink 1.7.1 and RollingFileSink

Vishal Santoshi
Awesome, thanks! I will open a new thread. But yes, the explanation of the in-progress files was helpful.

On Thu, Feb 14, 2019, 7:50 AM Kostas Kloudas <[hidden email]> wrote:
Hi Vishal,

For the StreamingFileSink vs Rolling/BucketingSink:
 - you can use the StreamingFileSink instead of the Rolling/BucketingSink. You can see the StreamingFileSink as an evolution of the previous two.

In the StreamingFileSink, files in the pending state are not renamed; they keep their "in-progress" name. This is why you no longer see .pending files.

What Timothy said about bulk formats is correct: they only support the "onCheckpoint" rolling policy.

As for the second issue, about deployment, I would recommend opening a new thread so that people can tell from the title whether they can help.
It also helps the community when the title reflects the content of the topic: the mailing list is indexed by search engines, so if someone
has a similar question, the title will help them find the relevant thread.

Cheers,
Kostas