starting flink job from bash script with maven


Stefano Bortoli
Hi guys!

I have written a data maintenance job using Flink on MongoDB. The job runs smoothly when I start it from Eclipse. However, when I try to run it using a bash script that invokes maven exec:java, I get a serialization exception:
org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSink (org.tagcloud.persistence.batch.MongoHadoopOutputFormat@19b88891)': Deserializing the OutputFormat (org.tagcloud.persistence.batch.MongoHadoopOutputFormat@19b88891) failed: Could not read the user code wrapper: unexpected block data
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)

The complete stack trace is attached. I thought it was a matter of serializable classes, so I made all my classes serializable, but I still get the same error. Perhaps it is not possible to do these things with Flink.

Any intuition? Is it doable?

Thanks a lot for your support. :-)

Regards,

Stefano Bortoli, PhD
ENS Technical Director
_______________________________________________
OKKAM Srl, www.okkam.it

Email: [hidden email]

Phone nr: +39 0461 1823912

Headquarters: Trento (Italy), Via Trener 8
Registered office: Trento (Italy), via Segantini 23




Attachment: flink-bash-start-stacktrace.txt (28K)

Re: starting flink job from bash script with maven

Stephan Ewen
Hi!

The user code object (the output format here) has a corrupt serialization routine.

We use default Java serialization for these objects. Either the MongoHadoopOutputFormat cannot be serialized and swallows an exception, or it overrides the readObject()/writeObject() methods (from Java serialization) in an inconsistent way.
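
As a hypothetical illustration (this is not the actual MongoHadoopOutputFormat code, just a made-up class), a writeObject()/readObject() pair that disagrees about what is in the stream can produce exactly this kind of corrupted-stream failure:

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

import org.apache.commons.lang.SerializationUtils;

public class InconsistentFormat implements Serializable {

    private static final long serialVersionUID = 1L;

    // an object field that default serialization would normally handle
    private String payload = "state";

    private void writeObject(ObjectOutputStream out) throws IOException {
        // forgets defaultWriteObject(): the field values never reach the stream
        out.writeInt(1);
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        // the read side still expects the default field data, so it finds
        // raw block data where an object should be and the stream is
        // reported as corrupt ("unexpected block data")
        in.defaultReadObject();
        in.readInt();
    }

    public static void main(String[] args) {
        // round-trip through Java serialization; fails with a corrupted-stream error
        SerializationUtils.clone(new InconsistentFormat());
    }
}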

To figure that out, can you check whether you can manually serialize the MongoHadoopOutputFormat?

Can you try calling "SerializationUtils.clone(...)" on a new MongoHadoopOutputFormat instance, for example at the beginning of your main method? SerializationUtils is part of Apache Commons Lang and is probably on your classpath anyway.

Stephan



Re: starting flink job from bash script with maven

Stefano Bortoli
I have implemented this test, and it runs without any exception:

package org.tagcloud.persistence.batch.test;

import java.io.IOException;

import org.apache.commons.lang.SerializationUtils;
import org.apache.hadoop.mapreduce.Job;
import org.tagcloud.persistence.batch.MongoHadoop2OutputFormat;

import com.mongodb.hadoop.MongoOutputFormat;

public class MongoHadoopSerializationTest {

    public static void main(String[] args) {
        try {
            Job job = Job.getInstance();
            // round-trip the output format through Java serialization
            SerializationUtils.clone(new MongoHadoop2OutputFormat<>(new MongoOutputFormat<>(), job));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}


Re: starting flink job from bash script with maven

Stephan Ewen
Hi!

There is probably something going wrong in MongoOutputFormat or MongoHadoop2OutputFormat. Something fails, but Java swallows the problem during serialization.

It may be a classloading issue that is not reported. Are MongoOutputFormat and MongoHadoop2OutputFormat both in the fat jar? If not, try putting them in there.

The last check we could do (to validate the Flink serialization utilities) is the code pasted below. If that does not cause the error, it is probably the issue described above.

Greetings,
Stephan


------------------------------

// Wrap the output format the way Flink wraps user code, serialize it
// into a task configuration, and read it back with the system class loader:
UserCodeObjectWrapper<Object> userCode = new UserCodeObjectWrapper<Object>(
        new MongoHadoop2OutputFormat<>(new MongoOutputFormat<>(), Job.getInstance()));
Configuration cfg = new Configuration();
TaskConfig taskConfig = new TaskConfig(cfg);
taskConfig.setStubWrapper(userCode);
taskConfig.getStubWrapper(ClassLoader.getSystemClassLoader());




Re: starting flink job from bash script with maven

Stefano Bortoli
Hi Stephan,

I think I may have found the root of the problem. I do not build a fat jar; I simply execute the main class with maven exec:java after a default install and compile, so no shaded uberjar is created. I will try that and report back. The fact that it runs so easily in Eclipse makes this confusing.

Regards,
Stefano


Re: starting flink job from bash script with maven

Stefano Bortoli
It seems there is a problem with the Maven class loading. I created the uberjar and executed it with a traditional java -cp uberjar.jar invocation, and it worked with no problems. It could be interesting to investigate the reason, as maven exec is very convenient. However, the uberjar eases the classpath problems, so I can live with it.
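
For what it's worth, a tiny probe class (hypothetical, purely for illustration) shows the difference between the two launch modes: under mvn exec:java the thread context class loader is Maven's plugin class loader and the system class loader only sees Maven's own launcher classes, while under java -cp uberjar.jar both resolve the classes in the jar. Code that loads user classes through the system class loader, as serialization machinery may do, then fails only in the maven exec case:

public class ClassLoaderProbe {

    public static void main(String[] args) throws ClassNotFoundException {
        ClassLoader context = Thread.currentThread().getContextClassLoader();
        ClassLoader system = ClassLoader.getSystemClassLoader();
        System.out.println("context class loader: " + context);
        System.out.println("system class loader:  " + system);

        // under `java -cp uberjar.jar` both of these succeed; under
        // `mvn exec:java` the second one throws ClassNotFoundException,
        // because only Maven's plugin class loader sees the project classes
        Class.forName("org.tagcloud.persistence.batch.MongoHadoop2OutputFormat", false, context);
        Class.forName("org.tagcloud.persistence.batch.MongoHadoop2OutputFormat", false, system);
    }
}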

Thanks a lot for your support.

Regards,
Stefano


Re: starting flink job from bash script with maven

Stephan Ewen
Thanks for letting us know!

The problem with Java serialization is that it often swallows exceptions, so in the end you only see a "corrupted byte stream". So far, I have found no workaround for that.

Stephan

