Flink Java 8 problem (no lambda, simple code)

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink Java 8 problem (no lambda, simple code)

LINZ, Arnaud

Hi,

 

I have the following simple code that works well in Java 7 :

 

        final ExecutionEnvironment cluster = ExecutionEnvironment.createLocalEnvironment();

        final DataSet<String> textFile = cluster.readTextFile(MiscTools.chercher("jeuDeDonnees.txt"));

        final DataSet<Tuple2<String, Integer>> words = textFile

            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {

                @Override

                public void flatMap(String ligne, Collector<Tuple2<String, Integer>> out) {

                    for (final String word : ligne.split("\\s")) {

                        out.collect(new Tuple2<String, Integer>(word, 1));

                    }

                }

            });

        final DataSet<Tuple2<String, Integer>> wordsCount = words.groupBy(0).sum(1);

        wordsCount.print();

        cluster.execute("testFlink");

 

When compiled in Java 8 and executed (Oracle JDK or Eclipse JDT compiler, same result) I have the following stack trace (under eclipse or with maven test) :

 

java.lang.IllegalArgumentException: null

       at org.apache.flink.shaded.org.objectweb.asm.ClassReader.<init>(Unknown Source)

       at org.apache.flink.shaded.org.objectweb.asm.ClassReader.<init>(Unknown Source)

       at org.apache.flink.shaded.org.objectweb.asm.ClassReader.<init>(Unknown Source)

       at org.apache.flink.api.java.ClosureCleaner.getClassReader(ClosureCleaner.java:40)

       at org.apache.flink.api.java.ClosureCleaner.cleanThis0(ClosureCleaner.java:67)

       at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:54)

       at org.apache.flink.api.java.DataSet.clean(DataSet.java:185)

       at org.apache.flink.api.java.DataSet.flatMap(DataSet.java:266)

 

 

Any idea why ?

 

Here is my pom.xml :

 

       <build>

             <plugins>

                    <!-- Surefire plugin -->

                    <plugin>

                           <groupId>org.apache.maven.plugins</groupId>

                           <artifactId>maven-surefire-plugin</artifactId>

                           <version>2.15</version>

                           <configuration>

                                  <jvm>${env.JAVA_HOME}/bin/java</jvm>

                                  <forkMode>once</forkMode>

                           </configuration>

                    </plugin>

 

                   

                    <plugin>

                           <groupId>org.apache.maven.plugins</groupId>

                           <artifactId>maven-compiler-plugin</artifactId>

                           <version>3.1</version>

 

                           <configuration>

                                  <source>1.8</source>

                                  <target>1.8</target>

                                  <compilerId>jdt</compilerId>

                                  <!-- executable>${env.JAVA_HOME}/bin/javac</executable> -->

                           </configuration>

                           <dependencies>

                                  <!-- This dependency provides the implementation of compiler "jdt": -->

                                  <dependency>

                                        <groupId>org.eclipse.tycho</groupId>

                                        <artifactId>tycho-compiler-jdt</artifactId>

                                        <version>0.21.0</version>

                                  </dependency>

                           </dependencies>

                    </plugin>

             </plugins>

             <pluginManagement>

                    <plugins>

                           <!--This plugin's configuration is used to store Eclipse m2e settings

                                  only. It has no influence on the Maven build itself. -->

                           <plugin>

                                  <groupId>org.eclipse.m2e</groupId>

                                  <artifactId>lifecycle-mapping</artifactId>

                                  <version>1.0.0</version>

                                  <configuration>

                                        <lifecycleMappingMetadata>

                                               <pluginExecutions>

                                                      <pluginExecution>

                                                            <pluginExecutionFilter>

                                                                   <groupId>

                                                                          org.apache.maven.plugins

                                                                   </groupId>

                                                                   <artifactId>

                                                                          maven-compiler-plugin

                                                                   </artifactId>

                                                                   <versionRange>

                                                                          [3.1,)

                                                                   </versionRange>

                                                                   <goals>

                                                                          <goal>compile</goal>

                                                                   </goals>

                                                            </pluginExecutionFilter>

                                                            <action>

                                                                   <ignore></ignore>

                                                            </action>

                                                      </pluginExecution>

                                               </pluginExecutions>

                                        </lifecycleMappingMetadata>

                                  </configuration>

                           </plugin>

                    </plugins>

             </pluginManagement>

       </build>

 

       <dependencies>

             <dependency>

                    <groupId>junit</groupId>

                    <artifactId>junit</artifactId>

                    <version>4.10</version>

                    <scope>test</scope>

             </dependency>

 

             <!-- Flink -->

             <dependency>

                    <groupId>org.apache.flink</groupId>

                    <artifactId>flink-java</artifactId>

                    <version>0.9.0-milestone-1</version>

             </dependency>

             <dependency>

                    <groupId>org.apache.flink</groupId>

                    <artifactId>flink-clients</artifactId>

                    <version>0.9.0-milestone-1</version>

             </dependency>

 

 

       </dependencies>

 

A simple replacement of “1.8” by “1.7” in the pom makes the program work.

 

Regards,

Arnaud




L'intégrité de ce message n'étant pas assurée sur internet, la société expéditrice ne peut être tenue responsable de son contenu ni de ses pièces jointes. Toute utilisation ou diffusion non autorisée est interdite. Si vous n'êtes pas destinataire de ce message, merci de le détruire et d'avertir l'expéditeur.

The integrity of this message cannot be guaranteed on the Internet. The company that sent this message cannot therefore be held liable for its content nor attachments. Any unauthorized use or dissemination is prohibited. If you are not the intended recipient of this message, then please delete it and notify the sender.
Reply | Threaded
Open this post in threaded view
|

Re: Flink Java 8 problem (no lambda, simple code)

Aljoscha Krettek
I'm looking into it,

On Fri, Apr 24, 2015 at 11:13 AM, LINZ, Arnaud <[hidden email]> wrote:

