ClassLoader leak when using s3a upload through DataSet.output

ClassLoader leak when using s3a upload through DataSet.output

Vishal Santoshi
Hello folks,
We see threads from https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-s3/src/main/java/com/amazonaws/services/s3/transfer/internal/TransferManagerUtils.java#L49 outlive a batch job that writes Parquet files to S3, causing a ClassLoader leak. Is this a known issue? Logically, a close on the TransferManager should shut down the ExecutorService (and thus the threads).
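
For context, the behaviour we expect from the AWS SDK v1 side is sketched below (this is not our job code, just the SDK's own TransferManager lifecycle; names are illustrative): shutting the TransferManager down should also terminate its worker pool.

    import com.amazonaws.services.s3.AmazonS3ClientBuilder
    import com.amazonaws.services.s3.transfer.TransferManagerBuilder

    // Sketch: build a TransferManager and shut it down explicitly. shutdownNow(...)
    // is expected to terminate the internal ExecutorService and its
    // "s3-transfer-manager-worker-*" threads.
    val s3 = AmazonS3ClientBuilder.defaultClient()
    val transferManager = TransferManagerBuilder.standard().withS3Client(s3).build()
    try {
      // ... uploads ...
    } finally {
      transferManager.shutdownNow(true) // true also shuts down the wrapped S3 client
    }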

The code is fairly straightforward, 

    val job = new Job()
    val hadoopOutFormat = new HadoopOutputFormat[Void, GenericRecord](
      new AvroParquetOutputFormat(),
      job
    )
    AvroParquetOutputFormat.setSchema(job, schema)
    FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(path))
    ParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY)
    ParquetOutputFormat.setEnableDictionary(job, true)  // do we need this?
and then the DataSet is written out via DataSet.output.
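
A minimal sketch of that write step (an assumption on the types: that hadoopOutFormat is the Scala wrapper org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat from flink-hadoop-compatibility, which consumes Scala (key, value) tuples; the Java wrapper would take org.apache.flink.api.java.tuple.Tuple2 elements instead):

    import org.apache.avro.generic.GenericRecord
    import org.apache.flink.api.scala._

    // Sketch only: hand the (Void, GenericRecord) pairs to the Hadoop output format;
    // the job then runs when execute() is called on the ExecutionEnvironment.
    def writeParquet(records: DataSet[(Void, GenericRecord)]): Unit =
      records.output(hadoopOutFormat)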

This is using:


scalaVersion := "2.12.12"
flinkVersion = "1.11.2"
hadoopVersion = "2.8.3"


Regards 

Vishal 


Re: ClassLoader leak when using s3a upload through DataSet.output

Vishal Santoshi
We do put the flink-shaded-hadoop-2-uber*.jar in the Flink lib/ (and thus it is available to the root ClassLoader). That still does not explain the executor service outliving the job.



Re: ClassLoader leak when using s3a upload through DataSet.output

Vishal Santoshi
com/amazonaws/services/s3/transfer/TransferManager.class is in flink-s3-fs-hadoop-1.11.2.jar, which is in plugins/ and, AFAIK, should therefore get a dedicated ClassLoader per plugin. So does it make sense that these classes remain beyond the job, and with them the executor service for multipart upload?

I guess the questions are:

* Why are the classes not being GCed? It seems that these threads reference objects loaded by the job ClassLoader, and thus the job ClassLoader itself is not being GCed.
* Does flink-s3-fs-hadoop-1.11.2.jar need to be in plugins/, as has been advised, or can it be part of the uber jar?
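
One way to see what those surviving threads pin (a diagnostic sketch, not code from the job; the thread-name prefix is the one set by the default thread factory in the TransferManagerUtils code linked above):

    import scala.collection.JavaConverters._

    // Diagnostic sketch: list surviving transfer-manager worker threads and the
    // ClassLoader each one keeps reachable through its context ClassLoader.
    Thread.getAllStackTraces.keySet.asScala
      .filter(_.getName.startsWith("s3-transfer-manager-worker"))
      .foreach(t => println(s"${t.getName} -> ${t.getContextClassLoader}"))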





Re: ClassLoader leak when using s3a upload through DataSet.output

Chesnay Schepler
FileSystems must not be bundled in the user jar.

You must place them in lib/ or plugins/, because by bundling them you break our assumption that they exist for the lifetime of the cluster (which in turn means we don't really have to worry about cleaning up).
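
For reference, the intended layout is roughly the following (the plugin subdirectory name is only an example; each plugin lives in its own subdirectory under plugins/):

    $FLINK_HOME/
      lib/
        flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
      plugins/
        s3-fs-hadoop/
          flink-s3-fs-hadoop-1.11.2.jar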




Re: ClassLoader leak when using s3a upload through DataSet.output

Vishal Santoshi

On Wed, Feb 10, 2021 at 10:47 AM Vishal Santoshi <[hidden email]> wrote:
Thank you.
This is pretty much it. As you can see,
 "org.apache.flink" % "flink-shaded-hadoop-2-uber" % "2.8.3-10.0" % "provided",
is marked provided, so the FS is in lib/,
and flink-s3-fs-hadoop-1.11.2.jar is in plugins/.
Is there anything you see that looks weird with the below?

val flinkDependencies = Seq(
  // Flink-runtime?
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  "org.apache.flink" % "flink-json" % flinkVersion % "provided",
  "org.apache.flink" % "flink-csv" % flinkVersion % "provided",

  "org.apache.flink" % "flink-table-common" % flinkVersion,
  "org.apache.flink" %% "flink-table-api-scala" % flinkVersion,
  "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion,
  "org.apache.flink" %% "flink-table-planner" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-table-planner-blink" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-clients" % flinkVersion,

  // For parquet output
  "org.apache.flink" %% "flink-parquet" % flinkVersion,
  "org.apache.flink" % "flink-avro" % flinkVersion,
  "org.apache.parquet" % "parquet-avro" % "1.11.0",  // manually added?
  "org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion,
  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,

  // Add to /lib. provided so this doesn't bring in hadoop deps, which are in the shaded jar instead
  // tried to put in $FLINK_HOME/lib but that didn't work?
  // Probably some other hadoop libs need to be treated the same way
  "com.amazonaws" % "aws-java-sdk" % "1.11.271",  // use just S3?
  // Should be provided in prod?
  "org.apache.flink" % "flink-shaded-hadoop-2-uber" % "2.8.3-10.0" % "provided",

   // required for flink's shaded hadoop?
  "org.slf4j" % "slf4j-api" % "1.7.15",
  "org.slf4j" % "slf4j-log4j12" % "1.7.15" % "provided",
  "log4j" % "log4j" % "1.2.17" % "provided",

  // CLI
  "com.github.scopt" %% "scopt" % "3.7.1",

  // JSON validation
  "com.github.java-json-tools" % "json-schema-validator" % "2.2.14",

  // JSON parsing
  "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.11.1",
  "org.json4s" %% "json4s-jackson" % "3.6.9",

  // Non-terrible Json parsing
  "io.circe" %% "circe-core" % circeVersion,
  "io.circe" %% "circe-generic" % circeVersion,
  "io.circe" %% "circe-generic-extras" % circeVersion,
  "io.circe" %% "circe-parser" % circeVersion,

  // Testing
  "org.scalatest" %% "scalatest" % "3.0.4" % Test,
  "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test
)
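
One variant worth trying (an assumption on my side, not something settled in this thread): if S3 access at runtime goes only through the flink-s3-fs-hadoop plugin, the AWS SDK could be marked provided as well so it stays out of the uber jar, e.g.:

    // Sketch only: assumes the job code does not need the AWS SDK on its own
    // runtime classpath; S3 I/O would then come solely from the plugin.
    "com.amazonaws" % "aws-java-sdk" % "1.11.271" % "provided",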






Re: ClassLoader leak when using s3a upload through DataSet.output

Arvid Heise
Hi Vishal,

If you have the possibility, could you create a memory dump? It would be interesting to know why the TransferManager is never released.

Note that we cannot forcibly release objects/classes loaded through a particular ClassLoader; all we can do is make sure that the ClassLoader is not used anymore, which leads to a full release once all object instances are released. However, this doesn't seem to happen in your case.
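
A dump can be taken with jmap, or programmatically from inside the TaskManager; a small sketch (the output path is just an example):

    import java.lang.management.ManagementFactory
    import com.sun.management.HotSpotDiagnosticMXBean

    // Sketch: trigger a heap dump of live objects from within the JVM,
    // roughly equivalent to `jmap -dump:live,format=b,file=...`.
    val diag = ManagementFactory.newPlatformMXBeanProxy(
      ManagementFactory.getPlatformMBeanServer,
      "com.sun.management:type=HotSpotDiagnostic",
      classOf[HotSpotDiagnosticMXBean])
    diag.dumpHeap("/tmp/taskmanager-heap.hprof", true) // true = only live objects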
