POJO ERROR

classic Classic list List threaded Threaded
10 messages Options
Reply | Threaded
Open this post in threaded view
|

POJO ERROR

Alexandru Vasiu
Hi,

I use flink-scala version 1.9.1 and scala 2.12.10, and I defined a data type which is a bit more complex: it has a list in it and even a dictionary. When I try to use a custom map I got this error:

INFO  org.apache.flink.api.java.typeutils.TypeExtractor - class A  does not contain a setter for field fields
INFO  org.apache.flink.api.java.typeutils.TypeExtractor - class A cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.

Is there a fix for this? Or a workaround?

Thank you,
Alex
Reply | Threaded
Open this post in threaded view
|

Re: POJO ERROR

Timo Walther
Hi Alex,

the problem is that `case class` classes are analyzed by Scala specific
code whereas `class` classes are analyzed with Java specific code. So I
would recommend to use a case class to make sure you stay in the "Scala
world" otherwise the fallback is the Java-based TypeExtractor.

For your custom Map, you can simply ignore this error message. It will
fallback to the Java-based TypeExtractor and treat it as a generic type
because it is not a POJO.

I hope this helps.

Regards,
Timo


On 19.12.19 12:41, Alexandru Vasiu wrote:

> Hi,
>
> I use flink-scala version 1.9.1 and scala 2.12.10, and I defined a data
> type which is a bit more complex: it has a list in it and even a
> dictionary. When I try to use a custom map I got this error:
>
> INFO  org.apache.flink.api.java.typeutils.TypeExtractor - class A  does
> not contain a setter for field fields
> INFO  org.apache.flink.api.java.typeutils.TypeExtractor - class A cannot
> be used as a POJO type because not all fields are valid POJO fields, and
> must be processed as GenericType. Please read the Flink documentation on
> "Data Types & Serialization" for details of the effect on performance.
>
> Is there a fix for this? Or a workaround?
>
> Thank you,
> Alex

Reply | Threaded
Open this post in threaded view
|

Re: POJO ERROR

Alexandru Vasiu
I used `case class` for example case class A(a: Map[String, String]) so it should work

Alex

On Thu, Dec 19, 2019 at 2:18 PM Timo Walther <[hidden email]> wrote:
Hi Alex,

the problem is that `case class` classes are analyzed by Scala specific
code whereas `class` classes are analyzed with Java specific code. So I
would recommend to use a case class to make sure you stay in the "Scala
world" otherwise the fallback is the Java-based TypeExtractor.

For your custom Map, you can simply ignore this error message. It will
fallback to the Java-based TypeExtractor and treat it as a generic type
because it is not a POJO.

I hope this helps.

Regards,
Timo


On 19.12.19 12:41, Alexandru Vasiu wrote:
> Hi,
>
> I use flink-scala version 1.9.1 and scala 2.12.10, and I defined a data
> type which is a bit more complex: it has a list in it and even a
> dictionary. When I try to use a custom map I got this error:
>
> INFO  org.apache.flink.api.java.typeutils.TypeExtractor - class A  does
> not contain a setter for field fields
> INFO  org.apache.flink.api.java.typeutils.TypeExtractor - class A cannot
> be used as a POJO type because not all fields are valid POJO fields, and
> must be processed as GenericType. Please read the Flink documentation on
> "Data Types & Serialization" for details of the effect on performance.
>
> Is there a fix for this? Or a workaround?
>
> Thank you,
> Alex

Reply | Threaded
Open this post in threaded view
|

Re: POJO ERROR

Alexandru Vasiu
I'm sorry for my last message, it might be incomplete.

So I used case classed for my objects, but it doesn't work.

Riching this error: "Exception in thread "main" org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ExecutionError: java.lang.NoClassDefFoundError: scala/math/Ordering$$anon$9" when I'm trying to apply the map/flatMap function over the stream (which is from a Kafka consumer).


Alex

On Thu, Dec 19, 2019 at 2:24 PM Alexandru Vasiu <[hidden email]> wrote:
I used `case class` for example case class A(a: Map[String, String]) so it should work

Alex

On Thu, Dec 19, 2019 at 2:18 PM Timo Walther <[hidden email]> wrote:
Hi Alex,

the problem is that `case class` classes are analyzed by Scala specific
code whereas `class` classes are analyzed with Java specific code. So I
would recommend to use a case class to make sure you stay in the "Scala
world" otherwise the fallback is the Java-based TypeExtractor.

For your custom Map, you can simply ignore this error message. It will
fallback to the Java-based TypeExtractor and treat it as a generic type
because it is not a POJO.

I hope this helps.

Regards,
Timo


On 19.12.19 12:41, Alexandru Vasiu wrote:
> Hi,
>
> I use flink-scala version 1.9.1 and scala 2.12.10, and I defined a data
> type which is a bit more complex: it has a list in it and even a
> dictionary. When I try to use a custom map I got this error:
>
> INFO  org.apache.flink.api.java.typeutils.TypeExtractor - class A  does
> not contain a setter for field fields
> INFO  org.apache.flink.api.java.typeutils.TypeExtractor - class A cannot
> be used as a POJO type because not all fields are valid POJO fields, and
> must be processed as GenericType. Please read the Flink documentation on
> "Data Types & Serialization" for details of the effect on performance.
>
> Is there a fix for this? Or a workaround?
>
> Thank you,
> Alex

Reply | Threaded
Open this post in threaded view
|

Re: POJO ERROR

Timo Walther
That's sounds like a classloading or most likely dependency issue.

Are all dependencies including Flink use the same Scala version? Could
you maybe share reproducible some code with us?

Regards,
Timo


On 19.12.19 13:53, Alexandru Vasiu wrote:

> I'm sorry for my last message, it might be incomplete.
>
> So I used case classed for my objects, but it doesn't work.
>
> Riching this error: "Exception in thread "main"
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ExecutionError:
> java.lang.NoClassDefFoundError: scala/math/Ordering$$anon$9" when I'm
> trying to apply the map/flatMap function over the stream (which is from
> a Kafka consumer).
>
>
> Alex
>
> On Thu, Dec 19, 2019 at 2:24 PM Alexandru Vasiu
> <[hidden email] <mailto:[hidden email]>> wrote:
>
>     I used `case class` for example case class A(a: Map[String, String])
>     so it should work
>
>     Alex
>
>     On Thu, Dec 19, 2019 at 2:18 PM Timo Walther <[hidden email]
>     <mailto:[hidden email]>> wrote:
>
>         Hi Alex,
>
>         the problem is that `case class` classes are analyzed by Scala
>         specific
>         code whereas `class` classes are analyzed with Java specific
>         code. So I
>         would recommend to use a case class to make sure you stay in the
>         "Scala
>         world" otherwise the fallback is the Java-based TypeExtractor.
>
>         For your custom Map, you can simply ignore this error message.
>         It will
>         fallback to the Java-based TypeExtractor and treat it as a
>         generic type
>         because it is not a POJO.
>
>         I hope this helps.
>
>         Regards,
>         Timo
>
>
>         On 19.12.19 12:41, Alexandru Vasiu wrote:
>          > Hi,
>          >
>          > I use flink-scala version 1.9.1 and scala 2.12.10, and I
>         defined a data
>          > type which is a bit more complex: it has a list in it and even a
>          > dictionary. When I try to use a custom map I got this error:
>          >
>          > INFO  org.apache.flink.api.java.typeutils.TypeExtractor -
>         class A  does
>          > not contain a setter for field fields
>          > INFO  org.apache.flink.api.java.typeutils.TypeExtractor -
>         class A cannot
>          > be used as a POJO type because not all fields are valid POJO
>         fields, and
>          > must be processed as GenericType. Please read the Flink
>         documentation on
>          > "Data Types & Serialization" for details of the effect on
>         performance.
>          >
>          > Is there a fix for this? Or a workaround?
>          >
>          > Thank you,
>          > Alex
>

Reply | Threaded
Open this post in threaded view
|

Re: POJO ERROR

Alexandru Vasiu
This is a part of my Gradle config: 

ext {
    scalaVersion = '2.12'
    flinkVersion = '1.9.1'
    scalaBuildVersion = "${scalaVersion}.10"
    scalaMockVersion = '4.4.0'
    circeGenericVersion = '0.12.3'
    circeExtrasVersion = '0.12.2'
    pardiseVersion = '2.1.1'
    slf4jVersion = '1.7.7'
    log4jVersion = '1.2.17'
    sourceDir = 'src/main/scala'
    testDir = 'src/test/scala'
}
repositories {
    mavenCentral()
    //maven { url "https://repository.apache.org/content/repositories/snapshots/" }
}
configurations {
    scalaCompilerPlugin
}
dependencies {
    implementation "org.scala-lang:scala-library:${scalaBuildVersion}"
    // --------------------------------------------------------------
    // Compile-time dependencies that should NOT be part of the
    // shadow jar and are provided in the lib folder of Flink
    // --------------------------------------------------------------
    //compile "org.apache.flink:flink-java:${flinkVersion}"
    implementation "org.apache.flink:flink-streaming-scala_${scalaVersion}:${flinkVersion}"
    implementation "org.apache.flink:flink-connector-kafka_${scalaVersion}:${flinkVersion}"
    // --------------------------------------------------------------
    // Dependencies that should be part of the shadow jar, e.g.
    // connectors. These must be in the flinkShadowJar configuration!
    // --------------------------------------------------------------
    //flinkShadowJar "org.apache.flink:flink-connector-kafka-0.11_${scalaBinaryVersion}:${flinkVersion}"
    // https://mvnrepository.com/artifact/io.circe/
    implementation "io.circe:circe-generic_${scalaVersion}:${circeGenericVersion}"
    implementation "io.circe:circe-generic-extras_${scalaVersion}:${circeExtrasVersion}"
    implementation "io.circe:circe-parser_${scalaVersion}:${circeGenericVersion}"
    // https://mvnrepository.com/artifact/org.scalamacros/paradise
    scalaCompilerPlugin "org.scalamacros:paradise_${scalaBuildVersion}:${pardiseVersion}"
    implementation "log4j:log4j:${log4jVersion}"
    implementation "org.slf4j:slf4j-log4j12:${slf4jVersion}"
    // Add test dependencies here.
    //testImplementation "junit:junit:4.12"
    testImplementation "org.scalatest:scalatest_${scalaVersion}:3.1.0"
    // https://mvnrepository.com/artifact/org.scalamock/scalamock
    testImplementation "org.scalamock:scalamock_${scalaVersion}:${scalaMockVersion}"
}

So all are with the same scala version. I cannot share the code, but the main app looks like:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream  = env
        .addSource(KAFKA_STREAM) // this will get us a stream with our object model which is like this: case class A(a:Map[String, other_case_class_obj], b: List[other_case_class_obj], c: String)
.flatMap(CustomFlatMap())
.print

Thank you,
Alex

On Thu, Dec 19, 2019 at 3:14 PM Timo Walther <[hidden email]> wrote:
That's sounds like a classloading or most likely dependency issue.

Are all dependencies including Flink use the same Scala version? Could
you maybe share reproducible some code with us?

Regards,
Timo


On 19.12.19 13:53, Alexandru Vasiu wrote:
> I'm sorry for my last message, it might be incomplete.
>
> So I used case classed for my objects, but it doesn't work.
>
> Riching this error: "Exception in thread "main"
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ExecutionError:
> java.lang.NoClassDefFoundError: scala/math/Ordering$$anon$9" when I'm
> trying to apply the map/flatMap function over the stream (which is from
> a Kafka consumer).
>
>
> Alex
>
> On Thu, Dec 19, 2019 at 2:24 PM Alexandru Vasiu
> <[hidden email] <mailto:[hidden email]>> wrote:
>
>     I used `case class` for example case class A(a: Map[String, String])
>     so it should work
>
>     Alex
>
>     On Thu, Dec 19, 2019 at 2:18 PM Timo Walther <[hidden email]
>     <mailto:[hidden email]>> wrote:
>
>         Hi Alex,
>
>         the problem is that `case class` classes are analyzed by Scala
>         specific
>         code whereas `class` classes are analyzed with Java specific
>         code. So I
>         would recommend to use a case class to make sure you stay in the
>         "Scala
>         world" otherwise the fallback is the Java-based TypeExtractor.
>
>         For your custom Map, you can simply ignore this error message.
>         It will
>         fallback to the Java-based TypeExtractor and treat it as a
>         generic type
>         because it is not a POJO.
>
>         I hope this helps.
>
>         Regards,
>         Timo
>
>
>         On 19.12.19 12:41, Alexandru Vasiu wrote:
>          > Hi,
>          >
>          > I use flink-scala version 1.9.1 and scala 2.12.10, and I
>         defined a data
>          > type which is a bit more complex: it has a list in it and even a
>          > dictionary. When I try to use a custom map I got this error:
>          >
>          > INFO  org.apache.flink.api.java.typeutils.TypeExtractor -
>         class A  does
>          > not contain a setter for field fields
>          > INFO  org.apache.flink.api.java.typeutils.TypeExtractor -
>         class A cannot
>          > be used as a POJO type because not all fields are valid POJO
>         fields, and
>          > must be processed as GenericType. Please read the Flink
>         documentation on
>          > "Data Types & Serialization" for details of the effect on
>         performance.
>          >
>          > Is there a fix for this? Or a workaround?
>          >
>          > Thank you,
>          > Alex
>

Reply | Threaded
Open this post in threaded view
|

Re: POJO ERROR

Timo Walther
I see a mismatch between scalaBuildVersion and scalaVersion could this
be the issue?

Regards,
Timo


On 19.12.19 14:33, Alexandru Vasiu wrote:

> This is a part of my Gradle config:
>
> ext {
>      scalaVersion = '2.12'
>      flinkVersion = '1.9.1'
>      scalaBuildVersion = "${scalaVersion}.10"
>      scalaMockVersion = '4.4.0'
>      circeGenericVersion = '0.12.3'
>      circeExtrasVersion = '0.12.2'
>      pardiseVersion = '2.1.1'
>      slf4jVersion = '1.7.7'
>      log4jVersion = '1.2.17'
>      sourceDir = 'src/main/scala'
>      testDir = 'src/test/scala'
> }
> repositories {
>      mavenCentral()
>      //maven { url
> "https://repository.apache.org/content/repositories/snapshots/" }
> }
> configurations {
>      scalaCompilerPlugin
> }
> dependencies {
>      implementation "org.scala-lang:scala-library:${scalaBuildVersion}"
>      // --------------------------------------------------------------
>      // Compile-time dependencies that should NOT be part of the
>      // shadow jar and are provided in the lib folder of Flink
>      // --------------------------------------------------------------
>      //compile "org.apache.flink:flink-java:${flinkVersion}"
>      implementation
> "org.apache.flink:flink-streaming-scala_${scalaVersion}:${flinkVersion}"
>      implementation
> "org.apache.flink:flink-connector-kafka_${scalaVersion}:${flinkVersion}"
>      // --------------------------------------------------------------
>      // Dependencies that should be part of the shadow jar, e.g.
>      // connectors. These must be in the flinkShadowJar configuration!
>      // --------------------------------------------------------------
>      //flinkShadowJar
> "org.apache.flink:flink-connector-kafka-0.11_${scalaBinaryVersion}:${flinkVersion}"
>      // https://mvnrepository.com/artifact/io.circe/
>      implementation
> "io.circe:circe-generic_${scalaVersion}:${circeGenericVersion}"
>      implementation
> "io.circe:circe-generic-extras_${scalaVersion}:${circeExtrasVersion}"
>      implementation
> "io.circe:circe-parser_${scalaVersion}:${circeGenericVersion}"
>      // https://mvnrepository.com/artifact/org.scalamacros/paradise
>      scalaCompilerPlugin
> "org.scalamacros:paradise_${scalaBuildVersion}:${pardiseVersion}"
>      implementation "log4j:log4j:${log4jVersion}"
>      implementation "org.slf4j:slf4j-log4j12:${slf4jVersion}"
>      // Add test dependencies here.
>      //testImplementation "junit:junit:4.12"
>      testImplementation "org.scalatest:scalatest_${scalaVersion}:3.1.0"
>      // https://mvnrepository.com/artifact/org.scalamock/scalamock
>      testImplementation
> "org.scalamock:scalamock_${scalaVersion}:${scalaMockVersion}"
> }
>
> So all are with the same scala version. I cannot share the code, but the
> main app looks like:
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val stream  = env
>          .addSource(KAFKA_STREAM) // this will get us a stream with our
> object model which is like this: case class A(a:Map[String,
> other_case_class_obj], b: List[other_case_class_obj], c: String)
> .flatMap(CustomFlatMap())
> .print
>
> Thank you,
> Alex
>
> On Thu, Dec 19, 2019 at 3:14 PM Timo Walther <[hidden email]
> <mailto:[hidden email]>> wrote:
>
>     That's sounds like a classloading or most likely dependency issue.
>
>     Are all dependencies including Flink use the same Scala version? Could
>     you maybe share reproducible some code with us?
>
>     Regards,
>     Timo
>
>
>     On 19.12.19 13:53, Alexandru Vasiu wrote:
>      > I'm sorry for my last message, it might be incomplete.
>      >
>      > So I used case classed for my objects, but it doesn't work.
>      >
>      > Riching this error: "Exception in thread "main"
>      >
>     org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ExecutionError:
>
>      > java.lang.NoClassDefFoundError: scala/math/Ordering$$anon$9" when
>     I'm
>      > trying to apply the map/flatMap function over the stream (which
>     is from
>      > a Kafka consumer).
>      >
>      >
>      > Alex
>      >
>      > On Thu, Dec 19, 2019 at 2:24 PM Alexandru Vasiu
>      > <[hidden email] <mailto:[hidden email]>
>     <mailto:[hidden email]
>     <mailto:[hidden email]>>> wrote:
>      >
>      >     I used `case class` for example case class A(a: Map[String,
>     String])
>      >     so it should work
>      >
>      >     Alex
>      >
>      >     On Thu, Dec 19, 2019 at 2:18 PM Timo Walther
>     <[hidden email] <mailto:[hidden email]>
>      >     <mailto:[hidden email] <mailto:[hidden email]>>> wrote:
>      >
>      >         Hi Alex,
>      >
>      >         the problem is that `case class` classes are analyzed by
>     Scala
>      >         specific
>      >         code whereas `class` classes are analyzed with Java specific
>      >         code. So I
>      >         would recommend to use a case class to make sure you stay
>     in the
>      >         "Scala
>      >         world" otherwise the fallback is the Java-based
>     TypeExtractor.
>      >
>      >         For your custom Map, you can simply ignore this error
>     message.
>      >         It will
>      >         fallback to the Java-based TypeExtractor and treat it as a
>      >         generic type
>      >         because it is not a POJO.
>      >
>      >         I hope this helps.
>      >
>      >         Regards,
>      >         Timo
>      >
>      >
>      >         On 19.12.19 12:41, Alexandru Vasiu wrote:
>      >          > Hi,
>      >          >
>      >          > I use flink-scala version 1.9.1 and scala 2.12.10, and I
>      >         defined a data
>      >          > type which is a bit more complex: it has a list in it
>     and even a
>      >          > dictionary. When I try to use a custom map I got this
>     error:
>      >          >
>      >          > INFO  org.apache.flink.api.java.typeutils.TypeExtractor -
>      >         class A  does
>      >          > not contain a setter for field fields
>      >          > INFO  org.apache.flink.api.java.typeutils.TypeExtractor -
>      >         class A cannot
>      >          > be used as a POJO type because not all fields are
>     valid POJO
>      >         fields, and
>      >          > must be processed as GenericType. Please read the Flink
>      >         documentation on
>      >          > "Data Types & Serialization" for details of the effect on
>      >         performance.
>      >          >
>      >          > Is there a fix for this? Or a workaround?
>      >          >
>      >          > Thank you,
>      >          > Alex
>      >
>

Reply | Threaded
Open this post in threaded view
|

Re: POJO ERROR

Alexandru Vasiu
Nope, because scalaBuildVersion is the scala version including minor version so in this case: 2.12.10 and we used it just where we need. 
We used scalaVersion to specify for each library what scala is used, so used flink will be flink-streaming-scala_2.12

Alex

On Thu, Dec 19, 2019 at 3:40 PM Timo Walther <[hidden email]> wrote:
I see a mismatch between scalaBuildVersion and scalaVersion could this
be the issue?

Regards,
Timo


On 19.12.19 14:33, Alexandru Vasiu wrote:
> This is a part of my Gradle config:
>
> ext {
>      scalaVersion = '2.12'
>      flinkVersion = '1.9.1'
>      scalaBuildVersion = "${scalaVersion}.10"
>      scalaMockVersion = '4.4.0'
>      circeGenericVersion = '0.12.3'
>      circeExtrasVersion = '0.12.2'
>      pardiseVersion = '2.1.1'
>      slf4jVersion = '1.7.7'
>      log4jVersion = '1.2.17'
>      sourceDir = 'src/main/scala'
>      testDir = 'src/test/scala'
> }
> repositories {
>      mavenCentral()
>      //maven { url
> "https://repository.apache.org/content/repositories/snapshots/" }
> }
> configurations {
>      scalaCompilerPlugin
> }
> dependencies {
>      implementation "org.scala-lang:scala-library:${scalaBuildVersion}"
>      // --------------------------------------------------------------
>      // Compile-time dependencies that should NOT be part of the
>      // shadow jar and are provided in the lib folder of Flink
>      // --------------------------------------------------------------
>      //compile "org.apache.flink:flink-java:${flinkVersion}"
>      implementation
> "org.apache.flink:flink-streaming-scala_${scalaVersion}:${flinkVersion}"
>      implementation
> "org.apache.flink:flink-connector-kafka_${scalaVersion}:${flinkVersion}"
>      // --------------------------------------------------------------
>      // Dependencies that should be part of the shadow jar, e.g.
>      // connectors. These must be in the flinkShadowJar configuration!
>      // --------------------------------------------------------------
>      //flinkShadowJar
> "org.apache.flink:flink-connector-kafka-0.11_${scalaBinaryVersion}:${flinkVersion}"
>      // https://mvnrepository.com/artifact/io.circe/
>      implementation
> "io.circe:circe-generic_${scalaVersion}:${circeGenericVersion}"
>      implementation
> "io.circe:circe-generic-extras_${scalaVersion}:${circeExtrasVersion}"
>      implementation
> "io.circe:circe-parser_${scalaVersion}:${circeGenericVersion}"
>      // https://mvnrepository.com/artifact/org.scalamacros/paradise
>      scalaCompilerPlugin
> "org.scalamacros:paradise_${scalaBuildVersion}:${pardiseVersion}"
>      implementation "log4j:log4j:${log4jVersion}"
>      implementation "org.slf4j:slf4j-log4j12:${slf4jVersion}"
>      // Add test dependencies here.
>      //testImplementation "junit:junit:4.12"
>      testImplementation "org.scalatest:scalatest_${scalaVersion}:3.1.0"
>      // https://mvnrepository.com/artifact/org.scalamock/scalamock
>      testImplementation
> "org.scalamock:scalamock_${scalaVersion}:${scalaMockVersion}"
> }
>
> So all are with the same scala version. I cannot share the code, but the
> main app looks like:
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val stream  = env
>          .addSource(KAFKA_STREAM) // this will get us a stream with our
> object model which is like this: case class A(a:Map[String,
> other_case_class_obj], b: List[other_case_class_obj], c: String)
> .flatMap(CustomFlatMap())
> .print
>
> Thank you,
> Alex
>
> On Thu, Dec 19, 2019 at 3:14 PM Timo Walther <[hidden email]
> <mailto:[hidden email]>> wrote:
>
>     That's sounds like a classloading or most likely dependency issue.
>
>     Are all dependencies including Flink use the same Scala version? Could
>     you maybe share reproducible some code with us?
>
>     Regards,
>     Timo
>
>
>     On 19.12.19 13:53, Alexandru Vasiu wrote:
>      > I'm sorry for my last message, it might be incomplete.
>      >
>      > So I used case classed for my objects, but it doesn't work.
>      >
>      > Riching this error: "Exception in thread "main"
>      >
>     org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ExecutionError:
>
>      > java.lang.NoClassDefFoundError: scala/math/Ordering$$anon$9" when
>     I'm
>      > trying to apply the map/flatMap function over the stream (which
>     is from
>      > a Kafka consumer).
>      >
>      >
>      > Alex
>      >
>      > On Thu, Dec 19, 2019 at 2:24 PM Alexandru Vasiu
>      > <[hidden email] <mailto:[hidden email]>
>     <mailto:[hidden email]
>     <mailto:[hidden email]>>> wrote:
>      >
>      >     I used `case class` for example case class A(a: Map[String,
>     String])
>      >     so it should work
>      >
>      >     Alex
>      >
>      >     On Thu, Dec 19, 2019 at 2:18 PM Timo Walther
>     <[hidden email] <mailto:[hidden email]>
>      >     <mailto:[hidden email] <mailto:[hidden email]>>> wrote:
>      >
>      >         Hi Alex,
>      >
>      >         the problem is that `case class` classes are analyzed by
>     Scala
>      >         specific
>      >         code whereas `class` classes are analyzed with Java specific
>      >         code. So I
>      >         would recommend to use a case class to make sure you stay
>     in the
>      >         "Scala
>      >         world" otherwise the fallback is the Java-based
>     TypeExtractor.
>      >
>      >         For your custom Map, you can simply ignore this error
>     message.
>      >         It will
>      >         fallback to the Java-based TypeExtractor and treat it as a
>      >         generic type
>      >         because it is not a POJO.
>      >
>      >         I hope this helps.
>      >
>      >         Regards,
>      >         Timo
>      >
>      >
>      >         On 19.12.19 12:41, Alexandru Vasiu wrote:
>      >          > Hi,
>      >          >
>      >          > I use flink-scala version 1.9.1 and scala 2.12.10, and I
>      >         defined a data
>      >          > type which is a bit more complex: it has a list in it
>     and even a
>      >          > dictionary. When I try to use a custom map I got this
>     error:
>      >          >
>      >          > INFO  org.apache.flink.api.java.typeutils.TypeExtractor -
>      >         class A  does
>      >          > not contain a setter for field fields
>      >          > INFO  org.apache.flink.api.java.typeutils.TypeExtractor -
>      >         class A cannot
>      >          > be used as a POJO type because not all fields are
>     valid POJO
>      >         fields, and
>      >          > must be processed as GenericType. Please read the Flink
>      >         documentation on
>      >          > "Data Types & Serialization" for details of the effect on
>      >         performance.
>      >          >
>      >          > Is there a fix for this? Or a workaround?
>      >          >
>      >          > Thank you,
>      >          > Alex
>      >
>

Reply | Threaded
Open this post in threaded view
|

Re: POJO ERROR

Timo Walther
Sorry, you are right. Maybe you can also share the full stack trace
because I don't know where this guava library should be used.

Regards,
Timo


On 19.12.19 14:50, Alexandru Vasiu wrote:

> Nope, because scalaBuildVersion is the scala version including minor
> version so in this case: 2.12.10 and we used it just where we need.
> We used scalaVersion to specify for each library what scala is used, so
> used flink will be flink-streaming-scala_2.12
>
> Alex
>
> On Thu, Dec 19, 2019 at 3:40 PM Timo Walther <[hidden email]
> <mailto:[hidden email]>> wrote:
>
>     I see a mismatch between scalaBuildVersion and scalaVersion could this
>     be the issue?
>
>     Regards,
>     Timo
>
>
>     On 19.12.19 14:33, Alexandru Vasiu wrote:
>      > This is a part of my Gradle config:
>      >
>      > ext {
>      >      scalaVersion = '2.12'
>      >      flinkVersion = '1.9.1'
>      >      scalaBuildVersion = "${scalaVersion}.10"
>      >      scalaMockVersion = '4.4.0'
>      >      circeGenericVersion = '0.12.3'
>      >      circeExtrasVersion = '0.12.2'
>      >      pardiseVersion = '2.1.1'
>      >      slf4jVersion = '1.7.7'
>      >      log4jVersion = '1.2.17'
>      >      sourceDir = 'src/main/scala'
>      >      testDir = 'src/test/scala'
>      > }
>      > repositories {
>      >      mavenCentral()
>      >      //maven { url
>      > "https://repository.apache.org/content/repositories/snapshots/" }
>      > }
>      > configurations {
>      >      scalaCompilerPlugin
>      > }
>      > dependencies {
>      >      implementation
>     "org.scala-lang:scala-library:${scalaBuildVersion}"
>      >      //
>     --------------------------------------------------------------
>      >      // Compile-time dependencies that should NOT be part of the
>      >      // shadow jar and are provided in the lib folder of Flink
>      >      //
>     --------------------------------------------------------------
>      >      //compile "org.apache.flink:flink-java:${flinkVersion}"
>      >      implementation
>      >
>     "org.apache.flink:flink-streaming-scala_${scalaVersion}:${flinkVersion}"
>      >      implementation
>      >
>     "org.apache.flink:flink-connector-kafka_${scalaVersion}:${flinkVersion}"
>      >      //
>     --------------------------------------------------------------
>      >      // Dependencies that should be part of the shadow jar, e.g.
>      >      // connectors. These must be in the flinkShadowJar
>     configuration!
>      >      //
>     --------------------------------------------------------------
>      >      //flinkShadowJar
>      >
>     "org.apache.flink:flink-connector-kafka-0.11_${scalaBinaryVersion}:${flinkVersion}"
>      >      // https://mvnrepository.com/artifact/io.circe/
>      >      implementation
>      > "io.circe:circe-generic_${scalaVersion}:${circeGenericVersion}"
>      >      implementation
>      > "io.circe:circe-generic-extras_${scalaVersion}:${circeExtrasVersion}"
>      >      implementation
>      > "io.circe:circe-parser_${scalaVersion}:${circeGenericVersion}"
>      >      // https://mvnrepository.com/artifact/org.scalamacros/paradise
>      >      scalaCompilerPlugin
>      > "org.scalamacros:paradise_${scalaBuildVersion}:${pardiseVersion}"
>      >      implementation "log4j:log4j:${log4jVersion}"
>      >      implementation "org.slf4j:slf4j-log4j12:${slf4jVersion}"
>      >      // Add test dependencies here.
>      >      //testImplementation "junit:junit:4.12"
>      >      testImplementation
>     "org.scalatest:scalatest_${scalaVersion}:3.1.0"
>      >      // https://mvnrepository.com/artifact/org.scalamock/scalamock
>      >      testImplementation
>      > "org.scalamock:scalamock_${scalaVersion}:${scalaMockVersion}"
>      > }
>      >
>      > So all are with the same scala version. I cannot share the code,
>     but the
>      > main app looks like:
>      >
>      > val env = StreamExecutionEnvironment.getExecutionEnvironment
>      > val stream  = env
>      >          .addSource(KAFKA_STREAM) // this will get us a stream
>     with our
>      > object model which is like this: case class A(a:Map[String,
>      > other_case_class_obj], b: List[other_case_class_obj], c: String)
>      > .flatMap(CustomFlatMap())
>      > .print
>      >
>      > Thank you,
>      > Alex
>      >
>      > On Thu, Dec 19, 2019 at 3:14 PM Timo Walther <[hidden email]
>     <mailto:[hidden email]>
>      > <mailto:[hidden email] <mailto:[hidden email]>>> wrote:
>      >
>      >     That's sounds like a classloading or most likely dependency
>     issue.
>      >
>      >     Are all dependencies including Flink use the same Scala
>     version? Could
>      >     you maybe share reproducible some code with us?
>      >
>      >     Regards,
>      >     Timo
>      >
>      >
>      >     On 19.12.19 13:53, Alexandru Vasiu wrote:
>      >      > I'm sorry for my last message, it might be incomplete.
>      >      >
>      >      > So I used case classed for my objects, but it doesn't work.
>      >      >
>      >      > Riching this error: "Exception in thread "main"
>      >      >
>      >  
>       org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ExecutionError:
>      >
>      >      > java.lang.NoClassDefFoundError:
>     scala/math/Ordering$$anon$9" when
>      >     I'm
>      >      > trying to apply the map/flatMap function over the stream
>     (which
>      >     is from
>      >      > a Kafka consumer).
>      >      >
>      >      >
>      >      > Alex
>      >      >
>      >      > On Thu, Dec 19, 2019 at 2:24 PM Alexandru Vasiu
>      >      > <[hidden email]
>     <mailto:[hidden email]>
>     <mailto:[hidden email] <mailto:[hidden email]>>
>      >     <mailto:[hidden email]
>     <mailto:[hidden email]>
>      >     <mailto:[hidden email]
>     <mailto:[hidden email]>>>> wrote:
>      >      >
>      >      >     I used `case class` for example case class A(a:
>     Map[String,
>      >     String])
>      >      >     so it should work
>      >      >
>      >      >     Alex
>      >      >
>      >      >     On Thu, Dec 19, 2019 at 2:18 PM Timo Walther
>      >     <[hidden email] <mailto:[hidden email]>
>     <mailto:[hidden email] <mailto:[hidden email]>>
>      >      >     <mailto:[hidden email] <mailto:[hidden email]>
>     <mailto:[hidden email] <mailto:[hidden email]>>>> wrote:
>      >      >
>      >      >         Hi Alex,
>      >      >
>      >      >         the problem is that `case class` classes are
>     analyzed by
>      >     Scala
>      >      >         specific
>      >      >         code whereas `class` classes are analyzed with
>     Java specific
>      >      >         code. So I
>      >      >         would recommend to use a case class to make sure
>     you stay
>      >     in the
>      >      >         "Scala
>      >      >         world" otherwise the fallback is the Java-based
>      >     TypeExtractor.
>      >      >
>      >      >         For your custom Map, you can simply ignore this error
>      >     message.
>      >      >         It will
>      >      >         fallback to the Java-based TypeExtractor and treat
>     it as a
>      >      >         generic type
>      >      >         because it is not a POJO.
>      >      >
>      >      >         I hope this helps.
>      >      >
>      >      >         Regards,
>      >      >         Timo
>      >      >
>      >      >
>      >      >         On 19.12.19 12:41, Alexandru Vasiu wrote:
>      >      >          > Hi,
>      >      >          >
>      >      >          > I use flink-scala version 1.9.1 and scala
>     2.12.10, and I
>      >      >         defined a data
>      >      >          > type which is a bit more complex: it has a list
>     in it
>      >     and even a
>      >      >          > dictionary. When I try to use a custom map I
>     got this
>      >     error:
>      >      >          >
>      >      >          > INFO
>       org.apache.flink.api.java.typeutils.TypeExtractor -
>      >      >         class A  does
>      >      >          > not contain a setter for field fields
>      >      >          > INFO
>       org.apache.flink.api.java.typeutils.TypeExtractor -
>      >      >         class A cannot
>      >      >          > be used as a POJO type because not all fields are
>      >     valid POJO
>      >      >         fields, and
>      >      >          > must be processed as GenericType. Please read
>     the Flink
>      >      >         documentation on
>      >      >          > "Data Types & Serialization" for details of the
>     effect on
>      >      >         performance.
>      >      >          >
>      >      >          > Is there a fix for this? Or a workaround?
>      >      >          >
>      >      >          > Thank you,
>      >      >          > Alex
>      >      >
>      >
>