> Hi,
>
>
>
> I have the following simple code that works well in Java 7 :
>
>
>
>         final ExecutionEnvironment cluster =
> ExecutionEnvironment.createLocalEnvironment();
>
>         final DataSet<String> textFile =
> cluster.readTextFile(MiscTools.chercher("jeuDeDonnees.txt"));
>
>         final DataSet<Tuple2<String, Integer>> words = textFile
>
>             .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>()
> {
>
>                 @Override
>
>                 public void flatMap(String ligne, Collector<Tuple2<String,
> Integer>> out) {
>
>                     for (final String word : ligne.split("\\s")) {
>
>                         out.collect(new Tuple2<String, Integer>(word, 1));
>
>                     }
>
>                 }
>
>             });
>
>         final DataSet<Tuple2<String, Integer>> wordsCount =
> words.groupBy(0).sum(1);
>
>         wordsCount.print();
>
>         cluster.execute("testFlink");
>
>
>
> When compiled in Java 8 and executed (Oracle JDK or Eclipse JDT compiler,
> same result) I have the following stack trace (under eclipse or with maven
> test) :
>
>
>
> java.lang.IllegalArgumentException: null
>
>        at
> org.apache.flink.shaded.org.objectweb.asm.ClassReader.<init>(Unknown Source)
>
>        at
> org.apache.flink.shaded.org.objectweb.asm.ClassReader.<init>(Unknown Source)
>
>        at
> org.apache.flink.shaded.org.objectweb.asm.ClassReader.<init>(Unknown Source)
>
>        at
> org.apache.flink.api.java.ClosureCleaner.getClassReader(ClosureCleaner.java:40)
>
>        at
> org.apache.flink.api.java.ClosureCleaner.cleanThis0(ClosureCleaner.java:67)
>
>        at
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:54)
>
>        at org.apache.flink.api.java.DataSet.clean(DataSet.java:185)
>
>        at org.apache.flink.api.java.DataSet.flatMap(DataSet.java:266)
>
>
>
>
>
> Any idea why ?
>
>
>
> Here is my pom.xml :
>
>
>
>        <build>
>
>              <plugins>
>
>                     <!-- Surefire plugin -->
>
>                     <plugin>
>
>                            <groupId>org.apache.maven.plugins</groupId>
>
>                            <artifactId>maven-surefire-plugin</artifactId>
>
>                            <version>2.15</version>
>
>                            <configuration>
>
>                                   <jvm>${env.JAVA_HOME}/bin/java</jvm>
>
>                                   <forkMode>once</forkMode>
>
>                            </configuration>
>
>                     </plugin>
>
>
>
>
>
>                     <plugin>
>
>                            <groupId>org.apache.maven.plugins</groupId>
>
>                            <artifactId>maven-compiler-plugin</artifactId>
>
>                            <version>3.1</version>
>
>
>
>                            <configuration>
>
>                                   <source>1.8</source>
>
>                                   <target>1.8</target>
>
>                                   <compilerId>jdt</compilerId>
>
>                                   <!--
> executable>${env.JAVA_HOME}/bin/javac</executable> -->
>
>                            </configuration>
>
>                            <dependencies>
>
>                                   <!-- This dependency provides the
> implementation of compiler "jdt": -->
>
>                                   <dependency>
>
>                                         <groupId>org.eclipse.tycho</groupId>
>
>
> <artifactId>tycho-compiler-jdt</artifactId>
>
>                                         <version>0.21.0</version>
>
>                                   </dependency>
>
>                            </dependencies>
>
>                     </plugin>
>
>              </plugins>
>
>              <pluginManagement>
>
>                     <plugins>
>
>                            <!--This plugin's configuration is used to store
> Eclipse m2e settings
>
>                                   only. It has no influence on the Maven
> build itself. -->
>
>                            <plugin>
>
>                                   <groupId>org.eclipse.m2e</groupId>
>
>                                   <artifactId>lifecycle-mapping</artifactId>
>
>                                   <version>1.0.0</version>
>
>                                   <configuration>
>
>                                         <lifecycleMappingMetadata>
>
>                                                <pluginExecutions>
>
>                                                       <pluginExecution>
>
>
> <pluginExecutionFilter>
>
>                                                                    <groupId>
>
>
> org.apache.maven.plugins
>
>
> </groupId>
>
>
> <artifactId>
>
>
> maven-compiler-plugin
>
>
> </artifactId>
>
>
> <versionRange>
>
>
> [3.1,)
>
>
> </versionRange>
>
>                                                                    <goals>
>
>
> <goal>compile</goal>
>
>                                                                    </goals>
>
>
> </pluginExecutionFilter>
>
>                                                             <action>
>
>
> <ignore></ignore>
>
>                                                             </action>
>
>                                                       </pluginExecution>
>
>                                                </pluginExecutions>
>
>                                         </lifecycleMappingMetadata>
>
>                                   </configuration>
>
>                            </plugin>
>
>                     </plugins>
>
>              </pluginManagement>
>
>        </build>
>
>
>
>        <dependencies>
>
>              <dependency>
>
>                     <groupId>junit</groupId>
>
>                     <artifactId>junit</artifactId>
>
>                     <version>4.10</version>
>
>                     <scope>test</scope>
>
>              </dependency>
>
>
>
>              <!-- Flink -->
>
>              <dependency>
>
>                     <groupId>org.apache.flink</groupId>
>
>                     <artifactId>flink-java</artifactId>
>
>                     <version>0.9.0-milestone-1</version>
>
>              </dependency>
>
>              <dependency>
>
>                     <groupId>org.apache.flink</groupId>
>
>                     <artifactId>flink-clients</artifactId>
>
>                     <version>0.9.0-milestone-1</version>
>
>              </dependency>
>
>
>
>
>
>        </dependencies>
>
>
>
> A simple replacement of “1.8” by “1.7” in the pom makes the program work.
>
>
>
> Regards,
>
> Arnaud
>
>
> ________________________________
>
> L'intégrité de ce message n'étant pas assurée sur internet, la société
> expéditrice ne peut être tenue responsable de son contenu ni de ses pièces
> jointes. Toute utilisation ou diffusion non autorisée est interdite. Si vous
> n'êtes pas destinataire de ce message, merci de le détruire et d'avertir
> l'expéditeur.
>
> The integrity of this message cannot be guaranteed on the Internet. The
> company that sent this message cannot therefore be held liable for its
> content nor attachments. Any unauthorized use or dissemination is
> prohibited. If you are not the intended recipient of this message, then
> please delete it and notify the sender.
Reply | Threaded
Open this post in threaded view
|

Re: Flink Java 8 problem (no lambda, simple code)

Stephan Ewen
One thing I noticed a while back with ASM version 4 and Java 8 had issues - but those were related to Java 8 lambdas.

Back then, bumping ASM to version 5 helped it. Not sure if this is the same problem, though, since you do not seem to use Java 8 lambdas...

On Fri, Apr 24, 2015 at 11:32 AM, Aljoscha Krettek <[hidden email]> wrote:
I'm looking into it,

On Fri, Apr 24, 2015 at 11:13 AM, LINZ, Arnaud <[hidden email]> wrote:
> Hi,
>
>
>
> I have the following simple code that works well in Java 7 :
>
>
>
>         final ExecutionEnvironment cluster =
> ExecutionEnvironment.createLocalEnvironment();
>
>         final DataSet<String> textFile =
> cluster.readTextFile(MiscTools.chercher("jeuDeDonnees.txt"));
>
>         final DataSet<Tuple2<String, Integer>> words = textFile
>
>             .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>()
> {
>
>                 @Override
>
>                 public void flatMap(String ligne, Collector<Tuple2<String,
> Integer>> out) {
>
>                     for (final String word : ligne.split("\\s")) {
>
>                         out.collect(new Tuple2<String, Integer>(word, 1));
>
>                     }
>
>                 }
>
>             });
>
>         final DataSet<Tuple2<String, Integer>> wordsCount =
> words.groupBy(0).sum(1);
>
>         wordsCount.print();
>
>         cluster.execute("testFlink");
>
>
>
> When compiled in Java 8 and executed (Oracle JDK or Eclipse JDT compiler,
> same result) I have the following stack trace (under eclipse or with maven
> test) :
>
>
>
> java.lang.IllegalArgumentException: null
>
>        at
> org.apache.flink.shaded.org.objectweb.asm.ClassReader.<init>(Unknown Source)
>
>        at
> org.apache.flink.shaded.org.objectweb.asm.ClassReader.<init>(Unknown Source)
>
>        at
> org.apache.flink.shaded.org.objectweb.asm.ClassReader.<init>(Unknown Source)
>
>        at
> org.apache.flink.api.java.ClosureCleaner.getClassReader(ClosureCleaner.java:40)
>
>        at
> org.apache.flink.api.java.ClosureCleaner.cleanThis0(ClosureCleaner.java:67)
>
>        at
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:54)
>
>        at org.apache.flink.api.java.DataSet.clean(DataSet.java:185)
>
>        at org.apache.flink.api.java.DataSet.flatMap(DataSet.java:266)
>
>
>
>
>
> Any idea why ?
>
>
>
> Here is my pom.xml :
>
>
>
>        <build>
>
>              <plugins>
>
>                     <!-- Surefire plugin -->
>
>                     <plugin>
>
>                            <groupId>org.apache.maven.plugins</groupId>
>
>                            <artifactId>maven-surefire-plugin</artifactId>
>
>                            <version>2.15</version>
>
>                            <configuration>
>
>                                   <jvm>${env.JAVA_HOME}/bin/java</jvm>
>
>                                   <forkMode>once</forkMode>
>
>                            </configuration>
>
>                     </plugin>
>
>
>
>
>
>                     <plugin>
>
>                            <groupId>org.apache.maven.plugins</groupId>
>
>                            <artifactId>maven-compiler-plugin</artifactId>
>
>                            <version>3.1</version>
>
>
>
>                            <configuration>
>
>                                   <source>1.8</source>
>
>                                   <target>1.8</target>
>
>                                   <compilerId>jdt</compilerId>
>
>                                   <!--
> executable>${env.JAVA_HOME}/bin/javac</executable> -->
>
>                            </configuration>
>
>                            <dependencies>
>
>                                   <!-- This dependency provides the
> implementation of compiler "jdt": -->
>
>                                   <dependency>
>
>                                         <groupId>org.eclipse.tycho</groupId>
>
>
> <artifactId>tycho-compiler-jdt</artifactId>
>
>                                         <version>0.21.0</version>
>
>                                   </dependency>
>
>                            </dependencies>
>
>                     </plugin>
>
>              </plugins>
>
>              <pluginManagement>
>
>                     <plugins>
>
>                            <!--This plugin's configuration is used to store
> Eclipse m2e settings
>
>                                   only. It has no influence on the Maven
> build itself. -->
>
>                            <plugin>
>
>                                   <groupId>org.eclipse.m2e</groupId>
>
>                                   <artifactId>lifecycle-mapping</artifactId>
>
>                                   <version>1.0.0</version>
>
>                                   <configuration>
>
>                                         <lifecycleMappingMetadata>
>
>                                                <pluginExecutions>
>
>                                                       <pluginExecution>
>
>
> <pluginExecutionFilter>
>
>                                                                    <groupId>
>
>
> org.apache.maven.plugins
>
>
> </groupId>
>
>
> <artifactId>
>
>
> maven-compiler-plugin
>
>
> </artifactId>
>
>
> <versionRange>
>
>
> [3.1,)
>
>
> </versionRange>
>
>                                                                    <goals>
>
>
> <goal>compile</goal>
>
>                                                                    </goals>
>
>
> </pluginExecutionFilter>
>
>                                                             <action>
>
>
> <ignore></ignore>
>
>                                                             </action>
>
>                                                       </pluginExecution>
>
>                                                </pluginExecutions>
>
>                                         </lifecycleMappingMetadata>
>
>                                   </configuration>
>
>                            </plugin>
>
>                     </plugins>
>
>              </pluginManagement>
>
>        </build>
>
>
>
>        <dependencies>
>
>              <dependency>
>
>                     <groupId>junit</groupId>
>
>                     <artifactId>junit</artifactId>
>
>                     <version>4.10</version>
>
>                     <scope>test</scope>
>
>              </dependency>
>
>
>
>              <!-- Flink -->
>
>              <dependency>
>
>                     <groupId>org.apache.flink</groupId>
>
>                     <artifactId>flink-java</artifactId>
>
>                     <version>0.9.0-milestone-1</version>
>
>              </dependency>
>
>              <dependency>
>
>                     <groupId>org.apache.flink</groupId>
>
>                     <artifactId>flink-clients</artifactId>
>
>                     <version>0.9.0-milestone-1</version>
>
>              </dependency>
>
>
>
>
>
>        </dependencies>
>
>
>
> A simple replacement of “1.8” by “1.7” in the pom makes the program work.
>
>
>
> Regards,
>
> Arnaud
>
>
> ________________________________
>
> L'intégrité de ce message n'étant pas assurée sur internet, la société
> expéditrice ne peut être tenue responsable de son contenu ni de ses pièces
> jointes. Toute utilisation ou diffusion non autorisée est interdite. Si vous
> n'êtes pas destinataire de ce message, merci de le détruire et d'avertir
> l'expéditeur.
>
> The integrity of this message cannot be guaranteed on the Internet. The
> company that sent this message cannot therefore be held liable for its
> content nor attachments. Any unauthorized use or dissemination is
> prohibited. If you are not the intended recipient of this message, then
> please delete it and notify the sender.

Reply | Threaded
Open this post in threaded view
|

Re: Flink Java 8 problem (no lambda, simple code)

Aljoscha Krettek
Unfortunately I can't reproduce your error on my machine (OS X, java
8) i created a fresh maven project from your pom and source example
and it runs.

As a workaround you can call
cluster.getConfig().disableClosureCleaner(). The closure cleaner
normally cleans closures from unneeded stuff because we send them over
the network. Here, the flatMap anonymous function has a closure. Is
there any other code inside your main function that could cause this?

On Fri, Apr 24, 2015 at 11:34 AM, Stephan Ewen <[hidden email]> wrote:

