Fat jar fails deployment (streaming job too large)

Niels
Hi All,

We've been using Flink 1.3.2 for a while now, but recently failed to deploy
our fat jar to the cluster. The deployment only works when we remove 2
arbitrary operators, thus giving us the impression our job is too large.
However, we only changed some case classes and serializers (to support Avro)
compared to a working version of our jar. I'll provide some context below.

*Streaming operators used:* (same list as when the deploy worked)
- 9 Incoming streams from Kafka (all parsed from JSON -> Case Classes)
- 6 Stateful Joins (extend CoProcessFunction)
- 4 Stateful Processors (extend ProcessFunction)
- 5 Maps
- 2 Filters
- 1 Union of 3 Streams
- 1 Sink to Kafka (Case class -> JSON)

*Changes made:*
- add extended Type Serializer for Avro support
- add companion objects to case classes for translation to Avro Generic
Records
- alter stateful functions to use the above changes

*What does work:*
- remove 2 arbitrary operators and deploy fat jar
- run full program using sbt run locally

Could it be that the complexity of the job somehow causes deployment as a
jar to fail? We simply get a timeout from Flink's CLI when trying to deploy,
even when extending the timeout to several minutes.

Any help would be very much appreciated!

Thanks,
Niels



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Fat jar fails deployment (streaming job too large)

Fabian Hueske-2
Hi Niels,

There should be no size constraints on the complexity of an application or the size of a JAR file.
The problem that you describe sounds a bit strange and should be fixed.

Apparently, Flink has to spend more time on planning / submitting the application than before.
Have you tried to increase the akka.client.timeout parameter?

If that does not help, it would be good to learn what the JobManager is doing after the application was submitted.
Either it just takes longer than before such that the client timeout is exceeded or it might even get stuck in some kind of deadlock (which would be bad).
In that case it might help to take a few stack traces of the JM process after the application was submitted to check if the threads are making progress.

I'll also include Till who is more familiar with the submission process and JM planning and coordination.

Best, Fabian


(In reply to Niels' message of 2018-02-27 9:31 GMT+01:00.)

Re: Fat jar fails deployment (streaming job too large)

Till Rohrmann
Hi Niels,

the size of the jar does not play a role for Flink. What could be a problem is that the serialized `JobGraph` (user code with closures) is larger than 10 MB and, thus, exceeds the maximum default framesize of Akka. In such a case, it cannot be sent to the `JobMaster`. You can control the framesize via `akka.framesize`.
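To get a rough feeling for whether a single function could push the serialized graph past the frame size, one can measure what Java serialization produces for it. The following is only a minimal sketch: it assumes user code is shipped with standard Java serialization, takes the 10 MB default mentioned above as 10 * 1024 * 1024 bytes, and uses plain byte arrays as stand-ins for real closures. The class and method names are made up for illustration and are not part of Flink.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializedSizeCheck {
    // Assumption: the 10 MB default frame size from this thread, in bytes.
    static final long DEFAULT_FRAME_SIZE_BYTES = 10L * 1024 * 1024;

    /** Returns the number of bytes Java serialization produces for the object. */
    static long serializedSize(Serializable obj) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(obj);
        }
        return buffer.size();
    }

    /** True if the serialized object would fit into a frame of the given size. */
    static boolean fitsInFrame(Serializable obj, long frameSizeBytes) throws IOException {
        return serializedSize(obj) <= frameSizeBytes;
    }

    public static void main(String[] args) throws IOException {
        byte[] small = new byte[1024];             // stand-in for a lightweight closure
        byte[] large = new byte[11 * 1024 * 1024]; // stand-in for a closure carrying 11 MiB of data

        System.out.println("small fits: " + fitsInFrame(small, DEFAULT_FRAME_SIZE_BYTES)); // true
        System.out.println("large fits: " + fitsInFrame(large, DEFAULT_FRAME_SIZE_BYTES)); // false
    }
}
```

Running the same measurement on the actual function objects of the job (before they are handed to the stream API) would show whether any of them is unexpectedly large.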

In order to debug the problem properly, I would need access to the client log and the JobManager logs if possible.

Cheers,
Till

(In reply to Fabian Hueske's message of Tue, Feb 27, 2018 at 11:05 AM.)

Re: Fat jar fails deployment (streaming job too large)

Niels
Hi Till,

I've just tried to set on the *client*:
akka.client.timeout: 300s

On the *cluster*:
akka.ask.timeout: 30s
akka.lookup.timeout: 30s
akka.client.timeout: 300s
akka.framesize: 104857600b #(10x the original of 10MB)
akka.log.lifecycle.events: true

This still gives me the same issue: the fat jar isn't deployed. See the
attached files for the logs of the jobmanager and the deployer. Let me know
if I can provide you with any additional info. Thanks for your help!

Cheers,
Niels

Flink_deploy_log.txt
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1147/Flink_deploy_log.txt>  
flink_jobmanager_log.txt
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1147/flink_jobmanager_log.txt>  
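
As a quick check of the frame size arithmetic in the settings above, here is a small sketch that converts the `<bytes>b` values into MiB. It assumes the 10 MB default corresponds to 10485760 bytes; the helper `parseBytes` is hypothetical and only understands this one plain format, not the full syntax Flink accepts.

```java
public class FrameSizeArithmetic {
    static final long MIB = 1024L * 1024L;

    /** Parses a value written as "<bytes>b", the form used in the settings above. */
    static long parseBytes(String value) {
        String trimmed = value.trim();
        if (!trimmed.endsWith("b")) {
            throw new IllegalArgumentException("expected a value ending in 'b': " + value);
        }
        return Long.parseLong(trimmed.substring(0, trimmed.length() - 1));
    }

    public static void main(String[] args) {
        long defaultFrameSize = parseBytes("10485760b"); // assumed default: 10 MiB
        long raisedFrameSize = parseBytes("104857600b"); // value set on the cluster

        System.out.println(defaultFrameSize / MIB + " MiB");           // 10 MiB
        System.out.println(raisedFrameSize / MIB + " MiB");            // 100 MiB
        System.out.println(raisedFrameSize / defaultFrameSize + "x");  // 10x
    }
}
```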






Re: Fat jar fails deployment (streaming job too large)

Niels
In case it's useful, I've found out how to enable a bit more debug logging
on the jobmanager:

flink_jobmanager_log.txt
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1147/flink_jobmanager_log.txt>  




RE: Fat jar fails deployment (streaming job too large)

Chan, Regina
In reply to this post by Niels
Any updates on this one? I'm seeing similar issues with 1.3.3 and the batch api.

The main difference is that I have even more operators (~850), mostly maps and filters with one cogroup. I don't really want to set an akka.client.timeout of more than 10 minutes, seeing that it still fails with that amount. The akka.framesize is already 500 MB...

akka.framesize: 524288000b
akka.ask.timeout: 10min
akka.client.timeout: 10min
akka.lookup.timeout: 10min


Thanks,
Regina



(In reply to Niels' message of Tuesday, February 27, 2018 11:40 AM.)

Re: Fat jar fails deployment (streaming job too large)

Piotr Nowojski
Short answer: could be that your job is simply too big to be serialised, distributed and deserialised in the given time and you would have to increase timeouts even more.

Long answer: 

Do you have the same problem when you try to submit a smaller job? Does your cluster work for simpler jobs? Try cutting down/simplifying your job up to the point where it works. Maybe you will be able to pin down one single operator that's causing the problem (one that has, for example, a huge static data structure). If so, you might be able to optimise your operators in some way. Maybe some operator is doing some weird things and causing problems.
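
To illustrate how a single operator holding a large data structure can inflate what has to be serialized and shipped, here is a minimal sketch in plain Java. It assumes functions are serialized with standard Java serialization; `EagerLookup` and `LazyLookup` are hypothetical stand-ins for an operator, not Flink classes. The first keeps its table as a regular field, the second marks it `transient` and rebuilds it on first use.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureSizeDemo {

    /** Hypothetical operator that carries its lookup table as a regular field. */
    static class EagerLookup implements Serializable {
        private static final long serialVersionUID = 1L;
        private final int[] table = new int[1_000_000]; // serialized with the instance

        int lookup(int key) {
            return table[key];
        }
    }

    /** Same hypothetical operator, but the table is transient and built lazily. */
    static class LazyLookup implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient int[] table; // skipped by Java serialization

        int lookup(int key) {
            if (table == null) {
                table = new int[1_000_000]; // rebuilt wherever the function runs
            }
            return table[key];
        }
    }

    /** Returns the number of bytes Java serialization produces for the object. */
    static long serializedSize(Serializable obj) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(obj);
        }
        return buffer.size();
    }

    public static void main(String[] args) throws IOException {
        // The eager variant serializes to roughly 4 MB, the lazy one to well under 1 KB.
        System.out.println("eager: " + serializedSize(new EagerLookup()) + " bytes");
        System.out.println("lazy:  " + serializedSize(new LazyLookup()) + " bytes");
    }
}
```

In a Flink job, the `open()` method of a rich function is a common place for this kind of initialization, so that the data is built on the worker instead of travelling with the job submission.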

You could also try to approach this problem from the other direction (as previously suggested by Fabian). Try to profile/find out what the cluster is doing and where the problem is. Job Manager? One Task Manager? All of the Task Managers? Is there high CPU usage somewhere? Maybe one thread somewhere is overloaded? High network usage? After identifying potentially problematic JVMs, you could attach a code profiler or print stack traces to further pin down the problem.

Piotrek

(In reply to Regina Chan's message of 30 Apr 2018, 21:53.)