Reply | Threaded
Open this post in threaded view
|

Re: POJO ERROR

Alexandru Vasiu
Hi,

We fixed it by converting the case class to a class.

Thank you,
Alex

On Thu, Dec 19, 2019 at 5:43 PM Timo Walther <[hidden email]> wrote:
Sorry, you are right. Maybe you can also share the full stack trace
because I don't know where this guava library should be used.

Regards,
Timo


On 19.12.19 14:50, Alexandru Vasiu wrote:
> Nope, because scalaBuildVersion is the scala version including minor
> version so in this case: 2.12.10 and we used it just where we need.
> We used scalaVersion to specify for each library what scala is used, so
> used flink will be flink-streaming-scala_2.12
>
> Alex
>
> On Thu, Dec 19, 2019 at 3:40 PM Timo Walther <[hidden email]
> <mailto:[hidden email]>> wrote:
>
>     I see a mismatch between scalaBuildVersion and scalaVersion could this
>     be the issue?
>
>     Regards,
>     Timo
>
>
>     On 19.12.19 14:33, Alexandru Vasiu wrote:
>      > This is a part of my Gradle config:
>      >
>      > ext {
>      >      scalaVersion = '2.12'
>      >      flinkVersion = '1.9.1'
>      >      scalaBuildVersion = "${scalaVersion}.10"
>      >      scalaMockVersion = '4.4.0'
>      >      circeGenericVersion = '0.12.3'
>      >      circeExtrasVersion = '0.12.2'
>      >      pardiseVersion = '2.1.1'
>      >      slf4jVersion = '1.7.7'
>      >      log4jVersion = '1.2.17'
>      >      sourceDir = 'src/main/scala'
>      >      testDir = 'src/test/scala'
>      > }
>      > repositories {
>      >      mavenCentral()
>      >      //maven { url
>      > "https://repository.apache.org/content/repositories/snapshots/" }
>      > }
>      > configurations {
>      >      scalaCompilerPlugin
>      > }
>      > dependencies {
>      >      implementation
>     "org.scala-lang:scala-library:${scalaBuildVersion}"
>      >      //
>     --------------------------------------------------------------
>      >      // Compile-time dependencies that should NOT be part of the
>      >      // shadow jar and are provided in the lib folder of Flink
>      >      //
>     --------------------------------------------------------------
>      >      //compile "org.apache.flink:flink-java:${flinkVersion}"
>      >      implementation
>      >
>     "org.apache.flink:flink-streaming-scala_${scalaVersion}:${flinkVersion}"
>      >      implementation
>      >
>     "org.apache.flink:flink-connector-kafka_${scalaVersion}:${flinkVersion}"
>      >      //
>     --------------------------------------------------------------
>      >      // Dependencies that should be part of the shadow jar, e.g.
>      >      // connectors. These must be in the flinkShadowJar
>     configuration!
>      >      //
>     --------------------------------------------------------------
>      >      //flinkShadowJar
>      >
>     "org.apache.flink:flink-connector-kafka-0.11_${scalaBinaryVersion}:${flinkVersion}"
>      >      // https://mvnrepository.com/artifact/io.circe/
>      >      implementation
>      > "io.circe:circe-generic_${scalaVersion}:${circeGenericVersion}"
>      >      implementation
>      > "io.circe:circe-generic-extras_${scalaVersion}:${circeExtrasVersion}"
>      >      implementation
>      > "io.circe:circe-parser_${scalaVersion}:${circeGenericVersion}"
>      >      // https://mvnrepository.com/artifact/org.scalamacros/paradise
>      >      scalaCompilerPlugin
>      > "org.scalamacros:paradise_${scalaBuildVersion}:${pardiseVersion}"
>      >      implementation "log4j:log4j:${log4jVersion}"
>      >      implementation "org.slf4j:slf4j-log4j12:${slf4jVersion}"
>      >      // Add test dependencies here.
>      >      //testImplementation "junit:junit:4.12"
>      >      testImplementation
>     "org.scalatest:scalatest_${scalaVersion}:3.1.0"
>      >      // https://mvnrepository.com/artifact/org.scalamock/scalamock
>      >      testImplementation
>      > "org.scalamock:scalamock_${scalaVersion}:${scalaMockVersion}"
>      > }
>      >
>      > So all are with the same scala version. I cannot share the code,
>     but the
>      > main app looks like:
>      >
>      > val env = StreamExecutionEnvironment.getExecutionEnvironment
>      > val stream  = env
>      >          .addSource(KAFKA_STREAM) // this will get us a stream
>     with our
>      > object model which is like this: case class A(a:Map[String,
>      > other_case_class_obj], b: List[other_case_class_obj], c: String)
>      > .flatMap(CustomFlatMap())
>      > .print
>      >
>      > Thank you,
>      > Alex
>      >
>      > On Thu, Dec 19, 2019 at 3:14 PM Timo Walther <[hidden email]
>     <mailto:[hidden email]>
>      > <mailto:[hidden email] <mailto:[hidden email]>>> wrote:
>      >
>      >     That's sounds like a classloading or most likely dependency
>     issue.
>      >
>      >     Are all dependencies including Flink use the same Scala
>     version? Could
>      >     you maybe share reproducible some code with us?
>      >
>      >     Regards,
>      >     Timo
>      >
>      >
>      >     On 19.12.19 13:53, Alexandru Vasiu wrote:
>      >      > I'm sorry for my last message, it might be incomplete.
>      >      >
>      >      > So I used case classed for my objects, but it doesn't work.
>      >      >
>      >      > Riching this error: "Exception in thread "main"
>      >      >
>      >   
>       org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ExecutionError:
>      >
>      >      > java.lang.NoClassDefFoundError:
>     scala/math/Ordering$$anon$9" when
>      >     I'm
>      >      > trying to apply the map/flatMap function over the stream
>     (which
>      >     is from
>      >      > a Kafka consumer).
>      >      >
>      >      >
>      >      > Alex
>      >      >
>      >      > On Thu, Dec 19, 2019 at 2:24 PM Alexandru Vasiu
>      >      > <[hidden email]
>     <mailto:[hidden email]>
>     <mailto:[hidden email] <mailto:[hidden email]>>
>      >     <mailto:[hidden email]
>     <mailto:[hidden email]>
>      >     <mailto:[hidden email]
>     <mailto:[hidden email]>>>> wrote:
>      >      >
>      >      >     I used `case class` for example case class A(a:
>     Map[String,
>      >     String])
>      >      >     so it should work
>      >      >
>      >      >     Alex
>      >      >
>      >      >     On Thu, Dec 19, 2019 at 2:18 PM Timo Walther
>      >     <[hidden email] <mailto:[hidden email]>
>     <mailto:[hidden email] <mailto:[hidden email]>>
>      >      >     <mailto:[hidden email] <mailto:[hidden email]>
>     <mailto:[hidden email] <mailto:[hidden email]>>>> wrote:
>      >      >
>      >      >         Hi Alex,
>      >      >
>      >      >         the problem is that `case class` classes are
>     analyzed by
>      >     Scala
>      >      >         specific
>      >      >         code whereas `class` classes are analyzed with
>     Java specific
>      >      >         code. So I
>      >      >         would recommend to use a case class to make sure
>     you stay
>      >     in the
>      >      >         "Scala
>      >      >         world" otherwise the fallback is the Java-based
>      >     TypeExtractor.
>      >      >
>      >      >         For your custom Map, you can simply ignore this error
>      >     message.
>      >      >         It will
>      >      >         fallback to the Java-based TypeExtractor and treat
>     it as a
>      >      >         generic type
>      >      >         because it is not a POJO.
>      >      >
>      >      >         I hope this helps.
>      >      >
>      >      >         Regards,
>      >      >         Timo
>      >      >
>      >      >
>      >      >         On 19.12.19 12:41, Alexandru Vasiu wrote:
>      >      >          > Hi,
>      >      >          >
>      >      >          > I use flink-scala version 1.9.1 and scala
>     2.12.10, and I
>      >      >         defined a data
>      >      >          > type which is a bit more complex: it has a list
>     in it
>      >     and even a
>      >      >          > dictionary. When I try to use a custom map I
>     got this
>      >     error:
>      >      >          >
>      >      >          > INFO
>       org.apache.flink.api.java.typeutils.TypeExtractor -
>      >      >         class A  does
>      >      >          > not contain a setter for field fields
>      >      >          > INFO
>       org.apache.flink.api.java.typeutils.TypeExtractor -
>      >      >         class A cannot
>      >      >          > be used as a POJO type because not all fields are
>      >     valid POJO
>      >      >         fields, and
>      >      >          > must be processed as GenericType. Please read
>     the Flink
>      >      >         documentation on
>      >      >          > "Data Types & Serialization" for details of the
>     effect on
>      >      >         performance.
>      >      >          >
>      >      >          > Is there a fix for this? Or a workaround?
>      >      >          >
>      >      >          > Thank you,
>      >      >          > Alex
>      >      >
>      >
>