[External] Flink StreamingFileSink not ingesting to S3 when checkpointing is disabled

Vishal Sharma

Hello everyone,

I want to use AWS S3 as the sink for a data stream in Flink. I am using the StreamingFileSink class to create the sink.

I don't need checkpointing for my job, but when I disable checkpointing, data is no longer written to S3.

Case 1: checkpointing enabled
When checkpointing is enabled, the data is successfully written to the given S3 path.

Case 2: checkpointing disabled
When checkpointing is disabled, no data is written to S3.

I ran the job multiple times and got the same result every time. I see this on my local machine as well as on a Kubernetes cluster.


The following is the code I tried, using a bounded stream:

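import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.{PartFileInfo, RollingPolicy, StreamingFileSink}
import org.apache.flink.streaming.api.scala._

object FlinkTestJob {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Case 1: checkpointing enabled (interval in milliseconds).
    // Remove this line to reproduce case 2 (checkpointing disabled).
    env.enableCheckpointing(100)

    // Bounded test source
    val streamStrings: Seq[String] =
      Seq("test1", "test2", "test3", "test4", "test5", "test6", "test7", "test8", "test9", "test10")

    val testStream = env.fromCollection(streamStrings)

    // Custom policy: roll the part file on every event and on every processing-time check
    val rollingPolicy = new RollingPolicy[String, String] {

      override def shouldRollOnCheckpoint(partFileState: PartFileInfo[String]): Boolean =
        partFileState.getSize > 1

      override def shouldRollOnEvent(
          partFileState: PartFileInfo[String],
          element: String): Boolean = true

      override def shouldRollOnProcessingTime(
          partFileState: PartFileInfo[String],
          currentTime: Long): Boolean = true
    }

    // Row-format sink writing UTF-8 encoded strings to S3
    val sink: StreamingFileSink[String] = StreamingFileSink
      .forRowFormat(new Path("s3a://testbucket/sink"), new SimpleStringEncoder[String]("UTF-8"))
      .withRollingPolicy(rollingPolicy)
      .build()

    testStream.addSink(sink)
    env.execute("test-job")
  }
}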


When I write to S3 using writeAsText("s3a://testbucket/sink") instead of StreamingFileSink, it works fine whether or not checkpointing is enabled.

Flink version: 1.8.0
I want to understand the relationship between checkpointing and StreamingFileSink.

- Thanks


Grab is hiring. Learn more at https://grab.careers

By communicating with Grab Inc and/or its subsidiaries, associate companies and jointly controlled entities (“Grab Group”), you are deemed to have consented to processing of your personal data as set out in the Privacy Notice which can be viewed at https://grab.com/privacy/

This email contains confidential information and is only for the intended recipient(s). If you are not the intended recipient(s), please do not disseminate, distribute or copy this email and notify Grab Group immediately if you have received this by mistake and delete this email from your system. Email transmission cannot be guaranteed to be secure or error-free as any information therein could be intercepted, corrupted, lost, destroyed, delayed or incomplete, or contain viruses. Grab Group do not accept liability for any errors or omissions in the contents of this email arises as a result of email transmission. All intellectual property rights in this email and attachments therein shall remain vested in Grab Group, unless otherwise provided by law.

Re: [External] Flink StreamingFileSink not ingesting to S3 when checkpointing is disabled

Timothy Victor
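You must have checkpointing enabled to use the StreamingFileSink. The sink relies on checkpointing to achieve exactly-once semantics: part files can only be finalized when a checkpoint completes, so with checkpointing disabled they stay in the in-progress or pending state and never appear as finished output.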

>>  This is integrated with the checkpointing mechanism to provide exactly once semantics.
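
To make that dependency concrete, here is a toy model in plain Scala with no Flink dependency. It is only a sketch of the part-file lifecycle (in-progress, pending, finished), not Flink's actual implementation: `ToySink`, `write`, `checkpointCompleted` and `visible` are hypothetical names made up for illustration, and the real sink tracks pending files per checkpoint rather than committing everything at once.

```scala
// Toy model of the part-file lifecycle; NOT Flink code.
object PartFileLifecycleSketch {

  sealed trait State
  case object InProgress extends State
  case object Pending extends State
  case object Finished extends State

  final case class PartFile(name: String, state: State)

  // Hypothetical stand-in for the sink's bookkeeping.
  final class ToySink {
    private var files = Vector.empty[PartFile]

    // Close the currently open part file; it is now waiting for a checkpoint.
    private def closeInProgress(): Unit =
      files = files.map(f => if (f.state == InProgress) f.copy(state = Pending) else f)

    // Mimics a "roll on every event" policy: each record opens a new part file.
    // The record payload itself is ignored in this model.
    def write(record: String): Unit = {
      closeInProgress()
      files = files :+ PartFile(s"part-${files.size}", InProgress)
    }

    // Only a completed checkpoint moves pending files to finished.
    def checkpointCompleted(): Unit = {
      closeInProgress()
      files = files.map(f => if (f.state == Pending) f.copy(state = Finished) else f)
    }

    // What a downstream reader would see as finished output.
    def visible: Vector[String] =
      files.collect { case PartFile(name, Finished) => name }
  }

  def main(args: Array[String]): Unit = {
    val records = Seq("test1", "test2", "test3")

    // Checkpointing disabled: nothing is ever finalized.
    val withoutCheckpoints = new ToySink
    records.foreach(withoutCheckpoints.write)
    println(withoutCheckpoints.visible) // Vector()

    // Checkpointing enabled: a completed checkpoint commits the files.
    val withCheckpoints = new ToySink
    records.foreach(withCheckpoints.write)
    withCheckpoints.checkpointCompleted()
    println(withCheckpoints.visible) // Vector(part-0, part-1, part-2)
  }
}
```

In this model, as in the job from the original post, rolling a file on every event is not enough by itself: rolling only closes the file, and it takes a completed checkpoint to publish it.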

 
Tim

On Mon, May 27, 2019 at 9:27 PM Vishal Sharma <[hidden email]> wrote:
