Bug broadcasting objects (serialization issue)


Bug broadcasting objects (serialization issue)

Andres R. Masegosa
Hi,

I run into a bug when trying to broadcast a list of integers created with
"Arrays.asList(...)".

For example, if you try to run this "wordcount" example, you can
reproduce the bug.


import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountExample {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> text = env.fromElements(
                "Who's there?",
                "I think I hear them. Stand, ho! Who's there?");

        List<Integer> elements = Arrays.asList(0, 0, 0);

        DataSet<TestClass> set = env.fromElements(new TestClass(elements));

        DataSet<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new LineSplitter())
                .withBroadcastSet(set, "set")
                .groupBy(0)
                .sum(1);

        wordCounts.print();
    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

    public static class TestClass implements Serializable {
        private static final long serialVersionUID = -2932037991574118651L;

        List<Integer> integerList;

        public TestClass(List<Integer> integerList) {
            this.integerList = integerList;
        }
    }
}


However, if we build the list with the ArrayList<> constructor instead of
"Arrays.asList(...)", there is no problem at all.
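
For instance, copying the values into a regular java.util.ArrayList before handing
them to Flink works fine (just a sketch of the workaround; only the line that builds
the list changes, and java.util.ArrayList has to be imported):

        // Workaround sketch: copy into a plain java.util.ArrayList before broadcasting
        List<Integer> elements = new ArrayList<Integer>(Arrays.asList(0, 0, 0));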


Regards,
Andres

Re: Bug broadcasting objects (serialization issue)

Maximilian Michels
Hi Andres,

Thank you for reporting the problem and including the code to reproduce it. There seems to be an issue with class serialization or deserialization: Arrays.asList uses a private nested ArrayList class (java.util.Arrays$ArrayList), which is not the java.util.ArrayList you would normally use.
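
For example, a quick check in plain Java (no Flink involved) shows the difference in the runtime class of the two lists:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ListClassCheck {
    public static void main(String[] args) {
        List<Integer> asList = Arrays.asList(0, 0, 0);
        List<Integer> arrayList = new ArrayList<Integer>(Arrays.asList(0, 0, 0));

        // prints "java.util.Arrays$ArrayList" - a private nested class of java.util.Arrays
        System.out.println(asList.getClass().getName());

        // prints "java.util.ArrayList" - the public implementation you would normally use
        System.out.println(arrayList.getClass().getName());
    }
}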

I'll create a JIRA issue to keep track of the problem and to investigate further.

Best regards,
Max

Here's the stack trace:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSource (at main(Test.java:32) (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Deserializing the InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
    at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:60)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:520)
    ... 25 more
Caused by: java.lang.IllegalStateException: unread block data
    at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2424)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264)
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282)
    at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:57)
    ... 26 more



Re: Bug broadcasting objects (serialization issue)

Maximilian Michels

Here's the JIRA issue: https://issues.apache.org/jira/browse/FLINK-2608



Re: Bug broadcasting objects (serialization issue)

Stephan Ewen
We should try to improve the exception here. More people will run into this issue and the exception should help them understand it well.

How about we do eager serialization into a set of byte arrays? Then the serializability issue comes immediately when the program is constructed, rather than later, when it is shipped.
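
Roughly something like the sketch below (names made up, not existing Flink code): the value is serialized into a byte array the moment the wrapper is constructed, so a non-serializable object fails immediately with a message pointing at the offending class.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Sketch: wrap a user object and serialize it eagerly into a byte array.
// If serialization fails, the error surfaces at program construction time,
// not when the job graph is shipped to the JobManager.
public class EagerlySerializedValue<T extends Serializable> implements Serializable {
    private static final long serialVersionUID = 1L;

    private final byte[] serializedData;

    public EagerlySerializedValue(T value) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(value);
            oos.close();
            this.serializedData = bos.toByteArray();
        } catch (IOException e) {
            throw new IllegalArgumentException(
                    "Object of class " + value.getClass().getName() + " is not serializable", e);
        }
    }

    public byte[] getSerializedData() {
        return serializedData;
    }
}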


Re: Bug broadcasting objects (serialization issue)

Maximilian Michels
Nice suggestion. So you want to serialize and deserialize the InputFormats on the Client to check whether they can be transferred correctly? Merely serializing is not enough because the above Exception occurs during deserialization.
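
Something like the sketch below, run on the client before submission, would exercise both paths (made-up helper, not existing Flink code). A round trip inside a single JVM may still miss ClassLoader-related failures, though, because the same classes are visible on both ends:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Sketch: write the object and read it back, so both serialization and
// deserialization run on the client before the job is submitted.
public final class SerializationRoundTripCheck {

    public static void check(Serializable object) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(object);
            oos.close();

            ObjectInputStream ois =
                    new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
            ois.readObject(); // deserialization problems surface here, before submission
            ois.close();
        } catch (Exception e) {
            throw new IllegalStateException("Object of class " + object.getClass().getName()
                    + " cannot be serialized and deserialized: " + e.getMessage(), e);
        }
    }
}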


Re: Bug broadcasting objects (serialization issue)

Stephan Ewen
Yes, even serialize in the constructor. Then the failure (if serialization does not work) comes immediately.


Re: Bug broadcasting objects (serialization issue)

Maximilian Michels
Ok but that would not prevent the above error, right? Serializing is
not the issue here.

Nevertheless, it would catch all errors during initial serialization.
Deserializing has its own hazards due to possible Classloader issues.
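
On the ClassLoader side, the usual trick (again just a sketch, not Flink's actual implementation) is an ObjectInputStream that resolves classes against a specific ClassLoader, e.g. the user-code ClassLoader:

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;

// Sketch: resolve classes against a supplied ClassLoader (e.g. the user-code
// ClassLoader) instead of the default one used by ObjectInputStream.
public class ClassLoaderAwareObjectInputStream extends ObjectInputStream {

    private final ClassLoader classLoader;

    public ClassLoaderAwareObjectInputStream(InputStream in, ClassLoader classLoader)
            throws IOException {
        super(in);
        this.classLoader = classLoader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc)
            throws IOException, ClassNotFoundException {
        try {
            return Class.forName(desc.getName(), false, classLoader);
        } catch (ClassNotFoundException e) {
            // Fall back to the default resolution
            return super.resolveClass(desc);
        }
    }
}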


Re: Bug broadcasting objects (serialization issue)

Stephan Ewen
I see.

Manual serialization also implies manual deserialization (on the workers only), which would give a better exception.

BTW: There is an opportunity to fix two problems with one patch: The framesize overflow for the input format, and the serialization.


Re: Bug broadcasting objects (serialization issue)

Andres R. Masegosa
In reply to this post by Andres R. Masegosa
Hi,

I get another, similar bug when broadcasting a list of integers if the
list is made unmodifiable:

        elements = Collections.unmodifiableList(elements);


I include this code to reproduce the result,


import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountExample {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> text = env.fromElements(
                "Who's there?",
                "I think I hear them. Stand, ho! Who's there?");

        List<Integer> elements = new ArrayList<Integer>();
        elements.add(0);

        elements = Collections.unmodifiableList(elements);

        DataSet<TestClass> set = env.fromElements(new TestClass(elements));

        DataSet<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new LineSplitter())
                .withBroadcastSet(set, "set")
                .groupBy(0)
                .sum(1);

        wordCounts.print();
    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

    public static class TestClass implements Serializable {
        private static final long serialVersionUID = -2932037991574118651L;

        List<Integer> integerList;

        public TestClass(List<Integer> integerList) {
            this.integerList = integerList;
        }
    }
}

Thanks for your support,
Andres

Reply | Threaded
Open this post in threaded view
|

Re: Bug broadcasting objects (serialization issue)

Maximilian Michels
In reply to this post by Stephan Ewen
Thanks for clarifying the "eager serialization". By serializing and
deserializing explicitly (eagerly) we can raise better Exceptions to
notify the user of non-serializable classes.
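
The check being discussed could look roughly like the sketch below, written
with plain JDK serialization; the class and method names are made up for
illustration and are not actual Flink API:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Illustrative helper: serialize and immediately deserialize a user object so
// that problems surface while the program is constructed, with a message that
// names the offending object, instead of later on the JobManager.
public final class EagerSerializationCheck {

    public static byte[] checkedSerialize(Serializable userObject) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(userObject);
            out.flush();
            byte[] serialized = bytes.toByteArray();
            // Deserializing right away catches some failures early; as noted
            // in this thread, classloader-related problems may still only
            // show up on the remote side.
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(serialized))) {
                in.readObject();
            }
            return serialized;
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalArgumentException(
                    "Object " + userObject + " could not be serialized/deserialized", e);
        }
    }
}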

> BTW: There is an opportunity to fix two problems with one patch: The framesize overflow for the input format, and the serialization.

IMHO this adds another layer of complexity to the job submission
phase. I just had a chat with Robert about this. I wonder, is it
possible to increase the Akka framesize only for the Client
ActorSystem?
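
For reference, the setting in question is the global akka.framesize entry in
flink-conf.yaml (about 10 MB by default); as the question implies, there is
currently no separate client-side value. An illustrative override (check the
configuration docs of the version in use for the exact value syntax):

    akka.framesize: 52428800b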

On Wed, Sep 2, 2015 at 4:27 PM, Stephan Ewen <[hidden email]> wrote:

> I see.
>
> Manual serialization implies also manual deserialization (on the workers
> only), which would give a better exception.
>
> BTW: There is an opportunity to fix two problems with one patch: The
> framesize overflow for the input format, and the serialization.
>
> On Wed, Sep 2, 2015 at 4:16 PM, Maximilian Michels <[hidden email]> wrote:
>>
>> Ok but that would not prevent the above error, right? Serializing is
>> not the issue here.
>>
>> Nevertheless, it would catch all errors during initial serialization.
>> Deserializing has its own hazards due to possible Classloader issues.
>>
>> On Wed, Sep 2, 2015 at 4:05 PM, Stephan Ewen <[hidden email]> wrote:
>> > Yes, even serialize in the constructor. Then the failure (if
>> > serialization
>> > does not work) comes immediately.
>> >
>> > On Wed, Sep 2, 2015 at 4:02 PM, Maximilian Michels <[hidden email]>
>> > wrote:
>> >>
>> >> Nice suggestion. So you want to serialize and deserialize the
>> >> InputFormats
>> >> on the Client to check whether they can be transferred correctly?
>> >> Merely
>> >> serializing is not enough because the above Exception occurs during
>> >> deserialization.
>> >>
>> >> On Wed, Sep 2, 2015 at 2:29 PM, Stephan Ewen <[hidden email]> wrote:
>> >>>
>> >>> We should try to improve the exception here. More people will run into
>> >>> this issue and the exception should help them understand it well.
>> >>>
>> >>> How about we do eager serialization into a set of byte arrays? Then
>> >>> the
>> >>> serializability issue comes immediately when the program is
>> >>> constructed,
>> >>> rather than later, when it is shipped.
>> >>>
>> >>> On Wed, Sep 2, 2015 at 12:56 PM, Maximilian Michels <[hidden email]>
>> >>> wrote:
>> >>>>
>> >>>> Here's the JIRA issue:
>> >>>> https://issues.apache.org/jira/browse/FLINK-2608
>> >>>>
>> >>>> On Wed, Sep 2, 2015 at 12:49 PM, Maximilian Michels <[hidden email]>
>> >>>> wrote:
>> >>>>>
>> >>>>> Hi Andreas,
>> >>>>>
>> >>>>> Thank you for reporting the problem and including the code to
>> >>>>> reproduce
>> >>>>> the problem. I think there is a problem with the class serialization
>> >>>>> or
>> >>>>> deserialization. Arrays.asList uses a private ArrayList class
>> >>>>> (java.util.Arrays$ArrayList) which is not the one you would normally
>> >>>>> use
>> >>>>> (java.util.ArrayList).
>> >>>>>
>> >>>>> I'll create a JIRA issue to keep track of the problem and to
>> >>>>> investigate further.
>> >>>>>
>> >>>>> Best regards,
>> >>>>> Max
>> >>>>>
>> >>>>> Here's the stack trace:
>> >>>>>
>> >>>>> Exception in thread "main"
>> >>>>> org.apache.flink.runtime.client.JobExecutionException: Cannot
>> >>>>> initialize
>> >>>>> task 'DataSource (at main(Test.java:32)
>> >>>>> (org.apache.flink.api.java.io.CollectionInputFormat))':
>> >>>>> Deserializing the
>> >>>>> InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block
>> >>>>> data
>> >>>>>     at
>> >>>>>
>> >>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
>> >>>>>     at
>> >>>>>
>> >>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
>> >>>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >>>>>     at
>> >>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >>>>>     at
>> >>>>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> >>>>>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> >>>>>     at
>> >>>>>
>> >>>>> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
>> >>>>>     at
>> >>>>>
>> >>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
>> >>>>>     at
>> >>>>>
>> >>>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>> >>>>>     at
>> >>>>>
>> >>>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>> >>>>>     at
>> >>>>>
>> >>>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>> >>>>>     at
>> >>>>>
>> >>>>> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
>> >>>>>     at
>> >>>>>
>> >>>>> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>> >>>>>     at
>> >>>>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>> >>>>>     at
>> >>>>>
>> >>>>> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>> >>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>> >>>>>     at
>> >>>>>
>> >>>>> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
>> >>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>> >>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>> >>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>> >>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>> >>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>> >>>>>     at
>> >>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> >>>>>     at
>> >>>>>
>> >>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> >>>>>     at
>> >>>>>
>> >>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> >>>>>     at
>> >>>>>
>> >>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> >>>>> Caused by: java.lang.Exception: Deserializing the InputFormat
>> >>>>> ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
>> >>>>>     at
>> >>>>>
>> >>>>> org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:60)
>> >>>>>     at
>> >>>>>
>> >>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:520)
>> >>>>>     ... 25 more
>> >>>>> Caused by: java.lang.IllegalStateException: unread block data
>> >>>>>     at
>> >>>>>
>> >>>>> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2424)
>> >>>>>     at
>> >>>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383)
>> >>>>>     at
>> >>>>>
>> >>>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>> >>>>>     at
>> >>>>>
>> >>>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>> >>>>>     at
>> >>>>>
>> >>>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>> >>>>>     at
>> >>>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>> >>>>>     at
>> >>>>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>> >>>>>     at
>> >>>>>
>> >>>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
>> >>>>>     at
>> >>>>>
>> >>>>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264)
>> >>>>>     at
>> >>>>>
>> >>>>> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282)
>> >>>>>     at
>> >>>>>
>> >>>>> org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:57)
>> >>>>>     ... 26 more
>> >>>>>