FromIteratorFunction problems

FromIteratorFunction problems

Andrew Whitaker
Hi,

I'm trying to get a simple example of a source backed by an iterator working. Here's the code I've got:

```
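import java.util.Arrays;
import java.util.List;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

List<Integer> list = Arrays.asList(1, 2);

// Throws InvalidProgramException: the iterator returned by list.iterator() is not Serializable
env.fromCollection(list.iterator(), Integer.class).print();
env.execute(); // needed to actually run the job; not reached here because of the exception above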
```

I get the following exception:

```
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object org.apache.flink.streaming.api.functions.source.FromIteratorFunction@25618e91 not serializable
at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1131)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:793)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:766)
at braintree.demo.FromIterator.main(FromIterator.java:14)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.io.NotSerializableException: java.util.AbstractList$Itr
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
... 11 more
```

This kind of makes sense: the root issue seems to be that the list's iterator (`java.util.AbstractList$Itr`) is not serializable. In fact, `java.util.Iterator` doesn't extend `Serializable`, so there is no guarantee that any given iterator is.
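The trace shows where the failure happens: `ClosureCleaner.ensureSerializable` calls `InstantiationUtil.serializeObject`, which ends in plain `ObjectOutputStream.writeObject`. A minimal plain-Java sketch (no Flink needed; the helper `isJavaSerializable` is a hypothetical name for illustration) reproduces the same `NotSerializableException` and also shows that the list itself serializes fine. The exact iterator class named in the exception may differ between JDK versions.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.util.Arrays;
import java.util.List;

public class IteratorSerializationCheck {
    // Hypothetical helper: true if obj can be written with standard Java serialization,
    // the same ObjectOutputStream.writeObject step that fails in the trace above.
    static boolean isJavaSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2);
        // The list returned by Arrays.asList is Serializable...
        System.out.println("list: " + isJavaSerializable(list)); // prints "list: true"
        // ...but the iterator it hands out is not.
        System.out.println("iterator: " + isJavaSerializable(list.iterator())); // prints "iterator: false"
    }
}
```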

I can't find any examples of `FromIteratorFunction` being used in the Flink codebase. Am I using it wrong?

Thanks!

--
Andrew Whitaker | [hidden email]
--
Note: this information is confidential. It is prohibited to share, post online or otherwise publicize without Braintree's prior written consent.

Re: FromIteratorFunction problems

Chesnay Schepler
You'll find some information on this issue in this JIRA ticket: https://issues.apache.org/jira/browse/FLINK-2608

In reply to Andrew Whitaker's post of 07.04.2016 22:18.

Re: FromIteratorFunction problems

Chesnay Schepler
In reply to this post by Andrew Whitaker
Hmm, maybe I was too quick to link the JIRA ticket.

As for an example, take a look at the streaming WindowJoin example: its sample data comes from an iterator (`ThrottledIterator`).
Note that this iterator implementation is part of Flink and also implements `Serializable`.
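Following that pattern, here is a minimal sketch of an iterator that is itself `Serializable`. The class `SerializableListIterator` and its `roundTrip` helper are hypothetical names, not Flink API. The sketch assumes the element type is `Serializable` and copies the elements into an `ArrayList` so the backing data serializes too; `roundTrip` pushes the iterator through Java serialization and back, roughly what happens when the source function is shipped to a task.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical helper class (not part of Flink): an Iterator that is also Serializable,
// assuming the element type T is itself Serializable.
public class SerializableListIterator<T> implements Iterator<T>, Serializable {
    private static final long serialVersionUID = 1L;

    // Defensive copy into an ArrayList, which is Serializable when its elements are.
    private final ArrayList<T> elements;
    // Index of the next element to return; a plain int, so it serializes trivially.
    private int position = 0;

    public SerializableListIterator(List<T> source) {
        this.elements = new ArrayList<>(source);
    }

    @Override
    public boolean hasNext() {
        return position < elements.size();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return elements.get(position++);
    }

    // Writes obj with Java serialization and reads it back, as a stand-in for
    // the source function being shipped to a task.
    @SuppressWarnings("unchecked")
    static <O> O roundTrip(O obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (O) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        SerializableListIterator<Integer> original =
            new SerializableListIterator<>(Arrays.asList(1, 2));
        SerializableListIterator<Integer> copy = roundTrip(original);
        while (copy.hasNext()) {
            System.out.println(copy.next()); // prints 1, then 2
        }
    }
}
```

With Flink on the classpath, it should then be possible to write `env.fromCollection(new SerializableListIterator<>(list), Integer.class)`, though that usage is an untested assumption here. If the data is already an in-memory list, passing the list itself (`env.fromCollection(list)`) may be the simpler route.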

Re: FromIteratorFunction problems

Andrew Whitaker
Thanks, that example is helpful. It seems that to use `fromCollection` with an iterator, the iterator itself must implement `Serializable`, and unfortunately Java's built-in iterators don't.

In reply to Chesnay Schepler's post of Thu, Apr 7, 2016 at 6:11 PM.
