Flink 1.10 - Hadoop libraries integration with plugins and class loading


Flink 1.10 - Hadoop libraries integration with plugins and class loading

Ricardo Cardante
Hi!

We're working on a project where data is being written to S3 within a Flink application.
When running the integration tests locally / in IntelliJ (using MiniClusterWithClientResource), all the dependencies are correctly resolved and the program executes as expected. However, when the fat JAR is submitted to a Flink setup running on Docker, we get the following exception:

---------------------------------
java.lang.NoClassDefFoundError: org/apache/hadoop/fs/Path
---------------------------------

The error refers to the usage of that class in a RichSinkFunction while building an AvroParquetWriter:

---------------------------------
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetFileWriter

import scala.util.Try
// ...
Try {
  val writer = AvroParquetWriter
    .builder[GenericRecord](new Path(finalFilePath))
    .withSchema(new Schema.Parser().parse(schema))
    .withDataModel(GenericData.get)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .build()

  elements.foreach(element => writer.write(element))
  writer.close()
}
// ...
---------------------------------

Since we have "flink-s3-fs-hadoop" in the plugins folder, and it is therefore dynamically loaded on task/job manager startup (we are also keeping Flink's default inverted class loading strategy), shouldn't the Hadoop dependencies be loaded parent-first (based on classloader.parent-first-patterns.default)?

We also tried putting "flink-shaded-hadoop-2-uber-2.8.3-10.0.jar" in Flink's /lib folder, but then we got this error instead:

java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found

The only way we have been able to make the application work as expected is to include the dependency "hadoop-aws" with compile scope, but that bloats the fat JAR with transitive Hadoop dependencies that we would like to avoid.

What would be the most appropriate way to take advantage of the cluster's "flink-s3-fs-hadoop" plugin and avoid shipping any Hadoop-related library in our application JAR?

The dependencies we're using in the build.sbt file:
---------------------------------
lazy val dependencies =
  new {
    val flinkVersion = "1.10.0"
    val parquetAvroVersion = "1.10.1"
    val hadoopVersion = "3.2.1"
    val circeVersion = "0.12.3"
    val rogachVersion = "3.3.1"
    val loggingVersion = "3.7.2"
    val scalatestVersion = "3.0.5"
    val mockitoVersion = "1.10.0"
    val kafkaVersion = "2.2.0"
    val scalajVersion = "2.4.2"
    val snakeYamlVersion = "1.25"
    val slf4jVersion = "1.7.30"
    val beanUtilsVersion = "1.9.4"
    val shadedHadoopVersion = "2.8.3-10.0"

    // Core libraries provided at runtime
    val flink = "org.apache.flink" %% "flink-scala" % flinkVersion % "provided"
    val flinkStreaming = "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided"
    val flinks3Hadoop = "org.apache.flink" % "flink-s3-fs-hadoop" % flinkVersion % "provided"

    // Application-specific dependencies
    val flinkConnectorKafka = "org.apache.flink" %% "flink-connector-kafka" % flinkVersion
    val flinkStateBackendRocksDb = "org.apache.flink" %% "flink-statebackend-rocksdb" % flinkVersion
    val flinkParquet = "org.apache.flink" %% "flink-parquet" % flinkVersion
    val flinkDropwizard = "org.apache.flink" % "flink-metrics-dropwizard" % flinkVersion
    val parquetAvro = "org.apache.parquet" % "parquet-avro" % parquetAvroVersion
    val circeCore = "io.circe" %% "circe-core" % circeVersion
    val circeParser = "io.circe" %% "circe-parser" % circeVersion
    val circeGeneric = "io.circe" %% "circe-generic" % circeVersion
    val scallop = "org.rogach" %% "scallop" % rogachVersion
    val logging = "com.typesafe.scala-logging" %% "scala-logging" % loggingVersion
    val snakeYaml = "org.yaml" % "snakeyaml" % snakeYamlVersion
    val slf4j = "org.slf4j" % "slf4j-log4j12" % slf4jVersion
    val beanUtils = "commons-beanutils" % "commons-beanutils" % beanUtilsVersion

    // Test libraries
    val scalatest = "org.scalatest" %% "scalatest" % scalatestVersion % "test"
    val mockito = "org.mockito" %% "mockito-scala" % mockitoVersion % "test"
    val flinkTestUtils = "org.apache.flink" %% "flink-test-utils" % flinkVersion % "test"
    val kafkaStreams = "org.apache.kafka" % "kafka-streams" % kafkaVersion % "test"
    val kafkaClients = "org.apache.kafka" % "kafka-clients" % kafkaVersion % "test"
    val kafka = "org.apache.kafka" %% "kafka" % kafkaVersion % "test"
    val hadoopClient = "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "test"

    // Test classifiers only
    val flinkRuntimeTest = "org.apache.flink" %% "flink-runtime" % flinkVersion % "test" classifier "tests"
    val kafkaTest = "org.apache.kafka" %% "kafka" % kafkaVersion % "test" classifier "test"
    val kafkaStreamsTest = "org.apache.kafka" % "kafka-streams" % kafkaVersion % "test" classifier "test"
    val kafkaClientsTest = "org.apache.kafka" % "kafka-clients" % kafkaVersion % "test" classifier "test"
  }
---------------------------------



This is the Dockerfile:
---------------------------------
FROM flink:1.10.0-scala_2.12
RUN cp /opt/flink/opt/flink-metrics-prometheus-1.10.0.jar /opt/flink/lib
RUN mkdir /opt/flink/plugins/flink-s3-fs-presto /opt/flink/plugins/flink-s3-fs-hadoop
RUN cp /opt/flink/opt/flink-s3-fs-presto-1.10.0.jar /opt/flink/plugins/flink-s3-fs-presto/
RUN cp /opt/flink/opt/flink-s3-fs-hadoop-1.10.0.jar /opt/flink/plugins/flink-s3-fs-hadoop/
RUN chown -R flink:flink /opt/flink/plugins/flink-s3-fs-presto/
RUN chown -R flink:flink /opt/flink/plugins/flink-s3-fs-hadoop/
---------------------------------

--
Best regards,
Ricardo Cardante.

Re: Flink 1.10 - Hadoop libraries integration with plugins and class loading

Piotr Nowojski-3
Hi,

Since we have "flink-s3-fs-hadoop" in the plugins folder, and it is therefore dynamically loaded on task/job manager startup (we are also keeping Flink's default inverted class loading strategy), shouldn't the Hadoop dependencies be loaded parent-first (based on classloader.parent-first-patterns.default)?

I think you are misunderstanding plugins. The fact that you have added the s3 FileSystem plugin doesn't mean that your code can access its dependencies. The whole point of plugin class loading is to completely isolate plugins from one another, and to isolate them from any user code. Plugin classes are not loaded into the parent class loader, but into a separate class loader that is independent from the FlinkUserClassLoader (which contains the user's jars).
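So if your own code references classes such as org.apache.hadoop.fs.Path, those classes have to come from the job's classpath rather than from the plugin. As a rough sketch in build.sbt terms (the hadoop-client coordinate is only meant to illustrate the idea, not as a specific recommendation):

---------------------------------
// Illustration only: user code that constructs an AvroParquetWriter itself needs
// Hadoop classes on the job classpath, because the classes bundled inside the
// flink-s3-fs-hadoop plugin are not visible to user code.
val hadoopClientForJob = "org.apache.hadoop" % "hadoop-client" % hadoopVersion
---------------------------------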

---------------------------------
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetFileWriter
// ...
Try {
  val writer = AvroParquetWriter
    .builder[GenericRecord](new Path(finalFilePath))
    .withSchema(new Schema.Parser().parse(schema))
    .withDataModel(GenericData.get)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .build()

  elements.foreach(element => writer.write(element))
  writer.close()
}
// ...
---------------------------------

Also, you probably shouldn't be using AvroParquetWriter directly in the first place, but rather StreamingFileSink [1] to write Parquet files. An example can be found here [2].
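In your setup, a rough sketch of that approach could look like the following (assuming an existing DataStream[GenericRecord] called `stream`, a schema string `schemaString`, and a placeholder bucket path; ParquetAvroWriters comes from the flink-parquet dependency you already have):

---------------------------------
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

// Placeholder schema and output location, for illustration only.
val schema: Schema = new Schema.Parser().parse(schemaString)

// Bulk-encoded Parquet sink: part files are rolled and committed on checkpoints,
// and the s3:// scheme is served by the flink-s3-fs-hadoop plugin on the cluster.
val parquetSink: StreamingFileSink[GenericRecord] =
  StreamingFileSink
    .forBulkFormat(new Path("s3://my-bucket/output"), ParquetAvroWriters.forGenericRecord(schema))
    .build()

// `stream` is assumed to be an existing DataStream[GenericRecord].
stream.addSink(parquetSink)
---------------------------------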

If you use `org.apache.parquet.avro.AvroParquetWriter` directly, you will not have any checkpointing support (potential data loss or data duplication issues). I'm not even sure your code can be executed in parallel (aren't you trying to share one instance of org.apache.parquet.avro.AvroParquetWriter among multiple operators?).

But let's say you have to use AvroParquetWriter directly for some reason. In that case you would have to add all of the required dependencies to your job's fat jar (or the usrlib directory?), and you should use TwoPhaseCommitSinkFunction as the base class for your writer [3]. Properly implementing an exactly-once sink is not trivial unless you know what you are doing.
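For completeness, here is a very rough skeleton of that approach (the ParquetTxn case class, the serializers and all of the actual file handling are made-up placeholders; a real implementation also has to deal with restores and retries):

---------------------------------
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeutils.base.VoidSerializer
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.streaming.api.functions.sink.{SinkFunction, TwoPhaseCommitSinkFunction}

// Hypothetical transaction handle: the temporary file the records go to.
case class ParquetTxn(tmpPath: String)

class ExactlyOnceParquetSink
  extends TwoPhaseCommitSinkFunction[GenericRecord, ParquetTxn, Void](
    new KryoSerializer(classOf[ParquetTxn], new ExecutionConfig),
    VoidSerializer.INSTANCE) {

  // Open a fresh temporary location for this checkpoint interval.
  override def beginTransaction(): ParquetTxn =
    ParquetTxn(s"/tmp/parquet-${java.util.UUID.randomUUID()}")

  // Write the record into the transaction's temporary file.
  override def invoke(txn: ParquetTxn, value: GenericRecord, context: SinkFunction.Context[_]): Unit = {
    // ... append `value` via a writer bound to txn.tmpPath ...
  }

  // Flush and close the writer so the data is durable before the checkpoint completes.
  override def preCommit(txn: ParquetTxn): Unit = {}

  // Atomically publish the temporary file to its final location.
  override def commit(txn: ParquetTxn): Unit = {}

  // Discard the temporary file of a failed or aborted transaction.
  override def abort(txn: ParquetTxn): Unit = {}
}
---------------------------------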

Piotrek

