Trying to write to parquet file (kafka as a source) yields thousands of "in progress" files

avilevi
Hi,
I am trying to read from Kafka and write to Parquet, but I am getting thousands of ".part-0-0.inprogress..." files (and counting).
Is that a bug, or am I doing something wrong?

import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object StreamParquet extends App {
  implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(100) // checkpoint interval in ms
  env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
  env.getCheckpointConfig.setCheckpointTimeout(600) // timeout in ms
  env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
  env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
  env.setParallelism(1)
  val consumer = new FlinkKafkaConsumer011[Address](SOURCE_TOPIC, new AddressSchema(), consumerProperties)
  val stream: DataStream[Address] = env.addSource(consumer)
  val outputPath = "streaming_files"
  val sink = StreamingFileSink.forBulkFormat(
    new Path(outputPath),
    ParquetAvroWriters.forReflectRecord(classOf[Address])).build()
  stream.addSink(sink)
  env.execute("Write to file")
}
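
For context: with a bulk format such as Parquet, StreamingFileSink rolls over to a new part file on every checkpoint, and a part file stays in the ".inprogress" state until the checkpoint covering it completes. With enableCheckpointing(100), that is roughly ten new part files per second per subtask, which matches the "thousands of files" symptom; the 600 ms checkpoint timeout also risks checkpoints expiring before they finish, which would leave files in-progress for good. A minimal sketch of a less aggressive configuration (the interval and timeout values below are illustrative assumptions, not from the original post):

  // Sketch only: checkpoint once a minute so part files roll at most once
  // a minute per subtask; both values are in milliseconds and are
  // illustrative, not taken from the thread.
  env.enableCheckpointing(60000)
  env.getCheckpointConfig.setCheckpointTimeout(120000)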

Re: Trying to write to parquet file (kafka as a source) yields thousands of "in progress" files

avilevi
Got it, my bad. I should have used a bucket assigner. This seems to be working fine:
StreamingFileSink.forBulkFormat[Request](
    new Path(outputPath),
    ParquetAvroWriters.forReflectRecord(classOf[Request]))
  .withBucketAssigner(new DateTimeBucketAssigner[Request]())
  .withBucketCheckInterval(5000L)
  .build()
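
For what it's worth: DateTimeBucketAssigner groups part files into time-based bucket directories (hourly by default, using the "yyyy-MM-dd--HH" pattern), while withBucketCheckInterval(5000L) only controls how often the sink inspects its buckets; with bulk formats, part files are still finalized when a checkpoint completes, so the checkpoint interval remains the main knob for how many files get produced. The assigner also accepts a format string for finer-grained buckets; a sketch, where the per-minute pattern is an illustrative assumption:

  // Sketch: same sink, but with per-minute bucket directories instead of
  // the hourly default; the minute-level pattern is an assumption, not
  // something from the original post.
  StreamingFileSink.forBulkFormat[Request](
      new Path(outputPath),
      ParquetAvroWriters.forReflectRecord(classOf[Request]))
    .withBucketAssigner(new DateTimeBucketAssigner[Request]("yyyy-MM-dd--HH-mm"))
    .withBucketCheckInterval(5000L)
    .build()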
