Hi! We're working on a project where data is written to S3 from a Flink application. When we run the integration tests locally / in IntelliJ (using MiniClusterWithClientResource), all dependencies are resolved correctly and the program executes as expected. However, when the fat JAR is submitted to a Flink setup running on Docker, we get the following exception:

---------------------------------
java.lang.NoClassDefFoundError: org/apache/hadoop/fs/Path
---------------------------------

It refers to the usage of that class in a RichSinkFunction while building an AvroParquetWriter:

---------------------------------
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetFileWriter

import scala.util.Try

// ...
Try {
  val writer = AvroParquetWriter
    .builder[GenericRecord](new Path(finalFilePath))
    .withSchema(new Schema.Parser().parse(schema))
    .withDataModel(GenericData.get)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .build()

  elements.foreach(element => writer.write(element))
  writer.close()
}
// ...
---------------------------------

Since we have "flink-s3-fs-hadoop" in the plugins folder, and it is therefore dynamically loaded when the task/job manager(s) start up (we also keep Flink's default inverted class loading strategy), shouldn't the Hadoop dependencies be loaded parent-first, based on classloader.parent-first-patterns.default?

We also tried putting "flink-shaded-hadoop-2-uber-2.8.3-10.0.jar" into Flink's /lib folder, but then we got this error instead:

java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found

The only way we have been able to make the application work as expected is to include the "hadoop-aws" dependency with compile scope, but that drags transitive Hadoop libraries into the fat JAR, which we would like to avoid.

What would be the most appropriate way to take advantage of the cluster's "flink-s3-fs-hadoop" plugin and avoid delivering any Hadoop-related library in our application JAR?

These are the dependencies we're using in the build.sbt file:

---------------------------------
lazy val dependencies = new {
  val flinkVersion        = "1.10.0"
  val parquetAvroVersion  = "1.10.1"
  val hadoopVersion       = "3.2.1"
  val circeVersion        = "0.12.3"
  val rogachVersion       = "3.3.1"
  val loggingVersion      = "3.7.2"
  val scalatestVersion    = "3.0.5"
  val mockitoVersion      = "1.10.0"
  val kafkaVersion        = "2.2.0"
  val scalajVersion       = "2.4.2"
  val snakeYamlVersion    = "1.25"
  val slf4jVersion        = "1.7.30"
  val beanUtilsVersion    = "1.9.4"
  val shadedHadoopVersion = "2.8.3-10.0"

  // Core libraries provided at runtime
  val flink          = "org.apache.flink" %% "flink-scala"           % flinkVersion % "provided"
  val flinkStreaming = "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided"
  val flinks3Hadoop  = "org.apache.flink" %  "flink-s3-fs-hadoop"    % flinkVersion % "provided"

  // Application-specific dependencies
  val flinkConnectorKafka      = "org.apache.flink" %% "flink-connector-kafka"      % flinkVersion
  val flinkStateBackendRocksDb = "org.apache.flink" %% "flink-statebackend-rocksdb" % flinkVersion
  val flinkParquet             = "org.apache.flink" %% "flink-parquet"              % flinkVersion
  val flinkDropwizard          = "org.apache.flink" %  "flink-metrics-dropwizard"   % flinkVersion
  val parquetAvro              = "org.apache.parquet" % "parquet-avro"              % parquetAvroVersion
  val circeCore                = "io.circe" %% "circe-core"    % circeVersion
  val circeParser              = "io.circe" %% "circe-parser"  % circeVersion
  val circeGeneric             = "io.circe" %% "circe-generic" % circeVersion
  val scallop                  = "org.rogach" %% "scallop" % rogachVersion
  val logging                  = "com.typesafe.scala-logging" %% "scala-logging" % loggingVersion
  val snakeYaml                = "org.yaml" % "snakeyaml" % snakeYamlVersion
  val slf4j                    = "org.slf4j" % "slf4j-log4j12" % slf4jVersion
  val beanUtils                = "commons-beanutils" % "commons-beanutils" % beanUtilsVersion

  // Test libraries
  val scalatest      = "org.scalatest"    %% "scalatest"        % scalatestVersion % "test"
  val mockito        = "org.mockito"      %% "mockito-scala"    % mockitoVersion   % "test"
  val flinkTestUtils = "org.apache.flink" %% "flink-test-utils" % flinkVersion     % "test"
  val kafkaStreams   = "org.apache.kafka" %  "kafka-streams"    % kafkaVersion     % "test"
  val kafkaClients   = "org.apache.kafka" %  "kafka-clients"    % kafkaVersion     % "test"
  val kafka          = "org.apache.kafka" %% "kafka"            % kafkaVersion     % "test"
  val hadoopClient   = "org.apache.hadoop" % "hadoop-client"    % hadoopVersion    % "test"

  // Test classifiers only
  val flinkRuntimeTest = "org.apache.flink" %% "flink-runtime" % flinkVersion % "test" classifier "tests"
  val kafkaTest        = "org.apache.kafka" %% "kafka"         % kafkaVersion % "test" classifier "test"
  val kafkaStreamsTest = "org.apache.kafka" %  "kafka-streams" % kafkaVersion % "test" classifier "test"
  val kafkaClientsTest = "org.apache.kafka" %  "kafka-clients" % kafkaVersion % "test" classifier "test"
}
---------------------------------

This is the Dockerfile:

---------------------------------
FROM flink:1.10.0-scala_2.12

RUN cp /opt/flink/opt/flink-metrics-prometheus-1.10.0.jar /opt/flink/lib

RUN mkdir /opt/flink/plugins/flink-s3-fs-presto /opt/flink/plugins/flink-s3-fs-hadoop
RUN cp /opt/flink/opt/flink-s3-fs-presto-1.10.0.jar /opt/flink/plugins/flink-s3-fs-presto/
RUN cp /opt/flink/opt/flink-s3-fs-hadoop-1.10.0.jar /opt/flink/plugins/flink-s3-fs-hadoop/
RUN chown -R flink:flink /opt/flink/plugins/flink-s3-fs-presto/
RUN chown -R flink:flink /opt/flink/plugins/flink-s3-fs-hadoop/
---------------------------------

--
Best regards,
Ricardo Cardante.
Hi,
I think you are misunderstanding plugins. The fact that you have added the S3 FileSystem plugin doesn't mean that your code can access its dependencies. The whole point of plugin class loading is to completely isolate plugins from one another, and to isolate them from any user code. Plugin classes are not loaded into the parent class loader, but into a separate class loader that is independent of the FlinkUserClassLoader (which contains the user's jars).
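To illustrate the distinction (this is only a sketch of mine, and the bucket/key names are made up): on a cluster where the flink-s3-fs-hadoop plugin is installed, user code can still write to S3, but only through Flink's own FileSystem abstraction, which looks up the plugin-provided file system for the "s3://" scheme. The plugin's Hadoop classes themselves, such as org.apache.hadoop.fs.Path, remain invisible to the user class loader:

---------------------------------
import java.net.URI

import org.apache.flink.core.fs.{FileSystem, Path}

// Sketch only: bucket and key are placeholders. The "s3://" scheme is served by the
// file system loaded from the plugins/ folder, so no Hadoop classes are needed on the
// user classpath for this call to work.
val fs = FileSystem.get(new URI("s3://my-bucket/"))
val out = fs.create(new Path("s3://my-bucket/some/output.txt"), FileSystem.WriteMode.OVERWRITE)
out.write("hello".getBytes("UTF-8"))
out.close()
---------------------------------

That is just to show where the plugin surfaces; for the actual Parquet output, see the StreamingFileSink sketch further down.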
Also, you probably shouldn't be using AvroParquetWriter directly in the first place; use StreamingFileSink [1] to write Parquet files instead. An example can be found here [2]. If you use `org.apache.parquet.avro.AvroParquetWriter` directly, you will not have any checkpointing support (with potential data loss or data duplication issues). I'm also not sure whether your code can be executed in parallel (aren't you trying to share one instance of org.apache.parquet.avro.AvroParquetWriter among multiple operators?).

But let's say you have to use AvroParquetWriter directly for some reason. In that case you would have to add all of the required dependencies to your job's fat JAR (or the usrlib directory?), and you should use TwoPhaseCommitSinkFunction as the base class for your writer [3]. Properly implementing an exactly-once sink is not trivial unless you know what you are doing.

Piotrek
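PS A minimal sketch of what the StreamingFileSink route could look like (my own outline, not the example linked above; the path, the schema string and the records stream are placeholders), reusing the flink-parquet and parquet-avro dependencies already listed in your build.sbt:

---------------------------------
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala.DataStream

object ParquetSinkSketch {

  // `records` and `schemaString` stand in for whatever the job already produces.
  def addParquetSink(records: DataStream[GenericRecord], schemaString: String): Unit = {
    val schema = new Schema.Parser().parse(schemaString)

    // Bulk formats roll a new part file on every checkpoint, so checkpointing must be
    // enabled; exactly-once delivery then comes from Flink rather than a custom writer.
    val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
      .forBulkFormat(
        new Path("s3://my-bucket/output/"), // resolved by the cluster's S3 file system
        ParquetAvroWriters.forGenericRecord(schema))
      .build()

    records.addSink(sink)
  }
}
---------------------------------

The sink then takes care of part files, rolling and recovery, which is exactly the checkpointing support that is missing when AvroParquetWriter is driven by hand.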