> One thing I noticed a while back with ASM version 4 and Java 8 had issues -
> but those were related to Java 8 lambdas.
>
> Back then, bumping ASM to version 5 helped it. Not sure if this is the same
> problem, though, since you do not seem to use Java 8 lambdas...
>
> On Fri, Apr 24, 2015 at 11:32 AM, Aljoscha Krettek <[hidden email]>
> wrote:
>>
>> I'm looking into it,
>>
>> On Fri, Apr 24, 2015 at 11:13 AM, LINZ, Arnaud <[hidden email]>
>> wrote:
>> > Hi,
>> >
>> >
>> >
>> > I have the following simple code that works well in Java 7 :
>> >
>> >
>> >
>> >         final ExecutionEnvironment cluster =
>> > ExecutionEnvironment.createLocalEnvironment();
>> >
>> >         final DataSet<String> textFile =
>> > cluster.readTextFile(MiscTools.chercher("jeuDeDonnees.txt"));
>> >
>> >         final DataSet<Tuple2<String, Integer>> words = textFile
>> >
>> >             .flatMap(new FlatMapFunction<String, Tuple2<String,
>> > Integer>>()
>> > {
>> >
>> >                 @Override
>> >
>> >                 public void flatMap(String ligne,
>> > Collector<Tuple2<String,
>> > Integer>> out) {
>> >
>> >                     for (final String word : ligne.split("\\s")) {
>> >
>> >                         out.collect(new Tuple2<String, Integer>(word,
>> > 1));
>> >
>> >                     }
>> >
>> >                 }
>> >
>> >             });
>> >
>> >         final DataSet<Tuple2<String, Integer>> wordsCount =
>> > words.groupBy(0).sum(1);
>> >
>> >         wordsCount.print();
>> >
>> >         cluster.execute("testFlink");
>> >
>> >
>> >
>> > When compiled in Java 8 and executed (Oracle JDK or Eclipse JDT
>> > compiler,
>> > same result) I have the following stack trace (under eclipse or with
>> > maven
>> > test) :
>> >
>> >
>> >
>> > java.lang.IllegalArgumentException: null
>> >
>> >        at
>> > org.apache.flink.shaded.org.objectweb.asm.ClassReader.<init>(Unknown
>> > Source)
>> >
>> >        at
>> > org.apache.flink.shaded.org.objectweb.asm.ClassReader.<init>(Unknown
>> > Source)
>> >
>> >        at
>> > org.apache.flink.shaded.org.objectweb.asm.ClassReader.<init>(Unknown
>> > Source)
>> >
>> >        at
>> >
>> > org.apache.flink.api.java.ClosureCleaner.getClassReader(ClosureCleaner.java:40)
>> >
>> >        at
>> >
>> > org.apache.flink.api.java.ClosureCleaner.cleanThis0(ClosureCleaner.java:67)
>> >
>> >        at
>> > org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:54)
>> >
>> >        at org.apache.flink.api.java.DataSet.clean(DataSet.java:185)
>> >
>> >        at org.apache.flink.api.java.DataSet.flatMap(DataSet.java:266)
>> >
>> >
>> >
>> >
>> >
>> > Any idea why ?
>> >
>> >
>> >
>> > Here is my pom.xml :
>> >
>> >
>> >
>> >        <build>
>> >
>> >              <plugins>
>> >
>> >                     <!-- Surefire plugin -->
>> >
>> >                     <plugin>
>> >
>> >                            <groupId>org.apache.maven.plugins</groupId>
>> >
>> >
>> > <artifactId>maven-surefire-plugin</artifactId>
>> >
>> >                            <version>2.15</version>
>> >
>> >                            <configuration>
>> >
>> >                                   <jvm>${env.JAVA_HOME}/bin/java</jvm>
>> >
>> >                                   <forkMode>once</forkMode>
>> >
>> >                            </configuration>
>> >
>> >                     </plugin>
>> >
>> >
>> >
>> >
>> >
>> >                     <plugin>
>> >
>> >                            <groupId>org.apache.maven.plugins</groupId>
>> >
>> >
>> > <artifactId>maven-compiler-plugin</artifactId>
>> >
>> >                            <version>3.1</version>
>> >
>> >
>> >
>> >                            <configuration>
>> >
>> >                                   <source>1.8</source>
>> >
>> >                                   <target>1.8</target>
>> >
>> >                                   <compilerId>jdt</compilerId>
>> >
>> >                                   <!--
>> > executable>${env.JAVA_HOME}/bin/javac</executable> -->
>> >
>> >                            </configuration>
>> >
>> >                            <dependencies>
>> >
>> >                                   <!-- This dependency provides the
>> > implementation of compiler "jdt": -->
>> >
>> >                                   <dependency>
>> >
>> >
>> > <groupId>org.eclipse.tycho</groupId>
>> >
>> >
>> > <artifactId>tycho-compiler-jdt</artifactId>
>> >
>> >                                         <version>0.21.0</version>
>> >
>> >                                   </dependency>
>> >
>> >                            </dependencies>
>> >
>> >                     </plugin>
>> >
>> >              </plugins>
>> >
>> >              <pluginManagement>
>> >
>> >                     <plugins>
>> >
>> >                            <!--This plugin's configuration is used to
>> > store
>> > Eclipse m2e settings
>> >
>> >                                   only. It has no influence on the Maven
>> > build itself. -->
>> >
>> >                            <plugin>
>> >
>> >                                   <groupId>org.eclipse.m2e</groupId>
>> >
>> >
>> > <artifactId>lifecycle-mapping</artifactId>
>> >
>> >                                   <version>1.0.0</version>
>> >
>> >                                   <configuration>
>> >
>> >                                         <lifecycleMappingMetadata>
>> >
>> >                                                <pluginExecutions>
>> >
>> >                                                       <pluginExecution>
>> >
>> >
>> > <pluginExecutionFilter>
>> >
>> >
>> > <groupId>
>> >
>> >
>> > org.apache.maven.plugins
>> >
>> >
>> > </groupId>
>> >
>> >
>> > <artifactId>
>> >
>> >
>> > maven-compiler-plugin
>> >
>> >
>> > </artifactId>
>> >
>> >
>> > <versionRange>
>> >
>> >
>> > [3.1,)
>> >
>> >
>> > </versionRange>
>> >
>> >
>> > <goals>
>> >
>> >
>> > <goal>compile</goal>
>> >
>> >
>> > </goals>
>> >
>> >
>> > </pluginExecutionFilter>
>> >
>> >                                                             <action>
>> >
>> >
>> > <ignore></ignore>
>> >
>> >                                                             </action>
>> >
>> >                                                       </pluginExecution>
>> >
>> >                                                </pluginExecutions>
>> >
>> >                                         </lifecycleMappingMetadata>
>> >
>> >                                   </configuration>
>> >
>> >                            </plugin>
>> >
>> >                     </plugins>
>> >
>> >              </pluginManagement>
>> >
>> >        </build>
>> >
>> >
>> >
>> >        <dependencies>
>> >
>> >              <dependency>
>> >
>> >                     <groupId>junit</groupId>
>> >
>> >                     <artifactId>junit</artifactId>
>> >
>> >                     <version>4.10</version>
>> >
>> >                     <scope>test</scope>
>> >
>> >              </dependency>
>> >
>> >
>> >
>> >              <!-- Flink -->
>> >
>> >              <dependency>
>> >
>> >                     <groupId>org.apache.flink</groupId>
>> >
>> >                     <artifactId>flink-java</artifactId>
>> >
>> >                     <version>0.9.0-milestone-1</version>
>> >
>> >              </dependency>
>> >
>> >              <dependency>
>> >
>> >                     <groupId>org.apache.flink</groupId>
>> >
>> >                     <artifactId>flink-clients</artifactId>
>> >
>> >                     <version>0.9.0-milestone-1</version>
>> >
>> >              </dependency>
>> >
>> >
>> >
>> >
>> >
>> >        </dependencies>
>> >
>> >
>> >
>> > A simple replacement of “1.8” by “1.7” in the pom makes the program
>> > work.
>> >
>> >
>> >
>> > Regards,
>> >
>> > Arnaud
>> >
>> >
>> > ________________________________
>> >
>> > L'intégrité de ce message n'étant pas assurée sur internet, la société
>> > expéditrice ne peut être tenue responsable de son contenu ni de ses
>> > pièces
>> > jointes. Toute utilisation ou diffusion non autorisée est interdite. Si
>> > vous
>> > n'êtes pas destinataire de ce message, merci de le détruire et d'avertir
>> > l'expéditeur.
>> >
>> > The integrity of this message cannot be guaranteed on the Internet. The
>> > company that sent this message cannot therefore be held liable for its
>> > content nor attachments. Any unauthorized use or dissemination is
>> > prohibited. If you are not the intended recipient of this message, then
>> > please delete it and notify the sender.
>
>
Reply | Threaded
Open this post in threaded view
|

RE: Flink Java 8 problem (no lambda, simple code)

LINZ, Arnaud
Hi,

With cluster.getConfig().disableClosureCleaner() it works, thanks ; what are the sides effect of this workaround?

My code is just a plain Junit test, the only code that is called outside of what I gave is :

    public static String chercher(String nom) {
        String retour = null;
        URL url = Thread.currentThread().getContextClassLoader().getResource(nom);
        if (null == url) {
            url = MiscTools.class.getResource(nom);
        }
        if (null != url) {
            retour = url.getFile();
        }
        if (null == retour) {
            retour = nom;
        }
        return retour;
    }

My JRE version is Oracle 1.8.0.31.

I can send you a zip file if you like.

Arnaud

-----Message d'origine-----
De : Aljoscha Krettek [mailto:[hidden email]]
Envoyé : vendredi 24 avril 2015 12:05
À : [hidden email]
Objet : Re: Flink Java 8 problem (no lambda, simple code)

Unfortunately I can't reproduce your error on my machine (OS X, java
8) i created a fresh maven project from your pom and source example and it runs.

As a workaround you can call
cluster.getConfig().disableClosureCleaner(). The closure cleaner normally cleans closures from unneeded stuff because we send them over the network. Here, the flatMap anonymous function has a closure. Is there any other code inside your main function that could cause this?

On Fri, Apr 24, 2015 at 11:34 AM, Stephan Ewen <[hidden email]> wrote:

