Different results on local and on cluster

classic Classic list List threaded Threaded
10 messages Options
Reply | Threaded
Open this post in threaded view
|

Different results on local and on cluster

Flavio Pompermaier
Hi to all,
I have a Flink job that computes data correctly when launched locally from my IDE while it doesn't when launched on the cluster.

Is there any suggestion/example to understand the problematic operators in this way?
I think the root cause is the fact that some operator (e.g. coGroup/groupBy,etc), which I assume to have all the data for a key, maybe it is not (because the data is partitioned among nodes).

Any help is appreciated,
Flavio
Reply | Threaded
Open this post in threaded view
|

Re: Different results on local and on cluster

Aljoscha Krettek
Hi,
do you have any data in the coGroup/groupBy operators that you use, besides the input data?

Cheers,
Aljoscha

On Fri, 1 Jul 2016 at 14:17 Flavio Pompermaier <[hidden email]> wrote:
Hi to all,
I have a Flink job that computes data correctly when launched locally from my IDE while it doesn't when launched on the cluster.

Is there any suggestion/example to understand the problematic operators in this way?
I think the root cause is the fact that some operator (e.g. coGroup/groupBy,etc), which I assume to have all the data for a key, maybe it is not (because the data is partitioned among nodes).

Any help is appreciated,
Flavio
Reply | Threaded
Open this post in threaded view
|

Re: Different results on local and on cluster

Flavio Pompermaier

what do you mean exactly?

On 1 Jul 2016 18:58, "Aljoscha Krettek" <[hidden email]> wrote:
Hi,
do you have any data in the coGroup/groupBy operators that you use, besides the input data?

Cheers,
Aljoscha

On Fri, 1 Jul 2016 at 14:17 Flavio Pompermaier <[hidden email]> wrote:
Hi to all,
I have a Flink job that computes data correctly when launched locally from my IDE while it doesn't when launched on the cluster.

Is there any suggestion/example to understand the problematic operators in this way?
I think the root cause is the fact that some operator (e.g. coGroup/groupBy,etc), which I assume to have all the data for a key, maybe it is not (because the data is partitioned among nodes).

Any help is appreciated,
Flavio
Reply | Threaded
Open this post in threaded view
|

Re: Different results on local and on cluster

Ufuk Celebi
I guess Aljoscha was referring to whether you also have broadcasted
input or something like it?

On Fri, Jul 1, 2016 at 7:05 PM, Flavio Pompermaier <[hidden email]> wrote:

> what do you mean exactly?
>
> On 1 Jul 2016 18:58, "Aljoscha Krettek" <[hidden email]> wrote:
>>
>> Hi,
>> do you have any data in the coGroup/groupBy operators that you use,
>> besides the input data?
>>
>> Cheers,
>> Aljoscha
>>
>> On Fri, 1 Jul 2016 at 14:17 Flavio Pompermaier <[hidden email]>
>> wrote:
>>>
>>> Hi to all,
>>> I have a Flink job that computes data correctly when launched locally
>>> from my IDE while it doesn't when launched on the cluster.
>>>
>>> Is there any suggestion/example to understand the problematic operators
>>> in this way?
>>> I think the root cause is the fact that some operator (e.g.
>>> coGroup/groupBy,etc), which I assume to have all the data for a key, maybe
>>> it is not (because the data is partitioned among nodes).
>>>
>>> Any help is appreciated,
>>> Flavio
Reply | Threaded
Open this post in threaded view
|

Re: Different results on local and on cluster

Flavio Pompermaier

No, I haven't.
I fear that unkilled taskmanger could have been the cause of this problem.
Last day I run the job and I discovered that on some node there was some zombie taskmanger yhat wasn't terminated during the stop-cluster.
What do you think?What happens in this situations?old taskmanager are still avle to interfer with the new jobmanager?
in the webdashboard I didn't  see them so I thought it wasn't  problematic at all so I just killed them..

On 4 Jul 2016 12:07 p.m., "Ufuk Celebi" <[hidden email]> wrote:
I guess Aljoscha was referring to whether you also have broadcasted
input or something like it?

On Fri, Jul 1, 2016 at 7:05 PM, Flavio Pompermaier <[hidden email]> wrote:
> what do you mean exactly?
>
> On 1 Jul 2016 18:58, "Aljoscha Krettek" <[hidden email]> wrote:
>>
>> Hi,
>> do you have any data in the coGroup/groupBy operators that you use,
>> besides the input data?
>>
>> Cheers,
>> Aljoscha
>>
>> On Fri, 1 Jul 2016 at 14:17 Flavio Pompermaier <[hidden email]>
>> wrote:
>>>
>>> Hi to all,
>>> I have a Flink job that computes data correctly when launched locally
>>> from my IDE while it doesn't when launched on the cluster.
>>>
>>> Is there any suggestion/example to understand the problematic operators
>>> in this way?
>>> I think the root cause is the fact that some operator (e.g.
>>> coGroup/groupBy,etc), which I assume to have all the data for a key, maybe
>>> it is not (because the data is partitioned among nodes).
>>>
>>> Any help is appreciated,
>>> Flavio
Reply | Threaded
Open this post in threaded view
|

Re: Different results on local and on cluster

Ufuk Celebi
It's not possible to tell. You would have to look into the logs of the
job manager to check what happened. The not killed task manager could
have re-connected to the job manager, if it was restarted quickly
after the failure. Why do you think that the task manager would
influence the job result though?

On Mon, Jul 4, 2016 at 12:23 PM, Flavio Pompermaier
<[hidden email]> wrote:

> No, I haven't.
> I fear that unkilled taskmanger could have been the cause of this problem.
> Last day I run the job and I discovered that on some node there was some
> zombie taskmanger yhat wasn't terminated during the stop-cluster.
> What do you think?What happens in this situations?old taskmanager are still
> avle to interfer with the new jobmanager?
> in the webdashboard I didn't  see them so I thought it wasn't  problematic
> at all so I just killed them..
>
> On 4 Jul 2016 12:07 p.m., "Ufuk Celebi" <[hidden email]> wrote:
>
> I guess Aljoscha was referring to whether you also have broadcasted
> input or something like it?
>
> On Fri, Jul 1, 2016 at 7:05 PM, Flavio Pompermaier <[hidden email]>
> wrote:
>> what do you mean exactly?
>>
>> On 1 Jul 2016 18:58, "Aljoscha Krettek" <[hidden email]> wrote:
>>>
>>> Hi,
>>> do you have any data in the coGroup/groupBy operators that you use,
>>> besides the input data?
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Fri, 1 Jul 2016 at 14:17 Flavio Pompermaier <[hidden email]>
>>> wrote:
>>>>
>>>> Hi to all,
>>>> I have a Flink job that computes data correctly when launched locally
>>>> from my IDE while it doesn't when launched on the cluster.
>>>>
>>>> Is there any suggestion/example to understand the problematic operators
>>>> in this way?
>>>> I think the root cause is the fact that some operator (e.g.
>>>> coGroup/groupBy,etc), which I assume to have all the data for a key,
>>>> maybe
>>>> it is not (because the data is partitioned among nodes).
>>>>
>>>> Any help is appreciated,
>>>> Flavio
Reply | Threaded
Open this post in threaded view
|

Re: Different results on local and on cluster

Flavio Pompermaier

Because I don't  see any good reason for that...maybe  also all keyo serialization errors that  I have from time to time could be symptomatic of some other error in how Flink manage the ibternal buffers...but also this is just another personal guess I did..

On 4 Jul 2016 12:29 p.m., "Ufuk Celebi" <[hidden email]> wrote:
It's not possible to tell. You would have to look into the logs of the
job manager to check what happened. The not killed task manager could
have re-connected to the job manager, if it was restarted quickly
after the failure. Why do you think that the task manager would
influence the job result though?

On Mon, Jul 4, 2016 at 12:23 PM, Flavio Pompermaier
<[hidden email]> wrote:
> No, I haven't.
> I fear that unkilled taskmanger could have been the cause of this problem.
> Last day I run the job and I discovered that on some node there was some
> zombie taskmanger yhat wasn't terminated during the stop-cluster.
> What do you think?What happens in this situations?old taskmanager are still
> avle to interfer with the new jobmanager?
> in the webdashboard I didn't  see them so I thought it wasn't  problematic
> at all so I just killed them..
>
> On 4 Jul 2016 12:07 p.m., "Ufuk Celebi" <[hidden email]> wrote:
>
> I guess Aljoscha was referring to whether you also have broadcasted
> input or something like it?
>
> On Fri, Jul 1, 2016 at 7:05 PM, Flavio Pompermaier <[hidden email]>
> wrote:
>> what do you mean exactly?
>>
>> On 1 Jul 2016 18:58, "Aljoscha Krettek" <[hidden email]> wrote:
>>>
>>> Hi,
>>> do you have any data in the coGroup/groupBy operators that you use,
>>> besides the input data?
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Fri, 1 Jul 2016 at 14:17 Flavio Pompermaier <[hidden email]>
>>> wrote:
>>>>
>>>> Hi to all,
>>>> I have a Flink job that computes data correctly when launched locally
>>>> from my IDE while it doesn't when launched on the cluster.
>>>>
>>>> Is there any suggestion/example to understand the problematic operators
>>>> in this way?
>>>> I think the root cause is the fact that some operator (e.g.
>>>> coGroup/groupBy,etc), which I assume to have all the data for a key,
>>>> maybe
>>>> it is not (because the data is partitioned among nodes).
>>>>
>>>> Any help is appreciated,
>>>> Flavio
Reply | Threaded
Open this post in threaded view
|

Re: Different results on local and on cluster

Flavio Pompermaier

Sorry I wanted to write Kryo but I'm on my mobile....

On 4 Jul 2016 12:34 p.m., "Flavio Pompermaier" <[hidden email]> wrote:

Because I don't  see any good reason for that...maybe  also all keyo serialization errors that  I have from time to time could be symptomatic of some other error in how Flink manage the ibternal buffers...but also this is just another personal guess I did..

On 4 Jul 2016 12:29 p.m., "Ufuk Celebi" <[hidden email]> wrote:
It's not possible to tell. You would have to look into the logs of the
job manager to check what happened. The not killed task manager could
have re-connected to the job manager, if it was restarted quickly
after the failure. Why do you think that the task manager would
influence the job result though?

On Mon, Jul 4, 2016 at 12:23 PM, Flavio Pompermaier
<[hidden email]> wrote:
> No, I haven't.
> I fear that unkilled taskmanger could have been the cause of this problem.
> Last day I run the job and I discovered that on some node there was some
> zombie taskmanger yhat wasn't terminated during the stop-cluster.
> What do you think?What happens in this situations?old taskmanager are still
> avle to interfer with the new jobmanager?
> in the webdashboard I didn't  see them so I thought it wasn't  problematic
> at all so I just killed them..
>
> On 4 Jul 2016 12:07 p.m., "Ufuk Celebi" <[hidden email]> wrote:
>
> I guess Aljoscha was referring to whether you also have broadcasted
> input or something like it?
>
> On Fri, Jul 1, 2016 at 7:05 PM, Flavio Pompermaier <[hidden email]>
> wrote:
>> what do you mean exactly?
>>
>> On 1 Jul 2016 18:58, "Aljoscha Krettek" <[hidden email]> wrote:
>>>
>>> Hi,
>>> do you have any data in the coGroup/groupBy operators that you use,
>>> besides the input data?
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Fri, 1 Jul 2016 at 14:17 Flavio Pompermaier <[hidden email]>
>>> wrote:
>>>>
>>>> Hi to all,
>>>> I have a Flink job that computes data correctly when launched locally
>>>> from my IDE while it doesn't when launched on the cluster.
>>>>
>>>> Is there any suggestion/example to understand the problematic operators
>>>> in this way?
>>>> I think the root cause is the fact that some operator (e.g.
>>>> coGroup/groupBy,etc), which I assume to have all the data for a key,
>>>> maybe
>>>> it is not (because the data is partitioned among nodes).
>>>>
>>>> Any help is appreciated,
>>>> Flavio
Reply | Threaded
Open this post in threaded view
|

Re: Different results on local and on cluster

Flavio Pompermaier
Hi to all,
I forgot to close this thread. In the end the error was (fortunately) in my code, since I use the "reuse strategy" and in one case I forgot to reset the field of a POJO I was filling in a map function. So, every time I was running the job the error was in a different output object.

Thanks to all,
Flavio

On Mon, Jul 4, 2016 at 12:34 PM, Flavio Pompermaier <[hidden email]> wrote:

Sorry I wanted to write Kryo but I'm on my mobile....

On 4 Jul 2016 12:34 p.m., "Flavio Pompermaier" <[hidden email]> wrote:

Because I don't  see any good reason for that...maybe  also all keyo serialization errors that  I have from time to time could be symptomatic of some other error in how Flink manage the ibternal buffers...but also this is just another personal guess I did..

On 4 Jul 2016 12:29 p.m., "Ufuk Celebi" <[hidden email]> wrote:
It's not possible to tell. You would have to look into the logs of the
job manager to check what happened. The not killed task manager could
have re-connected to the job manager, if it was restarted quickly
after the failure. Why do you think that the task manager would
influence the job result though?

On Mon, Jul 4, 2016 at 12:23 PM, Flavio Pompermaier
<[hidden email]> wrote:
> No, I haven't.
> I fear that unkilled taskmanger could have been the cause of this problem.
> Last day I run the job and I discovered that on some node there was some
> zombie taskmanger yhat wasn't terminated during the stop-cluster.
> What do you think?What happens in this situations?old taskmanager are still
> avle to interfer with the new jobmanager?
> in the webdashboard I didn't  see them so I thought it wasn't  problematic
> at all so I just killed them..
>
> On 4 Jul 2016 12:07 p.m., "Ufuk Celebi" <[hidden email]> wrote:
>
> I guess Aljoscha was referring to whether you also have broadcasted
> input or something like it?
>
> On Fri, Jul 1, 2016 at 7:05 PM, Flavio Pompermaier <[hidden email]>
> wrote:
>> what do you mean exactly?
>>
>> On 1 Jul 2016 18:58, "Aljoscha Krettek" <[hidden email]> wrote:
>>>
>>> Hi,
>>> do you have any data in the coGroup/groupBy operators that you use,
>>> besides the input data?
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Fri, 1 Jul 2016 at 14:17 Flavio Pompermaier <[hidden email]>
>>> wrote:
>>>>
>>>> Hi to all,
>>>> I have a Flink job that computes data correctly when launched locally
>>>> from my IDE while it doesn't when launched on the cluster.
>>>>
>>>> Is there any suggestion/example to understand the problematic operators
>>>> in this way?
>>>> I think the root cause is the fact that some operator (e.g.
>>>> coGroup/groupBy,etc), which I assume to have all the data for a key,
>>>> maybe
>>>> it is not (because the data is partitioned among nodes).
>>>>
>>>> Any help is appreciated,
>>>> Flavio



--

Flavio Pompermaier
Development Department
_______________________________________________
OKKAMSrl www.okkam.it

Phone: +(39) 0461 283 702
Fax: + (39) 0461 186 6433
Email: [hidden email]
Headquarters: Trento (Italy), via G.B. Trener 8
Registered office: Trento (Italy), via Segantini 23

Confidentially notice. This e-mail transmission may contain legally privileged and/or confidential information. Please do not read it if you are not the intended recipient(S). Any use, distribution, reproduction or disclosure by any other person is strictly prohibited. If you have received this e-mail in error, please notify the sender and destroy the original transmission and its attachments without reading or saving it in any manner.

Reply | Threaded
Open this post in threaded view
|

Re: Different results on local and on cluster

Ufuk Celebi
Thanks for reporting back!

On Mon, Jul 18, 2016 at 10:13 AM, Flavio Pompermaier
<[hidden email]> wrote:

> Hi to all,
> I forgot to close this thread. In the end the error was (fortunately) in my
> code, since I use the "reuse strategy" and in one case I forgot to reset the
> field of a POJO I was filling in a map function. So, every time I was
> running the job the error was in a different output object.
>
> Thanks to all,
> Flavio
>
> On Mon, Jul 4, 2016 at 12:34 PM, Flavio Pompermaier <[hidden email]>
> wrote:
>>
>> Sorry I wanted to write Kryo but I'm on my mobile....
>>
>> On 4 Jul 2016 12:34 p.m., "Flavio Pompermaier" <[hidden email]>
>> wrote:
>>>
>>> Because I don't  see any good reason for that...maybe  also all keyo
>>> serialization errors that  I have from time to time could be symptomatic of
>>> some other error in how Flink manage the ibternal buffers...but also this is
>>> just another personal guess I did..
>>>
>>> On 4 Jul 2016 12:29 p.m., "Ufuk Celebi" <[hidden email]> wrote:
>>>>
>>>> It's not possible to tell. You would have to look into the logs of the
>>>> job manager to check what happened. The not killed task manager could
>>>> have re-connected to the job manager, if it was restarted quickly
>>>> after the failure. Why do you think that the task manager would
>>>> influence the job result though?
>>>>
>>>> On Mon, Jul 4, 2016 at 12:23 PM, Flavio Pompermaier
>>>> <[hidden email]> wrote:
>>>> > No, I haven't.
>>>> > I fear that unkilled taskmanger could have been the cause of this
>>>> > problem.
>>>> > Last day I run the job and I discovered that on some node there was
>>>> > some
>>>> > zombie taskmanger yhat wasn't terminated during the stop-cluster.
>>>> > What do you think?What happens in this situations?old taskmanager are
>>>> > still
>>>> > avle to interfer with the new jobmanager?
>>>> > in the webdashboard I didn't  see them so I thought it wasn't
>>>> > problematic
>>>> > at all so I just killed them..
>>>> >
>>>> > On 4 Jul 2016 12:07 p.m., "Ufuk Celebi" <[hidden email]> wrote:
>>>> >
>>>> > I guess Aljoscha was referring to whether you also have broadcasted
>>>> > input or something like it?
>>>> >
>>>> > On Fri, Jul 1, 2016 at 7:05 PM, Flavio Pompermaier
>>>> > <[hidden email]>
>>>> > wrote:
>>>> >> what do you mean exactly?
>>>> >>
>>>> >> On 1 Jul 2016 18:58, "Aljoscha Krettek" <[hidden email]> wrote:
>>>> >>>
>>>> >>> Hi,
>>>> >>> do you have any data in the coGroup/groupBy operators that you use,
>>>> >>> besides the input data?
>>>> >>>
>>>> >>> Cheers,
>>>> >>> Aljoscha
>>>> >>>
>>>> >>> On Fri, 1 Jul 2016 at 14:17 Flavio Pompermaier
>>>> >>> <[hidden email]>
>>>> >>> wrote:
>>>> >>>>
>>>> >>>> Hi to all,
>>>> >>>> I have a Flink job that computes data correctly when launched
>>>> >>>> locally
>>>> >>>> from my IDE while it doesn't when launched on the cluster.
>>>> >>>>
>>>> >>>> Is there any suggestion/example to understand the problematic
>>>> >>>> operators
>>>> >>>> in this way?
>>>> >>>> I think the root cause is the fact that some operator (e.g.
>>>> >>>> coGroup/groupBy,etc), which I assume to have all the data for a
>>>> >>>> key,
>>>> >>>> maybe
>>>> >>>> it is not (because the data is partitioned among nodes).
>>>> >>>>
>>>> >>>> Any help is appreciated,
>>>> >>>> Flavio
>
>
>
>
> --
>
> Flavio Pompermaier
> Development Department
> _______________________________________________
> OKKAMSrl - www.okkam.it
>
> Phone: +(39) 0461 283 702
> Fax: + (39) 0461 186 6433
> Email: [hidden email]
> Headquarters: Trento (Italy), via G.B. Trener 8
> Registered office: Trento (Italy), via Segantini 23
>
> Confidentially notice. This e-mail transmission may contain legally
> privileged and/or confidential information. Please do not read it if you are
> not the intended recipient(S). Any use, distribution, reproduction or
> disclosure by any other person is strictly prohibited. If you have received
> this e-mail in error, please notify the sender and destroy the original
> transmission and its attachments without reading or saving it in any manner.