Hi,
I'm trying to get a simple example of a source backed by an iterator working. Here's the code I've got:

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
List<Integer> list = Arrays.asList(1, 2);
env.fromCollection(list.iterator(), Integer.class).print();
```

I get the following exception:

```
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object org.apache.flink.streaming.api.functions.source.FromIteratorFunction@25618e91 not serializable
	at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)
	at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1131)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:793)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:766)
	at braintree.demo.FromIterator.main(FromIterator.java:14)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.io.NotSerializableException: java.util.AbstractList$Itr
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
	at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
	... 11 more
```

This kind of makes sense. The root issue seems to be that the list's iterator is not serializable. In fact, java.util.Iterator doesn't implement Serializable.

I can't seem to find any examples of `FromIteratorFunction` being used in the flink codebase. Am I using it wrong?

Thanks!

Andrew Whitaker | [hidden email]
--
Note: this information is confidential. It is prohibited to share, post online or otherwise publicize without Braintree's prior written consent.
|
You will find some information regarding this issue in this JIRA:
https://issues.apache.org/jira/browse/FLINK-2608
On 07.04.2016 22:18, Andrew Whitaker wrote:
|
Hmm, maybe I was too quick to link to the JIRA.
As for an example: you can look at the streaming WindowJoin example. Its sample data uses an iterator (`ThrottledIterator`). Note that this iterator implementation is part of Flink and also implements `Serializable`.

On 07.04.2016 22:18, Andrew Whitaker wrote:
|
Thanks, that example is helpful. It seems that to use `fromCollection` with an iterator, the iterator must implement `Serializable`, and Java's built-in iterators don't, unfortunately.

On Thu, Apr 7, 2016 at 6:11 PM, Chesnay Schepler <[hidden email]> wrote:
Andrew Whitaker
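For anyone landing on this thread later, the conclusion above can be sketched in plain Java. This is only an illustration, assuming the serializability check is the sole obstacle: `SerializableListIterator` and its `roundTrip` helper are hypothetical names, not part of Flink, and `roundTrip` merely imitates the Java serialization step seen in the stack trace (`ObjectOutputStream.writeObject`).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical helper (not part of Flink): a list-backed iterator that is itself Serializable. */
final class SerializableListIterator<T> implements Iterator<T>, Serializable {
    private static final long serialVersionUID = 1L;

    // ArrayList is Serializable; the elements must be Serializable as well.
    private final ArrayList<T> items;
    private int position = 0;

    SerializableListIterator(List<T> source) {
        // Copy so the iterator owns a serializable snapshot of the data.
        this.items = new ArrayList<>(source);
    }

    @Override
    public boolean hasNext() {
        return position < items.size();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return items.get(position++);
    }

    /** Serializes and deserializes the iterator with plain Java serialization. */
    static <T> SerializableListIterator<T> roundTrip(SerializableListIterator<T> iterator) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(iterator);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                SerializableListIterator<T> copy = (SerializableListIterator<T>) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("iterator is not serializable", e);
        }
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2);

        // The iterator returned by the list does not implement Serializable.
        System.out.println(list.iterator() instanceof Serializable);

        // The custom iterator survives a serialization round trip and still yields the data.
        SerializableListIterator<Integer> copy = roundTrip(new SerializableListIterator<>(list));
        while (copy.hasNext()) {
            System.out.println(copy.next());
        }
    }
}
```

With a class like this, `env.fromCollection(new SerializableListIterator<>(list), Integer.class)` would take the place of `list.iterator()` in the original snippet. If the data is already in a list, the `fromCollection` overload that takes a `Collection` directly (`env.fromCollection(list)`) avoids the iterator altogether.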