Hi,

I read a CSV file from disk with Flink (Java, Maven version 8.1) and get the following exception:

ERROR operators.DataSinkTask: Error in user code: Channel received an event before completing the current partial record.: DataSink(Print to System.out) (4/4)
java.lang.IllegalStateException: Channel received an event before completing the current partial record.
    at org.apache.flink.runtime.io.network.channels.InputChannel.readRecord(InputChannel.java:158)
    at org.apache.flink.runtime.io.network.gates.InputGate.readRecord(InputGate.java:176)
    at org.apache.flink.runtime.io.network.api.MutableRecordReader.next(MutableRecordReader.java:51)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:53)
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:170)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
    at java.lang.Thread.run(Thread.java:745)

My code:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class FlinkMain {

    public static void main(String[] args) {
        // set up execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //env.setDegreeOfParallelism(1);
        // get input points
        DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
        points.print();
        // execute program
        try {
            env.execute("KMeans Flink");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // getPointDataSet(env) is defined in this class as well (see the follow-up message)
}

Maybe someone has a solution?

Best regards,
Paul
My function code:

private static DataSet<GeoTimeDataTupel> getPointDataSet(ExecutionEnvironment env) {
    // load properties (try-with-resources closes the file again)
    Properties pro = new Properties();
    try (FileInputStream in = new FileInputStream("./resources/config.properties")) {
        pro.load(in);
    } catch (Exception e) {
        e.printStackTrace();
    }
    String inputFile = pro.getProperty("input");
    // map csv file (tab-separated, one event per line)
    return env.readCsvFile(inputFile)
        .ignoreInvalidLines()
        .fieldDelimiter('\u0009')
        .lineDelimiter("\n")
        .includeFields(true, true, false, false, false, false, false, false, false, false, false
            , false, false, false, false, false, false, false, false, false, false
            , false, false, false, false, false, false, false, false, false, false
            , false, false, false, false, false, false, false, false, true, true
            , false, false, false, false, false, false, false, false, false, false
            , false, false, false, false, false, false, false)
        .types(String.class, Long.class, Double.class, Double.class)
        .map(new TuplePointConverter());
}

And I use the GDELT data from here:

http://data.gdeltproject.org/events/index.html

2015-05-13 13:09 GMT+02:00 Pa Rö <[hidden email]>:
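The long `includeFields(...)` list above selects 4 of 58 columns (positions 1, 2, 40 and 41, counting from 1), which lines up with the four classes passed to `types(...)`. A minimal sketch of building the same mask from column positions instead of writing it out by hand; `fieldMask` is a hypothetical helper, not part of Flink, and the 58-column count is simply taken from the list in the post.

```java
public class FieldMaskSketch {

    // Hypothetical helper (not a Flink API): returns a mask of length totalFields
    // with true at the given 1-based column positions and false everywhere else.
    static boolean[] fieldMask(int totalFields, int... includedOneBased) {
        boolean[] mask = new boolean[totalFields]; // Java initializes all entries to false
        for (int col : includedOneBased) {
            if (col < 1 || col > totalFields) {
                throw new IllegalArgumentException("column out of range: " + col);
            }
            mask[col - 1] = true;
        }
        return mask;
    }

    public static void main(String[] args) {
        // Same selection as the hand-written list: columns 1, 2, 40 and 41 of 58.
        boolean[] mask = fieldMask(58, 1, 2, 40, 41);
        int included = 0;
        for (boolean b : mask) {
            if (b) {
                included++;
            }
        }
        System.out.println(mask.length + " fields, " + included + " included");
    }
}
```

Because `includeFields` is called with a plain comma-separated list of booleans (varargs), an array like this can be passed in its place, e.g. `.includeFields(fieldMask(58, 1, 2, 40, 41))`.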
Hi Paul!

Thank you for reporting this. This really seems like it should not happen ;-)

Is this error reproducible? If yes, we can probably fix it...

Greetings,
Stephan

On Wed, May 13, 2015 at 1:16 PM, Pa Rö <[hidden email]> wrote:
In reply to this post by Pa Rö
Hey Paul! Thanks for reporting the issue. I'm trying to reproduce the problem. I'll post the updates here.
Which version of Flink are you using? You probably meant that you were using Flink 0.8.1 not Maven 8.1, right? ;-)

On 13 May 2015, at 13:16, Pa Rö <[hidden email]> wrote:
In reply to this post by Stephan Ewen
Hi Stephan,

I have found the problem: something was wrong in the read and write functions of my data object (it implements Writable). Now it works.

Best regards,
Paul

2015-05-13 13:32 GMT+02:00 Stephan Ewen <[hidden email]>:
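The fix above comes down to one rule: a record's read method has to consume exactly the bytes its write method produced, with the same calls in the same order. A minimal sketch of such a symmetric pair, checked by a round trip through a byte buffer. `GeoPoint` and its fields are hypothetical stand-ins for `GeoTimeDataTupel` (chosen to match `types(String, Long, Double, Double)`); the class deliberately implements no interface, since Hadoop's `Writable` names the methods `write(DataOutput)`/`readFields(DataInput)` and Flink's `IOReadableWritable` uses `write(DataOutputView)`/`read(DataInputView)`, so adapt the names to whatever your type actually implements.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

public class SymmetricRecordSketch {

    // Hypothetical record type; field names are assumptions for illustration.
    static final class GeoPoint {
        String id;   // writeUTF does not accept null, so this must be set
        long time;
        double lat;
        double lon;

        // Writes every field, in a fixed order.
        void write(DataOutput out) throws IOException {
            out.writeUTF(id);
            out.writeLong(time);
            out.writeDouble(lat);
            out.writeDouble(lon);
        }

        // Reads the same fields, with the matching methods, in the same order.
        void read(DataInput in) throws IOException {
            id = in.readUTF();
            time = in.readLong();
            lat = in.readDouble();
            lon = in.readDouble();
        }
    }

    // Serializes src, deserializes into dst, and returns the number of unread bytes.
    static int roundTrip(GeoPoint src, GeoPoint dst) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        src.write(new DataOutputStream(bytes));
        ByteArrayInputStream in = new ByteArrayInputStream(bytes.toByteArray());
        dst.read(new DataInputStream(in));
        return in.available(); // 0 means read() consumed exactly what write() produced
    }

    public static void main(String[] args) throws IOException {
        GeoPoint p = new GeoPoint();
        p.id = "123";
        p.time = 20150513L;
        p.lat = 52.52;
        p.lon = 13.40;
        GeoPoint q = new GeoPoint();
        int leftover = roundTrip(p, q);
        System.out.println("leftover bytes: " + leftover + ", id=" + q.id + ", lat=" + q.lat);
    }
}
```

In this standalone check, a read method that consumes too much fails with an `EOFException` and one that consumes too little leaves bytes over. Inside a running job the stream simply continues with the next record, so a mismatch may only show up later as a confusing error like the one in this thread; a round-trip test like this is a cheap way to catch it early.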
Ah, that is good to hear. I think we should improve the error message there.

On Wed, May 13, 2015 at 2:41 PM, Pa Rö <[hidden email]> wrote: