Hello, Flink experts and friends!

This is my first time writing a Flink application at my company, and I hit the error below when I used Elasticsearch as my sink. I searched for a solution and found the JIRA issue https://issues.apache.org/jira/browse/FLINK-7133 . I then added the change from its PR to my project, as shown below, but when I ran the Flink program again the error was still there. Why? With a filesystem sink no error occurs, but as soon as I switch back to Elasticsearch the error returns. Could you help me, please?

I think the two log lines below are not relevant to the error, but I include them for reference. The class is just a PB object.

21:41:09,397 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class com.vip.data.cleaning.logic.mars.activity.info.ActivityInfoProtos$ActivityInfo does not contain a setter for field unknownFields
21:41:09,400 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class com.vip.data.cleaning.logic.mars.activity.info.ActivityInfoProtos$ActivityInfo is not a valid POJO type because not all fields are valid POJO fields.

Exception in thread "main" java.lang.IllegalArgumentException
    at org.apache.flink.shaded.org.objectweb.asm.ClassVisitor.<init>(Unknown Source)
    at org.apache.flink.shaded.org.objectweb.asm.ClassVisitor.<init>(Unknown Source)
    at org.apache.flink.api.scala.InnerClosureFinder.<init>(ClosureCleaner.scala:279)
    at org.apache.flink.api.scala.ClosureCleaner$.getInnerClasses(ClosureCleaner.scala:95)
    at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:115)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:670)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:600)
    at com.vip.data.cleaning.validation.mars.activity.info.WhileListFilter$.main(WhileListFilter.scala:79)
    at com.vip.data.cleaning.validation.mars.activity.info.WhileListFilter.main(WhileListFilter.scala)

PR here:

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>${elasticsearch.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.ow2.asm</groupId>
            <artifactId>*</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Regards
mingleizhang
|
BTW, my Elasticsearch version is 2.3.3, not 1.7.1 as in the JIRA issue FLINK-7133, and I found that 2.3.3 does not depend on ASM. My Flink version is 1.3.1.

flink-connector-elasticsearch-base_2.10 version is 1.3.1
flink-connector-elasticsearch2_2.10 version is also 1.3.1.

At 2017-08-13 21:54:06, "mingleizhang" <[hidden email]> wrote:
|
Just to be sure, can you try Flink 1.3.2, which is supposed to fix FLINK-7133 and was released recently?

Nico

On Monday, 14 August 2017 03:19:06 CEST mingleizhang wrote:
> BTW, My elastic search version is 2.3.3, not the jira FLINK-7133 by 1.7.1.
> And I found 2.3.3 is not based on asm. My flink version is 1.3.1.
>
> flink-connector-elasticsearch-base_2.10 version is 1.3.1
> flink-connector-elasticsearch2_2.10 version is 1.3.1 also.
Thanks, Nico. I tried Flink 1.3.2 and it works now. Thank you very much! I think something else must also have been causing this error, beyond what the PR I applied earlier addresses.

Thanks.
mingleizhang
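For readers who land here with the same stack trace: the fix that worked in this thread was moving from Flink 1.3.1 to 1.3.2. A minimal sketch of the corresponding pom.xml fragment follows. The `flink.version` property name and the `flink-streaming-scala_2.10` dependency are assumptions for illustration; only the connector artifact ID and the version numbers come from the messages above.

```xml
<!-- Sketch of a pom.xml fragment; the property name and the choice of core artifact are assumptions -->
<properties>
    <!-- 1.3.2 is the release that resolved the error in this thread -->
    <flink.version>1.3.2</flink.version>
</properties>

<dependencies>
    <!-- Assumed core dependency: the stack trace shows the Scala streaming API in use -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.10</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Connector named in the thread, moved from 1.3.1 to the same version as the core -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
```

Driving every Flink artifact from one property is one way to avoid mixing 1.3.1 and 1.3.2 jars on the same classpath.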
|
May I ask what a "PB object" is?

Thanks.
Hai Zhou
|
A PB object is an object built with Protobuf (Protocol Buffers), Google's data interchange format. You can find out more at https://github.com/google/protobuf

At 2017-08-15 13:42:58, "Hai Zhou" <[hidden email]> wrote:
> I would like to ask what is "PB object"?
|