Hi,
I use flink-scala version 1.9.1 and Scala 2.12.10, and I defined a data type which is a bit more complex: it has a list in it and even a dictionary. When I try to use a custom map, I get this error:

INFO org.apache.flink.api.java.typeutils.TypeExtractor - class A does not contain a setter for field fields
INFO org.apache.flink.api.java.typeutils.TypeExtractor - class A cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.

Is there a fix for this? Or a workaround?

Thank you,
Alex
Hi Alex,
the problem is that `case class` classes are analyzed by Scala-specific code, whereas `class` classes are analyzed by Java-specific code. So I would recommend using a case class to make sure you stay in the "Scala world"; otherwise the fallback is the Java-based TypeExtractor.

For your custom Map, you can simply ignore this error message. It will fall back to the Java-based TypeExtractor and treat it as a generic type because it is not a POJO.

I hope this helps.

Regards,
Timo
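To see which path a type takes, one option is to print the `TypeInformation` that the Scala API derives for it. The following is a minimal sketch, assuming flink-scala 1.9.x is on the classpath; `CaseA` and `PlainA` are hypothetical stand-ins, not the classes from the original question.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._

// Hypothetical types for illustration only; they are not the poster's real model.
case class CaseA(a: Map[String, String])
class PlainA(val fields: Map[String, String])

object TypeInfoCheck {
  def main(args: Array[String]): Unit = {
    // createTypeInformation is the macro provided by the Flink Scala API package object.
    val caseInfo: TypeInformation[CaseA] = createTypeInformation[CaseA]
    val plainInfo: TypeInformation[PlainA] = createTypeInformation[PlainA]

    // Print what was derived for each type instead of assuming the result.
    println(s"CaseA  -> $caseInfo")
    println(s"PlainA -> $plainInfo")
  }
}
```

If the explanation above holds, the case class should be reported as a case class type and the plain class as a generic type, with the TypeExtractor INFO lines logged for the latter.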
I used `case class`, for example `case class A(a: Map[String, String])`, so it should work.

Alex
I'm sorry for my last message, it might have been incomplete.

So I used case classes for my objects, but it doesn't work.

I'm running into this error: "Exception in thread "main" org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ExecutionError: java.lang.NoClassDefFoundError: scala/math/Ordering$$anon$9" when I'm trying to apply the map/flatMap function over the stream (which is from a Kafka consumer).

Alex
That sounds like a classloading or, most likely, a dependency issue.

Do all dependencies, including Flink, use the same Scala version? Could you maybe share some reproducible code with us?

Regards,
Timo
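One way to narrow down a suspected Scala version mismatch is to ask the running JVM which scala-library it actually loaded and whether the class named in the error can be found. This is only a diagnostic sketch; `ScalaRuntimeCheck` and its helper names are made up for illustration and are not part of Flink.

```scala
object ScalaRuntimeCheck {
  // Full version of the scala-library that is actually on the classpath.
  def runtimeScalaVersion: String = scala.util.Properties.versionNumberString

  // True if the named class can be located by this object's class loader.
  // The class is not initialized, only looked up.
  def isLoadable(className: String): Boolean =
    try {
      Class.forName(className, false, getClass.getClassLoader)
      true
    } catch {
      case _: ClassNotFoundException | _: LinkageError => false
    }

  def main(args: Array[String]): Unit = {
    // Class name taken from the NoClassDefFoundError in this thread.
    val missing = "scala.math.Ordering$$anon$9"
    println(s"scala-library on classpath: $runtimeScalaVersion")
    println(s"$missing loadable: ${isLoadable(missing)}")
  }
}
```

Running this inside the same environment as the job (same classpath, same class loader setup) is what makes the result meaningful; a different result in the IDE and on the cluster would point toward a dependency conflict.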
This is a part of my Gradle config:

    ext {
        scalaVersion = '2.12'
        flinkVersion = '1.9.1'
        scalaBuildVersion = "${scalaVersion}.10"
        scalaMockVersion = '4.4.0'
        circeGenericVersion = '0.12.3'
        circeExtrasVersion = '0.12.2'
        pardiseVersion = '2.1.1'
        slf4jVersion = '1.7.7'
        log4jVersion = '1.2.17'
        sourceDir = 'src/main/scala'
        testDir = 'src/test/scala'
    }

    repositories {
        mavenCentral()
        //maven { url "https://repository.apache.org/content/repositories/snapshots/" }
    }

    configurations {
        scalaCompilerPlugin
    }

    dependencies {
        implementation "org.scala-lang:scala-library:${scalaBuildVersion}"

        // --------------------------------------------------------------
        // Compile-time dependencies that should NOT be part of the
        // shadow jar and are provided in the lib folder of Flink
        // --------------------------------------------------------------
        //compile "org.apache.flink:flink-java:${flinkVersion}"
        implementation "org.apache.flink:flink-streaming-scala_${scalaVersion}:${flinkVersion}"
        implementation "org.apache.flink:flink-connector-kafka_${scalaVersion}:${flinkVersion}"

        // --------------------------------------------------------------
        // Dependencies that should be part of the shadow jar, e.g.
        // connectors. These must be in the flinkShadowJar configuration!
        // --------------------------------------------------------------
        //flinkShadowJar "org.apache.flink:flink-connector-kafka-0.11_${scalaBinaryVersion}:${flinkVersion}"

        // https://mvnrepository.com/artifact/io.circe/
        implementation "io.circe:circe-generic_${scalaVersion}:${circeGenericVersion}"
        implementation "io.circe:circe-generic-extras_${scalaVersion}:${circeExtrasVersion}"
        implementation "io.circe:circe-parser_${scalaVersion}:${circeGenericVersion}"

        // https://mvnrepository.com/artifact/org.scalamacros/paradise
        scalaCompilerPlugin "org.scalamacros:paradise_${scalaBuildVersion}:${pardiseVersion}"

        implementation "log4j:log4j:${log4jVersion}"
        implementation "org.slf4j:slf4j-log4j12:${slf4jVersion}"

        // Add test dependencies here.
        //testImplementation "junit:junit:4.12"
        testImplementation "org.scalatest:scalatest_${scalaVersion}:3.1.0"
        // https://mvnrepository.com/artifact/org.scalamock/scalamock
        testImplementation "org.scalamock:scalamock_${scalaVersion}:${scalaMockVersion}"
    }

So all are with the same Scala version. I cannot share the code, but the main app looks like:

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env
      .addSource(KAFKA_STREAM)
      // this will get us a stream with our object model, which is like this:
      // case class A(a: Map[String, other_case_class_obj], b: List[other_case_class_obj], c: String)
      .flatMap(CustomFlatMap())

Thank you,
Alex
I see a mismatch between scalaBuildVersion and scalaVersion. Could this be the issue?

Regards,
Timo
Nope, because scalaBuildVersion is the Scala version including the minor version, so in this case 2.12.10, and we used it just where we need it. We used scalaVersion to specify, for each library, which Scala version it is built for, so the Flink artifact will be flink-streaming-scala_2.12.

Alex
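The naming scheme described above can be sketched in a few lines. The version values are copied from the Gradle `ext` block in this thread; `ScalaArtifactNames`, `binaryVersionOf` and `crossArtifact` are hypothetical helper names used only for illustration.

```scala
object ScalaArtifactNames {
  // Values copied from the Gradle `ext` block posted in this thread.
  val scalaVersion = "2.12"
  val scalaBuildVersion = s"$scalaVersion.10"

  // Reduce a full Scala version such as "2.12.10" to its first two components.
  def binaryVersionOf(fullVersion: String): String =
    fullVersion.split('.').take(2).mkString(".")

  // Build an artifact name with a Scala version suffix.
  def crossArtifact(name: String, suffix: String): String = s"${name}_$suffix"

  def main(args: Array[String]): Unit = {
    // Libraries get the two-component suffix.
    println(crossArtifact("flink-streaming-scala", scalaVersion))
    // The compiler plugin in the posted config gets the full version as suffix.
    println(crossArtifact("paradise", scalaBuildVersion))
    // The two variables describe the same Scala release line.
    println(binaryVersionOf(scalaBuildVersion) == scalaVersion)
  }
}
```

Seen this way, the two variables are consistent with each other: one is simply the prefix of the other.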
Sorry, you are right. Maybe you can also share the full stack trace, because I don't know where this Guava library is being used.

Regards,
Timo
Hi,

We fixed it by converting the case class to a class.

Thank you,
Alex