Trying to write to parquet file (kafka as a source) yields thousands of "in progress" files

Posted by avilevi
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Trying-to-write-to-parquet-file-kafka-as-a-source-yields-thousands-of-in-progress-files-tp25019.html

Hi,
I am trying to read from Kafka and write to Parquet, but I am getting thousands of ".part-0-0.inprogress..." files (and counting...).
Is this a bug, or am I doing something wrong?

import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object StreamParquet extends App {
  implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(100)
  env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
  env.getCheckpointConfig.setCheckpointTimeout(600)
  env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
  env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
  env.setParallelism(1)
  val consumer = new FlinkKafkaConsumer011[Address](SOURCE_TOPIC, new AddressSchema(), consumerProperties)
  val stream: DataStream[Address] = env.addSource(consumer)
  val outputPath = "streaming_files"
  val sink = StreamingFileSink.forBulkFormat(
    new Path(outputPath),
    ParquetAvroWriters.forReflectRecord(classOf[Address])).build()
  stream.addSink(sink)
  env.execute("Write to file")
}
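
Some behavior that is relevant to the snippet above: StreamingFileSink.forBulkFormat only supports rolling part files on checkpoints (the OnCheckpointRollingPolicy), and a part file is only finalized once the checkpoint that rolled it completes. Both enableCheckpointing and setCheckpointTimeout take milliseconds, so enableCheckpointing(100) requests a new part file roughly every 100 ms, and a 600 ms checkpoint timeout may prevent checkpoints from ever completing, leaving every file in the in-progress state. Below is a minimal sketch of checkpoint settings that should produce fewer, larger files; the one-minute interval and ten-minute timeout are illustrative values, not taken from the original post:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object CheckpointTuningSketch extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  // Bulk formats roll a new part file on every checkpoint, so the checkpoint
  // interval directly controls how many part files are produced per subtask.
  // Interval is in milliseconds; one minute here is an illustrative value.
  env.enableCheckpointing(60 * 1000L)

  // Timeout is also in milliseconds: 600 would be 0.6 s, likely too short for
  // a Parquet bulk flush to complete, so files would never leave ".inprogress".
  env.getCheckpointConfig.setCheckpointTimeout(10 * 60 * 1000L)
}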