> One thing I noticed a while back with ASM version 4 and Java 8 had
> issues - but those were related to Java 8 lambdas.
>
> Back then, bumping ASM to version 5 helped it. Not sure if this is the
> same problem, though, since you do not seem to use Java 8 lambdas...
>
> On Fri, Apr 24, 2015 at 11:32 AM, Aljoscha Krettek
> <[hidden email]>
> wrote:
>>
>> I'm looking into it,
>>
>> On Fri, Apr 24, 2015 at 11:13 AM, LINZ, Arnaud
>> <[hidden email]>
>> wrote:
>> > Hi,
>> >
>> >
>> >
>> > I have the following simple code that works well in Java 7 :
>> >
>> >
>> >
>> >         final ExecutionEnvironment cluster =
>> > ExecutionEnvironment.createLocalEnvironment();
>> >
>> >         final DataSet<String> textFile =
>> > cluster.readTextFile(MiscTools.chercher("jeuDeDonnees.txt"));
>> >
>> >         final DataSet<Tuple2<String, Integer>> words = textFile
>> >
>> >             .flatMap(new FlatMapFunction<String, Tuple2<String,
>> > Integer>>()
>> > {
>> >
>> >                 @Override
>> >
>> >                 public void flatMap(String ligne,
>> > Collector<Tuple2<String,
>> > Integer>> out) {
>> >
>> >                     for (final String word : ligne.split("\\s")) {
>> >
>> >                         out.collect(new Tuple2<String,
>> > Integer>(word, 1));
>> >
>> >                     }
>> >
>> >                 }
>> >
>> >             });
>> >
>> >         final DataSet<Tuple2<String, Integer>> wordsCount =
>> > words.groupBy(0).sum(1);
>> >
>> >         wordsCount.print();
>> >
>> >         cluster.execute("testFlink");
>> >
>> >
>> >
>> > When compiled in Java 8 and executed (Oracle JDK or Eclipse JDT
>> > compiler, same result) I have the following stack trace (under
>> > eclipse or with maven
>> > test) :
>> >
>> >
>> >
>> > java.lang.IllegalArgumentException: null
>> >
>> >        at
>> > org.apache.flink.shaded.org.objectweb.asm.ClassReader.<init>(Unknow
>> > n
>> > Source)
>> >
>> >        at
>> > org.apache.flink.shaded.org.objectweb.asm.ClassReader.<init>(Unknow
>> > n
>> > Source)
>> >
>> >        at
>> > org.apache.flink.shaded.org.objectweb.asm.ClassReader.<init>(Unknow
>> > n
>> > Source)
>> >
>> >        at
>> >
>> > org.apache.flink.api.java.ClosureCleaner.getClassReader(ClosureClea
>> > ner.java:40)
>> >
>> >        at
>> >
>> > org.apache.flink.api.java.ClosureCleaner.cleanThis0(ClosureCleaner.
>> > java:67)
>> >
>> >        at
>> > org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:
>> > 54)
>> >
>> >        at org.apache.flink.api.java.DataSet.clean(DataSet.java:185)
>> >
>> >        at
>> > org.apache.flink.api.java.DataSet.flatMap(DataSet.java:266)
>> >
>> >
>> >
>> >
>> >
>> > Any idea why ?
>> >
>> >
>> >
>> > Here is my pom.xml :
>> >
>> >
>> >
>> >        <build>
>> >
>> >              <plugins>
>> >
>> >                     <!-- Surefire plugin -->
>> >
>> >                     <plugin>
>> >
>> >                            
>> > <groupId>org.apache.maven.plugins</groupId>
>> >
>> >
>> > <artifactId>maven-surefire-plugin</artifactId>
>> >
>> >                            <version>2.15</version>
>> >
>> >                            <configuration>
>> >
>> >                                  
>> > <jvm>${env.JAVA_HOME}/bin/java</jvm>
>> >
>> >                                   <forkMode>once</forkMode>
>> >
>> >                            </configuration>
>> >
>> >                     </plugin>
>> >
>> >
>> >
>> >
>> >
>> >                     <plugin>
>> >
>> >                            
>> > <groupId>org.apache.maven.plugins</groupId>
>> >
>> >
>> > <artifactId>maven-compiler-plugin</artifactId>
>> >
>> >                            <version>3.1</version>
>> >
>> >
>> >
>> >                            <configuration>
>> >
>> >                                   <source>1.8</source>
>> >
>> >                                   <target>1.8</target>
>> >
>> >                                   <compilerId>jdt</compilerId>
>> >
>> >                                   <!--
>> > executable>${env.JAVA_HOME}/bin/javac</executable> -->
>> >
>> >                            </configuration>
>> >
>> >                            <dependencies>
>> >
>> >                                   <!-- This dependency provides the
>> > implementation of compiler "jdt": -->
>> >
>> >                                   <dependency>
>> >
>> >
>> > <groupId>org.eclipse.tycho</groupId>
>> >
>> >
>> > <artifactId>tycho-compiler-jdt</artifactId>
>> >
>> >                                         <version>0.21.0</version>
>> >
>> >                                   </dependency>
>> >
>> >                            </dependencies>
>> >
>> >                     </plugin>
>> >
>> >              </plugins>
>> >
>> >              <pluginManagement>
>> >
>> >                     <plugins>
>> >
>> >                            <!--This plugin's configuration is used
>> > to store Eclipse m2e settings
>> >
>> >                                   only. It has no influence on the
>> > Maven build itself. -->
>> >
>> >                            <plugin>
>> >
>> >                                  
>> > <groupId>org.eclipse.m2e</groupId>
>> >
>> >
>> > <artifactId>lifecycle-mapping</artifactId>
>> >
>> >                                   <version>1.0.0</version>
>> >
>> >                                   <configuration>
>> >
>> >                                         <lifecycleMappingMetadata>
>> >
>> >                                                <pluginExecutions>
>> >
>> >                                                      
>> > <pluginExecution>
>> >
>> >
>> > <pluginExecutionFilter>
>> >
>> >
>> > <groupId>
>> >
>> >
>> > org.apache.maven.plugins
>> >
>> >
>> > </groupId>
>> >
>> >
>> > <artifactId>
>> >
>> >
>> > maven-compiler-plugin
>> >
>> >
>> > </artifactId>
>> >
>> >
>> > <versionRange>
>> >
>> >
>> > [3.1,)
>> >
>> >
>> > </versionRange>
>> >
>> >
>> > <goals>
>> >
>> >
>> > <goal>compile</goal>
>> >
>> >
>> > </goals>
>> >
>> >
>> > </pluginExecutionFilter>
>> >
>> >                                                            
>> > <action>
>> >
>> >
>> > <ignore></ignore>
>> >
>> >                                                            
>> > </action>
>> >
>> >                                                      
>> > </pluginExecution>
>> >
>> >                                                </pluginExecutions>
>> >
>> >                                         </lifecycleMappingMetadata>
>> >
>> >                                   </configuration>
>> >
>> >                            </plugin>
>> >
>> >                     </plugins>
>> >
>> >              </pluginManagement>
>> >
>> >        </build>
>> >
>> >
>> >
>> >        <dependencies>
>> >
>> >              <dependency>
>> >
>> >                     <groupId>junit</groupId>
>> >
>> >                     <artifactId>junit</artifactId>
>> >
>> >                     <version>4.10</version>
>> >
>> >                     <scope>test</scope>
>> >
>> >              </dependency>
>> >
>> >
>> >
>> >              <!-- Flink -->
>> >
>> >              <dependency>
>> >
>> >                     <groupId>org.apache.flink</groupId>
>> >
>> >                     <artifactId>flink-java</artifactId>
>> >
>> >                     <version>0.9.0-milestone-1</version>
>> >
>> >              </dependency>
>> >
>> >              <dependency>
>> >
>> >                     <groupId>org.apache.flink</groupId>
>> >
>> >                     <artifactId>flink-clients</artifactId>
>> >
>> >                     <version>0.9.0-milestone-1</version>
>> >
>> >              </dependency>
>> >
>> >
>> >
>> >
>> >
>> >        </dependencies>
>> >
>> >
>> >
>> > A simple replacement of “1.8” by “1.7” in the pom makes the program
>> > work.
>> >
>> >
>> >
>> > Regards,
>> >
>> > Arnaud
>> >
>> >
>> > ________________________________
>> >
>> > L'intégrité de ce message n'étant pas assurée sur internet, la
>> > société expéditrice ne peut être tenue responsable de son contenu
>> > ni de ses pièces jointes. Toute utilisation ou diffusion non
>> > autorisée est interdite. Si vous n'êtes pas destinataire de ce
>> > message, merci de le détruire et d'avertir l'expéditeur.
>> >
>> > The integrity of this message cannot be guaranteed on the Internet.
>> > The company that sent this message cannot therefore be held liable
>> > for its content nor attachments. Any unauthorized use or
>> > dissemination is prohibited. If you are not the intended recipient
>> > of this message, then please delete it and notify the sender.
>
>
Reply | Threaded
Open this post in threaded view
|

Re: Flink Java 8 problem (no lambda, simple code)

Stephan Ewen
The effect of disabling the closure cleaner is that some cases can throw you a "NonSerializableException".

If you do not see that, there is no problem anyways, no unexpected side effect :-)

Have you tried the latest version of the Flink master? We updated the code (2 days ago) to use as newer version of the ASM library, because the prior version ha some known issues with Java 8. 

Greetings,
Stephan



On Mon, Apr 27, 2015 at 5:45 PM, LINZ, Arnaud <[hidden email]> wrote:
Hi,

With cluster.getConfig().disableClosureCleaner() it works, thanks ; what are the sides effect of this workaround?

My code is just a plain Junit test, the only code that is called outside of what I gave is :

    public static String chercher(String nom) {
        String retour = null;
        URL url = Thread.currentThread().getContextClassLoader().getResource(nom);
        if (null == url) {
            url = MiscTools.class.getResource(nom);
        }
        if (null != url) {
            retour = url.getFile();
        }
        if (null == retour) {
            retour = nom;
        }
        return retour;
    }

My JRE version is Oracle 1.8.0.31.

I can send you a zip file if you like.

Arnaud

-----Message d'origine-----
De : Aljoscha Krettek [mailto:[hidden email]]
Envoyé : vendredi 24 avril 2015 12:05
À : [hidden email]
Objet : Re: Flink Java 8 problem (no lambda, simple code)

Unfortunately I can't reproduce your error on my machine (OS X, java
8) i created a fresh maven project from your pom and source example and it runs.

As a workaround you can call
cluster.getConfig().disableClosureCleaner(). The closure cleaner normally cleans closures from unneeded stuff because we send them over the network. Here, the flatMap anonymous function has a closure. Is there any other code inside your main function that could cause this?

On Fri, Apr 24, 2015 at 11:34 AM, Stephan Ewen <[hidden email]> wrote:
> One thing I noticed a while back with ASM version 4 and Java 8 had
> issues - but those were related to Java 8 lambdas.
>
> Back then, bumping ASM to version 5 helped it. Not sure if this is the
> same problem, though, since you do not seem to use Java 8 lambdas...
>
> On Fri, Apr 24, 2015 at 11:32 AM, Aljoscha Krettek
> <[hidden email]>
> wrote:
>>
>> I'm looking into it,
>>
>> On Fri, Apr 24, 2015 at 11:13 AM, LINZ, Arnaud
>> <[hidden email]>
>> wrote:
>> > Hi,
>> >
>> >
>> >
>> > I have the following simple code that works well in Java 7 :
>> >
>> >
>> >
>> >         final ExecutionEnvironment cluster =
>> > ExecutionEnvironment.createLocalEnvironment();
>> >
>> >         final DataSet<String> textFile =
>> > cluster.readTextFile(MiscTools.chercher("jeuDeDonnees.txt"));
>> >
>> >         final DataSet<Tuple2<String, Integer>> words = textFile
>> >
>> >             .flatMap(new FlatMapFunction<String, Tuple2<String,
>> > Integer>>()
>> > {
>> >
>> >                 @Override
>> >
>> >                 public void flatMap(String ligne,
>> > Collector<Tuple2<String,
>> > Integer>> out) {
>> >
>> >                     for (final String word : ligne.split("\\s")) {
>> >
>> >                         out.collect(new Tuple2<String,
>> > Integer>(word, 1));
>> >
>> >                     }
>> >
>> >                 }
>> >
>> >             });
>> >
>> >         final DataSet<Tuple2<String, Integer>> wordsCount =
>> > words.groupBy(0).sum(1);
>> >
>> >         wordsCount.print();
>> >
>> >         cluster.execute("testFlink");
>> >
>> >
>> >
>> > When compiled in Java 8 and executed (Oracle JDK or Eclipse JDT
>> > compiler, same result) I have the following stack trace (under
>> > eclipse or with maven
>> > test) :
>> >
>> >
>> >
>> > java.lang.IllegalArgumentException: null
>> >
>> >        at
>> > org.apache.flink.shaded.org.objectweb.asm.ClassReader.<init>(Unknow
>> > n
>> > Source)
>> >
>> >        at
>> > org.apache.flink.shaded.org.objectweb.asm.ClassReader.<init>(Unknow
>> > n
>> > Source)
>> >
>> >        at
>> > org.apache.flink.shaded.org.objectweb.asm.ClassReader.<init>(Unknow
>> > n
>> > Source)
>> >
>> >        at
>> >
>> > org.apache.flink.api.java.ClosureCleaner.getClassReader(ClosureClea
>> > ner.java:40)
>> >
>> >        at
>> >
>> > org.apache.flink.api.java.ClosureCleaner.cleanThis0(ClosureCleaner.
>> > java:67)
>> >
>> >        at
>> > org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:
>> > 54)
>> >
>> >        at org.apache.flink.api.java.DataSet.clean(DataSet.java:185)
>> >
>> >        at
>> > org.apache.flink.api.java.DataSet.flatMap(DataSet.java:266)
>> >
>> >
>> >
>> >
>> >
>> > Any idea why ?
>> >
>> >
>> >
>> > Here is my pom.xml :
>> >
>> >
>> >
>> >        <build>
>> >
>> >              <plugins>
>> >
>> >                     <!-- Surefire plugin -->
>> >
>> >                     <plugin>
>> >
>> >
>> > <groupId>org.apache.maven.plugins</groupId>
>> >
>> >
>> > <artifactId>maven-surefire-plugin</artifactId>
>> >
>> >                            <version>2.15</version>
>> >
>> >                            <configuration>
>> >
>> >
>> > <jvm>${env.JAVA_HOME}/bin/java</jvm>
>> >
>> >                                   <forkMode>once</forkMode>
>> >
>> >                            </configuration>
>> >
>> >                     </plugin>
>> >
>> >
>> >
>> >
>> >
>> >                     <plugin>
>> >
>> >
>> > <groupId>org.apache.maven.plugins</groupId>
>> >
>> >
>> > <artifactId>maven-compiler-plugin</artifactId>
>> >
>> >                            <version>3.1</version>
>> >
>> >
>> >
>> >                            <configuration>
>> >
>> >                                   <source>1.8</source>
>> >
>> >                                   <target>1.8</target>
>> >
>> >                                   <compilerId>jdt</compilerId>
>> >
>> >                                   <!--
>> > executable>${env.JAVA_HOME}/bin/javac</executable> -->
>> >
>> >                            </configuration>
>> >
>> >                            <dependencies>
>> >
>> >                                   <!-- This dependency provides the
>> > implementation of compiler "jdt": -->
>> >
>> >                                   <dependency>
>> >
>> >
>> > <groupId>org.eclipse.tycho</groupId>
>> >
>> >
>> > <artifactId>tycho-compiler-jdt</artifactId>
>> >
>> >                                         <version>0.21.0</version>
>> >
>> >                                   </dependency>
>> >
>> >                            </dependencies>
>> >
>> >                     </plugin>
>> >
>> >              </plugins>
>> >
>> >              <pluginManagement>
>> >
>> >                     <plugins>
>> >
>> >                            <!--This plugin's configuration is used
>> > to store Eclipse m2e settings
>> >
>> >                                   only. It has no influence on the
>> > Maven build itself. -->
>> >
>> >                            <plugin>
>> >
>> >
>> > <groupId>org.eclipse.m2e</groupId>
>> >
>> >
>> > <artifactId>lifecycle-mapping</artifactId>
>> >
>> >                                   <version>1.0.0</version>
>> >
>> >                                   <configuration>
>> >
>> >                                         <lifecycleMappingMetadata>
>> >
>> >                                                <pluginExecutions>
>> >
>> >
>> > <pluginExecution>
>> >
>> >
>> > <pluginExecutionFilter>
>> >
>> >
>> > <groupId>
>> >
>> >
>> > org.apache.maven.plugins
>> >
>> >
>> > </groupId>
>> >
>> >
>> > <artifactId>
>> >
>> >
>> > maven-compiler-plugin
>> >
>> >
>> > </artifactId>
>> >
>> >
>> > <versionRange>
>> >
>> >
>> > [3.1,)
>> >
>> >
>> > </versionRange>
>> >
>> >
>> > <goals>
>> >
>> >
>> > <goal>compile</goal>
>> >
>> >
>> > </goals>
>> >
>> >
>> > </pluginExecutionFilter>
>> >
>> >
>> > <action>
>> >
>> >
>> > <ignore></ignore>
>> >
>> >
>> > </action>
>> >
>> >
>> > </pluginExecution>
>> >
>> >                                                </pluginExecutions>
>> >
>> >                                         </lifecycleMappingMetadata>
>> >
>> >                                   </configuration>
>> >
>> >                            </plugin>
>> >
>> >                     </plugins>
>> >
>> >              </pluginManagement>
>> >
>> >        </build>
>> >
>> >
>> >
>> >        <dependencies>
>> >
>> >              <dependency>
>> >
>> >                     <groupId>junit</groupId>
>> >
>> >                     <artifactId>junit</artifactId>
>> >
>> >                     <version>4.10</version>
>> >
>> >                     <scope>test</scope>
>> >
>> >              </dependency>
>> >
>> >
>> >
>> >              <!-- Flink -->
>> >
>> >              <dependency>
>> >
>> >                     <groupId>org.apache.flink</groupId>
>> >
>> >                     <artifactId>flink-java</artifactId>
>> >
>> >                     <version>0.9.0-milestone-1</version>
>> >
>> >              </dependency>
>> >
>> >              <dependency>
>> >
>> >                     <groupId>org.apache.flink</groupId>
>> >
>> >                     <artifactId>flink-clients</artifactId>
>> >
>> >                     <version>0.9.0-milestone-1</version>
>> >
>> >              </dependency>
>> >
>> >
>> >
>> >
>> >
>> >        </dependencies>
>> >
>> >
>> >
>> > A simple replacement of “1.8” by “1.7” in the pom makes the program
>> > work.
>> >
>> >
>> >
>> > Regards,
>> >
>> > Arnaud
>> >
>> >
>> > ________________________________
>> >
>> > L'intégrité de ce message n'étant pas assurée sur internet, la
>> > société expéditrice ne peut être tenue responsable de son contenu
>> > ni de ses pièces jointes. Toute utilisation ou diffusion non
>> > autorisée est interdite. Si vous n'êtes pas destinataire de ce
>> > message, merci de le détruire et d'avertir l'expéditeur.
>> >
>> > The integrity of this message cannot be guaranteed on the Internet.
>> > The company that sent this message cannot therefore be held liable
>> > for its content nor attachments. Any unauthorized use or
>> > dissemination is prohibited. If you are not the intended recipient
>> > of this message, then please delete it and notify the sender.
>
>

Reply | Threaded
Open this post in threaded view
|

Re: Flink Java 8 problem (no lambda, simple code)

Aljoscha Krettek
If you could send us the zip file, that would be much appreciated.
Then we can try to find the bug in the closure cleaner.

The mailing list does not allow attachments but you could send it to
me and I'll have a look.

On Mon, Apr 27, 2015 at 6:35 PM, Stephan Ewen <[hidden email]> wrote:

> The effect of disabling the closure cleaner is that some cases can throw you
> a "NonSerializableException".
>
> If you do not see that, there is no problem anyways, no unexpected side
> effect :-)
>
> Have you tried the latest version of the Flink master? We updated the code
> (2 days ago) to use as newer version of the ASM library, because the prior
> version ha some known issues with Java 8.
>
> Greetings,
> Stephan
>
>
>
> On Mon, Apr 27, 2015 at 5:45 PM, LINZ, Arnaud <[hidden email]>
> wrote:
>>
>> Hi,
>>
>> With cluster.getConfig().disableClosureCleaner() it works, thanks ; what
>> are the sides effect of this workaround?
>>
>> My code is just a plain Junit test, the only code that is called outside
>> of what I gave is :
>>
>>     public static String chercher(String nom) {
>>         String retour = null;
>>         URL url =
>> Thread.currentThread().getContextClassLoader().getResource(nom);
>>         if (null == url) {
>>             url = MiscTools.class.getResource(nom);
>>         }
>>         if (null != url) {
>>             retour = url.getFile();
>>         }
>>         if (null == retour) {
>>             retour = nom;
>>         }
>>         return retour;
>>     }
>>
>> My JRE version is Oracle 1.8.0.31.
>>
>> I can send you a zip file if you like.
>>
>> Arnaud
>>
>> -----Message d'origine-----
>> De : Aljoscha Krettek [mailto:[hidden email]]
>> Envoyé : vendredi 24 avril 2015 12:05
>> À : [hidden email]
>> Objet : Re: Flink Java 8 problem (no lambda, simple code)
>>
>> Unfortunately I can't reproduce your error on my machine (OS X, java
>> 8) i created a fresh maven project from your pom and source example and it
>> runs.
>>
>> As a workaround you can call
>> cluster.getConfig().disableClosureCleaner(). The closure cleaner normally
>> cleans closures from unneeded stuff because we send them over the network.
>> Here, the flatMap anonymous function has a closure. Is there any other code
>> inside your main function that could cause this?
>>
>> On Fri, Apr 24, 2015 at 11:34 AM, Stephan Ewen <[hidden email]> wrote:
>> > One thing I noticed a while back with ASM version 4 and Java 8 had
>> > issues - but those were related to Java 8 lambdas.
>> >
>> > Back then, bumping ASM to version 5 helped it. Not sure if this is the
>> > same problem, though, since you do not seem to use Java 8 lambdas...
>> >
>> > On Fri, Apr 24, 2015 at 11:32 AM, Aljoscha Krettek
>> > <[hidden email]>
>> > wrote:
>> >>
>> >> I'm looking into it,
>> >>
>> >> On Fri, Apr 24, 2015 at 11:13 AM, LINZ, Arnaud
>> >> <[hidden email]>
>> >> wrote:
>> >> > Hi,
>> >> >
>> >> >
>> >> >
>> >> > I have the following simple code that works well in Java 7 :
>> >> >
>> >> >
>> >> >
>> >> >         final ExecutionEnvironment cluster =
>> >> > ExecutionEnvironment.createLocalEnvironment();
>> >> >
>> >> >         final DataSet<String> textFile =
>> >> > cluster.readTextFile(MiscTools.chercher("jeuDeDonnees.txt"));
>> >> >
>> >> >         final DataSet<Tuple2<String, Integer>> words = textFile
>> >> >
>> >> >             .flatMap(new FlatMapFunction<String, Tuple2<String,
>> >> > Integer>>()
>> >> > {
>> >> >
>> >> >                 @Override
>> >> >
>> >> >                 public void flatMap(String ligne,
>> >> > Collector<Tuple2<String,
>> >> > Integer>> out) {
>> >> >
>> >> >                     for (final String word : ligne.split("\\s")) {
>> >> >
>> >> >                         out.collect(new Tuple2<String,
>> >> > Integer>(word, 1));
>> >> >
>> >> >                     }
>> >> >
>> >> >                 }
>> >> >
>> >> >             });
>> >> >
>> >> >         final DataSet<Tuple2<String, Integer>> wordsCount =
>> >> > words.groupBy(0).sum(1);
>> >> >
>> >> >         wordsCount.print();
>> >> >
>> >> >         cluster.execute("testFlink");
>> >> >
>> >> >
>> >> >
>> >> > When compiled in Java 8 and executed (Oracle JDK or Eclipse JDT
>> >> > compiler, same result) I have the following stack trace (under
>> >> > eclipse or with maven
>> >> > test) :
>> >> >
>> >> >
>> >> >
>> >> > java.lang.IllegalArgumentException: null
>> >> >
>> >> >        at
>> >> > org.apache.flink.shaded.org.objectweb.asm.ClassReader.<init>(Unknow
>> >> > n
>> >> > Source)
>> >> >
>> >> >        at
>> >> > org.apache.flink.shaded.org.objectweb.asm.ClassReader.<init>(Unknow
>> >> > n
>> >> > Source)
>> >> >
>> >> >        at
>> >> > org.apache.flink.shaded.org.objectweb.asm.ClassReader.<init>(Unknow
>> >> > n
>> >> > Source)
>> >> >
>> >> >        at
>> >> >
>> >> > org.apache.flink.api.java.ClosureCleaner.getClassReader(ClosureClea
>> >> > ner.java:40)
>> >> >
>> >> >        at
>> >> >
>> >> > org.apache.flink.api.java.ClosureCleaner.cleanThis0(ClosureCleaner.
>> >> > java:67)
>> >> >
>> >> >        at
>> >> > org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:
>> >> > 54)
>> >> >
>> >> >        at org.apache.flink.api.java.DataSet.clean(DataSet.java:185)
>> >> >
>> >> >        at
>> >> > org.apache.flink.api.java.DataSet.flatMap(DataSet.java:266)
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> > Any idea why ?
>> >> >
>> >> >
>> >> >
>> >> > Here is my pom.xml :
>> >> >
>> >> >
>> >> >
>> >> >        <build>
>> >> >
>> >> >              <plugins>
>> >> >
>> >> >                     <!-- Surefire plugin -->
>> >> >
>> >> >                     <plugin>
>> >> >
>> >> >
>> >> > <groupId>org.apache.maven.plugins</groupId>
>> >> >
>> >> >
>> >> > <artifactId>maven-surefire-plugin</artifactId>
>> >> >
>> >> >                            <version>2.15</version>
>> >> >
>> >> >                            <configuration>
>> >> >
>> >> >
>> >> > <jvm>${env.JAVA_HOME}/bin/java</jvm>
>> >> >
>> >> >                                   <forkMode>once</forkMode>
>> >> >
>> >> >                            </configuration>
>> >> >
>> >> >                     </plugin>
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >                     <plugin>
>> >> >
>> >> >
>> >> > <groupId>org.apache.maven.plugins</groupId>
>> >> >
>> >> >
>> >> > <artifactId>maven-compiler-plugin</artifactId>
>> >> >
>> >> >                            <version>3.1</version>
>> >> >
>> >> >
>> >> >
>> >> >                            <configuration>
>> >> >
>> >> >                                   <source>1.8</source>
>> >> >
>> >> >                                   <target>1.8</target>
>> >> >
>> >> >                                   <compilerId>jdt</compilerId>
>> >> >
>> >> >                                   <!--
>> >> > executable>${env.JAVA_HOME}/bin/javac</executable> -->
>> >> >
>> >> >                            </configuration>
>> >> >
>> >> >                            <dependencies>
>> >> >
>> >> >                                   <!-- This dependency provides the
>> >> > implementation of compiler "jdt": -->
>> >> >
>> >> >                                   <dependency>
>> >> >
>> >> >
>> >> > <groupId>org.eclipse.tycho</groupId>
>> >> >
>> >> >
>> >> > <artifactId>tycho-compiler-jdt</artifactId>
>> >> >
>> >> >                                         <version>0.21.0</version>
>> >> >
>> >> >                                   </dependency>
>> >> >
>> >> >                            </dependencies>
>> >> >
>> >> >                     </plugin>
>> >> >
>> >> >              </plugins>
>> >> >
>> >> >              <pluginManagement>
>> >> >
>> >> >                     <plugins>
>> >> >
>> >> >                            <!--This plugin's configuration is used
>> >> > to store Eclipse m2e settings
>> >> >
>> >> >                                   only. It has no influence on the
>> >> > Maven build itself. -->
>> >> >
>> >> >                            <plugin>
>> >> >
>> >> >
>> >> > <groupId>org.eclipse.m2e</groupId>
>> >> >
>> >> >
>> >> > <artifactId>lifecycle-mapping</artifactId>
>> >> >
>> >> >                                   <version>1.0.0</version>
>> >> >
>> >> >                                   <configuration>
>> >> >
>> >> >                                         <lifecycleMappingMetadata>
>> >> >
>> >> >                                                <pluginExecutions>
>> >> >
>> >> >
>> >> > <pluginExecution>
>> >> >
>> >> >
>> >> > <pluginExecutionFilter>
>> >> >
>> >> >
>> >> > <groupId>
>> >> >
>> >> >
>> >> > org.apache.maven.plugins
>> >> >
>> >> >
>> >> > </groupId>
>> >> >
>> >> >
>> >> > <artifactId>
>> >> >
>> >> >
>> >> > maven-compiler-plugin
>> >> >
>> >> >
>> >> > </artifactId>
>> >> >
>> >> >
>> >> > <versionRange>
>> >> >
>> >> >
>> >> > [3.1,)
>> >> >
>> >> >
>> >> > </versionRange>
>> >> >
>> >> >
>> >> > <goals>
>> >> >
>> >> >
>> >> > <goal>compile</goal>
>> >> >
>> >> >
>> >> > </goals>
>> >> >
>> >> >
>> >> > </pluginExecutionFilter>
>> >> >
>> >> >
>> >> > <action>
>> >> >
>> >> >
>> >> > <ignore></ignore>
>> >> >
>> >> >
>> >> > </action>
>> >> >
>> >> >
>> >> > </pluginExecution>
>> >> >
>> >> >                                                </pluginExecutions>
>> >> >
>> >> >                                         </lifecycleMappingMetadata>
>> >> >
>> >> >                                   </configuration>
>> >> >
>> >> >                            </plugin>
>> >> >
>> >> >                     </plugins>
>> >> >
>> >> >              </pluginManagement>
>> >> >
>> >> >        </build>
>> >> >
>> >> >
>> >> >
>> >> >        <dependencies>
>> >> >
>> >> >              <dependency>
>> >> >
>> >> >                     <groupId>junit</groupId>
>> >> >
>> >> >                     <artifactId>junit</artifactId>
>> >> >
>> >> >                     <version>4.10</version>
>> >> >
>> >> >                     <scope>test</scope>
>> >> >
>> >> >              </dependency>
>> >> >
>> >> >
>> >> >
>> >> >              <!-- Flink -->
>> >> >
>> >> >              <dependency>
>> >> >
>> >> >                     <groupId>org.apache.flink</groupId>
>> >> >
>> >> >                     <artifactId>flink-java</artifactId>
>> >> >
>> >> >                     <version>0.9.0-milestone-1</version>
>> >> >
>> >> >              </dependency>
>> >> >
>> >> >              <dependency>
>> >> >
>> >> >                     <groupId>org.apache.flink</groupId>
>> >> >
>> >> >                     <artifactId>flink-clients</artifactId>
>> >> >
>> >> >                     <version>0.9.0-milestone-1</version>
>> >> >
>> >> >              </dependency>
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >        </dependencies>
>> >> >
>> >> >
>> >> >
>> >> > A simple replacement of “1.8” by “1.7” in the pom makes the program
>> >> > work.
>> >> >
>> >> >
>> >> >
>> >> > Regards,
>> >> >
>> >> > Arnaud
>> >> >
>> >> >
>> >> > ________________________________
>> >> >
>> >> > L'intégrité de ce message n'étant pas assurée sur internet, la
>> >> > société expéditrice ne peut être tenue responsable de son contenu
>> >> > ni de ses pièces jointes. Toute utilisation ou diffusion non
>> >> > autorisée est interdite. Si vous n'êtes pas destinataire de ce
>> >> > message, merci de le détruire et d'avertir l'expéditeur.
>> >> >
>> >> > The integrity of this message cannot be guaranteed on the Internet.
>> >> > The company that sent this message cannot therefore be held liable
>> >> > for its content nor attachments. Any unauthorized use or
>> >> > dissemination is prohibited. If you are not the intended recipient
>> >> > of this message, then please delete it and notify the sender.
>> >
>> >
>
>