I'm trying to generate some test data using a collection, and write that data to s3, Flink doesn't seem to do any checkpointing at all when I do this, but it does do checkpointing when the source comes from s3.
For example, this DOES checkpoint and leaves output files in a completed state:
```scala
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setMaxParallelism(128)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.enableCheckpointing(2000L)
env.setStateBackend(new RocksDBStateBackend("s3a://my_bucket/simple_job/rocksdb_checkpoints"))
val lines: DataStream[String] = {
val path = "s3a://my_bucket/simple_job/in"
env
.readFile(
inputFormat = new TextInputFormat(new Path(path)),
filePath = path,
watchType = FileProcessingMode.PROCESS_CONTINUOUSLY,
interval = 5000L
)
}
val sinkFunction: BucketingSink[String] =
new BucketingSink[String]("s3a://my_bucket/simple_job/out")
.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
lines.addSink(sinkFunction)
env.execute()
```Meanwhile, this DOES NOT checkpoint, and leaves files in a .pending state even after the job has finished:
```scala
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setMaxParallelism(128)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.enableCheckpointing(2000L)
env.setStateBackend(new RocksDBStateBackend("s3a://my_bucket/simple_job/rocksdb_checkpoints"))
val lines: DataStream[String] = env.fromCollection((1 to 100).map(_.toString))
val sinkFunction: BucketingSink[String] =
new BucketingSink[String]("s3a://my_bucket/simple_job/out")
.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
lines.addSink(sinkFunction)
env.execute()
```