Hi guys!

I have implemented a data maintenance job using Flink on MongoDB. The job runs smoothly when I start it from Eclipse. However, when I try to run it from a bash script that invokes maven exec:java, I get a serialization exception:

org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSink (org.tagcloud.persistence.batch.MongoHadoopOutputFormat@19b88891)': Deserializing the OutputFormat (org.tagcloud.persistence.batch.MongoHadoopOutputFormat@19b88891) failed: Could not read the user code wrapper: unexpected block data
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)

The complete stack trace is attached (flink-bash-start-stacktrace.txt, 28K). I thought it was a matter of serializable classes, so I made all my classes serializable, but I still get the same error. Perhaps it is not possible to do these things with Flink. Any intuition? Is it doable?

Thanks a lot for your support. :-)

saluti,
Stefano Bortoli, PhD
Email: [hidden email]
Phone nr: +39 0461 1823912
Headquarters: Trento (Italy), Via Trener 8
Hi!

The user code object (the output format, in this case) has a corrupt serialization routine. We use default Java serialization for these objects. Either the MongoHadoopOutputFormat cannot be serialized and swallows an exception, or it overrides the readObject() / writeObject() methods (from Java serialization) in an inconsistent way.

To figure that out, can you check whether you can manually serialize the MongoHadoopOutputFormat? For example, call "SerializationUtils.clone(new MongoHadoopOutputFormat())" at the beginning of your main method. SerializationUtils is part of Apache Commons and is probably on your classpath anyway.

Stephan

On Fri, Jul 24, 2015 at 9:51 AM, Stefano Bortoli <[hidden email]> wrote:
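(For illustration of the inconsistency Stephan describes, here is a minimal, hypothetical sketch -- not the actual MongoHadoopOutputFormat code -- of custom writeObject() / readObject() hooks. The class and field names are made up; the point is that the two methods must mirror each other exactly, otherwise the receiving side fails with errors such as "unexpected block data".)

    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    import org.apache.hadoop.conf.Configuration;

    public class CustomSerializedFormat implements Serializable {

        private static final long serialVersionUID = 1L;

        // hypothetical non-serializable state that needs manual handling
        private transient Configuration hadoopConfig = new Configuration();

        private void writeObject(ObjectOutputStream out) throws IOException {
            out.defaultWriteObject();
            // write the transient state explicitly (Configuration implements Writable)
            hadoopConfig.write(out);
        }

        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            // must read back exactly what writeObject wrote, in the same order;
            // skipping or reordering this step corrupts the stream on deserialization
            hadoopConfig = new Configuration();
            hadoopConfig.readFields(in);
        }
    }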
I have implemented this test, and it runs without any exception:

package org.tagcloud.persistence.batch.test;

import java.io.IOException;

import org.apache.commons.lang.SerializationUtils;
import org.apache.hadoop.mapreduce.Job;
import org.tagcloud.persistence.batch.MongoHadoop2OutputFormat;

import com.mongodb.hadoop.MongoOutputFormat;

public class MongoHadoopSerializationTest {
    public static void main(String[] args) {
        Job job;
        try {
            job = Job.getInstance();
            SerializationUtils.clone(new MongoHadoop2OutputFormat<>(new MongoOutputFormat<>(), job));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

2015-07-24 10:01 GMT+02:00 Stephan Ewen <[hidden email]>:
Hi!

There is probably something going wrong inside MongoOutputFormat or MongoHadoop2OutputFormat. Something fails, but Java swallows the problem during serialization. It may be a classloading issue that does not get reported.

Are the MongoOutputFormat and the MongoHadoop2OutputFormat both in the fat jar? If not, try putting them in there.

The last check we could do (to validate the Flink serialization utilities) is the code pasted below. If that does not cause the error, it is probably the issue described above.

Greetings,
Stephan

------------------------------

UserCodeObjectWrapper<Object> userCode =
        new UserCodeObjectWrapper<Object>(
                new MongoHadoop2OutputFormat<>(new MongoOutputFormat<>(), Job.getInstance()));

Configuration cfg = new Configuration();
TaskConfig taskConfig = new TaskConfig(cfg);
taskConfig.setStubWrapper(userCode);
taskConfig.getStubWrapper(ClassLoader.getSystemClassLoader());

On Fri, Jul 24, 2015 at 10:44 AM, Stefano Bortoli <[hidden email]> wrote:
Hi Stephan,

I think I may have found a possible root of the problem. I do not build the fat jar; I simply execute the main with maven exec:java after the default install and compile, so no uberjar is created and no shading happens. I will try that and report back. The fact that it runs so easily in Eclipse makes it somewhat confusing.

2015-07-24 11:09 GMT+02:00 Stephan Ewen <[hidden email]>:
It seems there is a problem with the Maven class loading. I have created the uberjar and then executed it with a traditional java -cp uberjar.jar args, and it worked with no problems. It could be interesting to investigate the reason, as maven exec is very convenient. However, the uberjar eases the classpath problems, so I can live with it. Thanks a lot for your support.

2015-07-24 11:17 GMT+02:00 Stefano Bortoli <[hidden email]>:
Thanks for letting us know!

The problem with Java serialization is that it often swallows exceptions, and in the end you only see a "corrupted byte stream". So far, I have found no workaround for that.

Stephan

On Fri, Jul 24, 2015 at 11:31 AM, Stefano Bortoli <[hidden email]> wrote: