I am trying to read from Kafka and write to Parquet, but I am getting thousands of ".part-0-0.inprogress..." files (and counting ...):
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.datastream.DataStreamSource
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object StreamParquet extends App {
  implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  // Checkpointing config; the StreamingFileSink commits in-progress part files when checkpoints complete
  env.enableCheckpointing(100)
  env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
  env.getCheckpointConfig.setCheckpointTimeout(600)
  env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
  env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
  env.setParallelism(1)

  // Kafka source that deserializes records into Address objects
  val consumer = new FlinkKafkaConsumer011[Address](SOURCE_TOPIC, new AddressSchema(), consumerProperties)
  val stream: DataStreamSource[Address] = env.addSource(consumer)

  // Bulk Parquet sink using Avro reflection on the Address class
  val outputPath = "streaming_files"
  val sink = StreamingFileSink
    .forBulkFormat(new Path(outputPath), ParquetAvroWriters.forReflectRecord(classOf[Address]))
    .build()

  stream.addSink(sink)
  env.execute("Write to file")
}
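For reference, `Address`, `AddressSchema`, `SOURCE_TOPIC`, and `consumerProperties` are defined elsewhere in my project; they look roughly like the sketch below (field names, the CSV parsing, and the Kafka settings are just placeholders, not my real values):

import java.nio.charset.StandardCharsets
import java.util.Properties
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor

// Placeholder event type; the real class has more fields
case class Address(street: String, city: String, zip: String)

// Turns Kafka messages into Address records (shown here as a simple CSV parser)
class AddressSchema extends DeserializationSchema[Address] {
  override def deserialize(message: Array[Byte]): Address = {
    val Array(street, city, zip) = new String(message, StandardCharsets.UTF_8).split(",", 3)
    Address(street, city, zip)
  }
  override def isEndOfStream(nextElement: Address): Boolean = false
  override def getProducedType(): TypeInformation[Address] = TypeExtractor.getForClass(classOf[Address])
}

val SOURCE_TOPIC = "addresses"  // placeholder topic name
val consumerProperties = new Properties()
consumerProperties.setProperty("bootstrap.servers", "localhost:9092")  // placeholder broker address
consumerProperties.setProperty("group.id", "stream-parquet-consumer")  // placeholder consumer group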