Hi, I am trying to run this code:

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        MyTableInputFormat inputFormat = new MyTableInputFormat();
        DataSource<Record> dataset = env.createInput(inputFormat);

        DataSet<Tuple4<StringValue, StringValue, StringValue, BooleanValue>> candidates = dataset
                .filter(new EmptyEntityFilterFunction())
                .rebalance()
                .flatMap(new FindCandidateWithMatchFlagMapFunction<>());

        DataSet<Tuple3<StringValue, StringValue, StringValue>> duplicates = candidates
                .filter(new SingleMatchFilterFunctionWithFlagMatch<>())
                .map(new MapToTuple3MapFunction<>());

        DataSet<Tuple2<StringValue, MyStringList>> duplicatesToprint = duplicates
                .distinct(0, 1)
                .groupBy(0)
                .reduceGroup(new ConsolidateByTypeDuplicatesGroupReduceFunction());

        duplicatesToprint.writeAsText(
                "file:///tmp/" + EnsMaintenanceConstants.WORKING_TABLE + "/",
                WriteMode.OVERWRITE);

        env.execute();
    }

Error:

    java.lang.Exception: Failed to deploy the task CHAIN DataSource (org.okkam.flink.hbase.MyTableInputFormat@4259448a) -> Filter (org.okkam.flink.hbase.EmptyEntityFilterFunction) (1/24) - execution #0 to slot SubSlot 0 (7536af39024636d81e3e05782a701bde (3) - ALLOCATED/ALIVE): java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: Invalid shipping strategy for OutputEmitter: PARTITION_FORCED_REBALANCE
        at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:92)
        at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:180)
        at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:594)
        at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:420)
        at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:947)
    Caused by: java.lang.IllegalArgumentException: Invalid shipping strategy for OutputEmitter: PARTITION_FORCED_REBALANCE
        at org.apache.flink.runtime.operators.shipping.RecordOutputEmitter.<init>(RecordOutputEmitter.java:99)
        at org.apache.flink.runtime.operators.shipping.RecordOutputEmitter.<init>(RecordOutputEmitter.java:69)
        at org.apache.flink.runtime.operators.shipping.RecordOutputEmitter.<init>(RecordOutputEmitter.java:58)
        at org.apache.flink.runtime.operators.RegularPactTask.getOutputCollector(RegularPactTask.java:1245)
        at org.apache.flink.runtime.operators.RegularPactTask.initOutputs(RegularPactTask.java:1338)
        at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:327)
        at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:90)
        ... 7 more
|
I just had a look at this. Are you using the "Record" data type? The tooling for that type does not seem to support this right now, but it is an easy fix...
On 25.11.2014 at 09:48, "Stefano Bortoli" <[hidden email]> wrote:
|
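For readers following the thread, here is a minimal sketch of the failure mode described above, under stated assumptions. It is not Flink code: `RebalanceSketch`, `ShipStrategy`, `ChannelSelector`, and the `supportsRebalance` flag are hypothetical names invented for illustration. It assumes that a forced rebalance distributes records round-robin over the output channels, and that an emitter which was never taught about that strategy rejects it in its constructor, which would match the `IllegalArgumentException` in the stack trace.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; none of these names are Flink classes.
class RebalanceSketch {

    // Simplified stand-ins for two shipping strategies.
    enum ShipStrategy { FORWARD, FORCED_REBALANCE }

    // Chooses the output channel for each emitted record.
    static final class ChannelSelector {
        private final ShipStrategy strategy;
        private final int numChannels;
        private int next = 0;

        // supportsRebalance is an assumed flag that models an emitter
        // which does or does not know the rebalance strategy.
        ChannelSelector(ShipStrategy strategy, int numChannels, boolean supportsRebalance) {
            if (strategy == ShipStrategy.FORCED_REBALANCE && !supportsRebalance) {
                // Fails at construction time, before any record is emitted.
                throw new IllegalArgumentException(
                        "Invalid shipping strategy for OutputEmitter: " + strategy);
            }
            this.strategy = strategy;
            this.numChannels = numChannels;
        }

        int selectChannel() {
            if (strategy == ShipStrategy.FORWARD) {
                return 0; // forward keeps every record on the local channel
            }
            int channel = next; // round-robin over all channels
            next = (next + 1) % numChannels;
            return channel;
        }
    }

    // Returns the channel chosen for each of `records` records under rebalance.
    static List<Integer> assign(int records, int channels) {
        ChannelSelector selector =
                new ChannelSelector(ShipStrategy.FORCED_REBALANCE, channels, true);
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < records; i++) {
            result.add(selector.selectChannel());
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(6, 3)); // prints [0, 1, 2, 0, 1, 2]
        try {
            new ChannelSelector(ShipStrategy.FORCED_REBALANCE, 3, false);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this sketch the rejection happens while the selector is being built, which is consistent with the job above failing during task deployment rather than while processing records.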
Yes, I am using the Record data type. I can move the implementation to Tuples if that is what is needed. Thanks for the tip! :-)
Stefano
2014-11-25 10:29 GMT+01:00 Stephan Ewen <[hidden email]>:
|
Yes, because Stefano is working on the stable version. I saw that you are going to release the 7.1 version; do you think you could also include the new HBase addon (the one that generates Tuples)?
On Tue, Nov 25, 2014 at 10:31 AM, Stefano Bortoli <[hidden email]> wrote:
|
Yes, it makes sense to add the HBase fix to that.
On Tue, Nov 25, 2014 at 10:55 AM, Flavio Pompermaier <[hidden email]> wrote:
|
Great :)
On Tue, Nov 25, 2014 at 10:59 AM, Stephan Ewen <[hidden email]> wrote:
|
This problem should be fixed, though: https://issues.apache.org/jira/browse/FLINK-1278
On Tue, Nov 25, 2014 at 11:02 AM, Flavio Pompermaier <[hidden email]> wrote:
|