IllegalArgumentException when using elasticsearch as a sink


IllegalArgumentException when using elasticsearch as a sink

zhangminglei
Hello, flink experts and friends!

This is my first time writing a Flink application at my company, and I hit the following error when using Elasticsearch as my sink. I searched for a solution and found the JIRA issue https://issues.apache.org/jira/browse/FLINK-7133 . I then added the change from its PR to my project, as shown below, but when I ran the Flink program again the error was still there. Why? With a filesystem sink no error occurs, but as soon as I switch back to Elasticsearch the error returns. Could you help me, please?

I don't think the two log lines below are related to the error, but I include them here for reference. The class they mention is just a PB object.

21:41:09,397 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class com.vip.data.cleaning.logic.mars.activity.info.ActivityInfoProtos$ActivityInfo does not contain a setter for field unknownFields
21:41:09,400 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class com.vip.data.cleaning.logic.mars.activity.info.ActivityInfoProtos$ActivityInfo is not a valid POJO type because not all fields are valid POJO fields.

Exception in thread "main" java.lang.IllegalArgumentException
at org.apache.flink.shaded.org.objectweb.asm.ClassVisitor.<init>(Unknown Source)
at org.apache.flink.shaded.org.objectweb.asm.ClassVisitor.<init>(Unknown Source)
at org.apache.flink.api.scala.InnerClosureFinder.<init>(ClosureCleaner.scala:279)
at org.apache.flink.api.scala.ClosureCleaner$.getInnerClasses(ClosureCleaner.scala:95)
at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:115)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:670)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:600)
at com.vip.data.cleaning.validation.mars.activity.info.WhileListFilter$.main(WhileListFilter.scala:79)
at com.vip.data.cleaning.validation.mars.activity.info.WhileListFilter.main(WhileListFilter.scala)

The change from the PR:
<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch</artifactId>
  <version>${elasticsearch.version}</version>
  <exclusions>
    <exclusion>
      <groupId>org.ow2.asm</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>

Regards
mingleizhang






 


Re:IllegalArgumentException when using elasticsearch as a sink

zhangminglei
BTW, my Elasticsearch version is 2.3.3, not the 1.7.1 that FLINK-7133 refers to, and as far as I can tell 2.3.3 does not depend on asm. My Flink version is 1.3.1.

flink-connector-elasticsearch-base_2.10 version is 1.3.1
flink-connector-elasticsearch2_2.10 version is 1.3.1 also.











 




Re: IllegalArgumentException when using elasticsearch as a sink

Nico Kruber
Just to be sure, can you try flink 1.3.2 which is supposed to fix FLINK-7133 and
was released recently?

Nico




Re:Re: IllegalArgumentException when using elasticsearch as a sink

zhangminglei
Thanks, Nico. I tried Flink 1.3.2 and it works now. Thank you very much! I think something other than the PR I patched in earlier must have been causing this error.
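
For anyone hitting the same error, the upgrade might look roughly like this in the pom.xml. This is only a sketch: the `flink.version` property name and the `flink-streaming-scala_2.10` dependency are assumptions (the stack trace shows the Scala streaming API, but the actual pom was not posted), and the connector artifact is the `_2.10` one listed earlier in the thread.

```xml
<properties>
  <!-- Assumed property name; 1.3.2 is the release that made the error go away in this thread -->
  <flink.version>1.3.2</flink.version>
</properties>

<dependencies>
  <!-- Assumed: Scala streaming API, inferred from the stack trace -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.10</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <!-- Elasticsearch 2.x connector, kept on the same version as Flink itself -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
    <version>${flink.version}</version>
  </dependency>
</dependencies>
```

Driving every Flink artifact from a single version property keeps the core and connector versions from drifting apart.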

Thanks.
mingleizhang









 


Re: IllegalArgumentException when using elasticsearch as a sink

Hai Zhou
I would like to ask: what is a “PB object”?

Thanks.
Hai Zhou




 



Re:Re: IllegalArgumentException when using elasticsearch as a sink

zhangminglei

A PB object is an object built from Protobuf (Protocol Buffers), Google's data interchange format. You can find out more at https://github.com/google/protobuf


