Bucketing Sink does not complete files, when source is from a collection

Bucketing Sink does not complete files, when source is from a collection

joshlemer
Hello, I was wondering if I could get some pointers on what I'm doing wrong here. I posted this on stack overflow , but I thought I'd also ask here. 

I'm trying to generate some test data using a collection and write that data to S3. Flink doesn't seem to do any checkpointing at all when I do this, but it does checkpoint when the source comes from S3.

For example, this DOES checkpoint and leaves output files in a completed state:

```scala
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setMaxParallelism(128)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.enableCheckpointing(2000L)
env.setStateBackend(new RocksDBStateBackend("s3a://my_bucket/simple_job/rocksdb_checkpoints"))

val lines: DataStream[String] = {
  val path = "s3a://my_bucket/simple_job/in"
  env.readFile(
    inputFormat = new TextInputFormat(new Path(path)),
    filePath = path,
    watchType = FileProcessingMode.PROCESS_CONTINUOUSLY,
    interval = 5000L
  )
}

val sinkFunction: BucketingSink[String] =
  new BucketingSink[String]("s3a://my_bucket/simple_job/out")
    .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))

lines.addSink(sinkFunction)

env.execute()
```

Meanwhile, this DOES NOT checkpoint, and leaves files in a .pending state even after the job has finished:


```scala
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setMaxParallelism(128)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.enableCheckpointing(2000L)
env.setStateBackend(new RocksDBStateBackend("s3a://my_bucket/simple_job/rocksdb_checkpoints"))

val lines: DataStream[String] = env.fromCollection((1 to 100).map(_.toString))

val sinkFunction: BucketingSink[String] =
  new BucketingSink[String]("s3a://my_bucket/simple_job/out")
    .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))

lines.addSink(sinkFunction)

env.execute()
```

Is this a bug in Flink, or something I'm doing wrong? Thank you!

Re: Bucketing Sink does not complete files, when source is from a collection

joshlemer
Actually sorry, I have found that this is most likely a manifestation of https://issues.apache.org/jira/browse/FLINK-2646, as discussed elsewhere on the mailing list. That is, in the second example, with "fromCollection", the entire stream ends before a checkpoint is made. Let's hope this is fixed some day :-)
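
One common way to work around this (a sketch only, not code from this thread; the class name HeldOpenCollectionSource and the sleep interval are made up) is to replace fromCollection with a custom SourceFunction that emits the elements and then stays alive until cancelled, so the periodic checkpoints get a chance to complete and the BucketingSink can finalize its .pending files:

```scala
// Workaround sketch (assumed, not from the thread): emit the collection, then
// keep the source alive so that at least one checkpoint can complete.
import org.apache.flink.streaming.api.functions.source.SourceFunction

class HeldOpenCollectionSource(elements: Seq[String]) extends SourceFunction[String] {
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    elements.foreach { e =>
      // Emit under the checkpoint lock so records and checkpoints don't interleave.
      ctx.getCheckpointLock.synchronized {
        ctx.collect(e)
      }
    }
    // Keep the job running; checkpoints fire every 2000 ms per enableCheckpointing above.
    while (running) Thread.sleep(100)
  }

  override def cancel(): Unit = running = false
}

// val lines: DataStream[String] =
//   env.addSource(new HeldOpenCollectionSource((1 to 100).map(_.toString)))
```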




Re: Bucketing Sink does not complete files, when source is from a collection

Fabian Hueske-2
Hi Josh,

You are right, FLINK-2646 is related to the problem of non-finalized files. If we could distinguish the reasons why close() is called, we could do a proper clean-up when the job terminates because all data was processed.

Right now, the source and sink interfaces of the DataStream API are not really designed for finite / bounded data.
In order to improve the support for bounded and unbounded data, we have some plans to design unified interfaces that can handle both cases well.
This effort should also solve cases like the one that you described.
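
For context (an illustrative sketch, not code from this thread): the BucketingSink moves files out of the .pending state when it is notified that a checkpoint has completed, via a callback of roughly this shape, which is why a job that finishes before any checkpoint completes leaves its output pending:

```scala
// Illustrative sketch only: the shape of the checkpoint-completion callback
// that the real BucketingSink relies on to finalize files. A job that ends
// before any checkpoint completes never receives this notification.
import org.apache.flink.runtime.state.CheckpointListener

class PendingFileFinalizerSketch extends CheckpointListener {
  override def notifyCheckpointComplete(checkpointId: Long): Unit = {
    // rename ".pending" files written up to this checkpoint to their final names
  }
}
```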

Best, Fabian
