[Statefun] Truncated Messages in Python workers

Jan Brusch
Hi,

Recently we started seeing the following faulty behaviour in the Flink
Stateful Functions HTTP communication towards external Python workers.
This only occurs when the system is under heavy load.

The Java Application will send HTTP Messages to an external Python
Function but the external Function fails to parse the message with a
"Truncated Message Error". Printouts show that the truncated message
looks as follows:

------------------------------

<Start of Message>

my.protobuf.MyClass: <Protobuf Content>

my.protobuf.MyClass: <Protobuf Content>

my.protobuf.MyClass: <Protobuf Content>

my.protobuf.MyClass: <Protob

------------------------------

Which leads to the following Error in the Python worker:

------------------------------

Error Parsing Message: Truncated Message

------------------------------
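For illustration only, a minimal sketch of why a cut-off payload cannot be parsed: Protobuf encodes each embedded message as a tag, a length, and then that many bytes, so a parser that runs out of input before the declared length is reached has to give up. The helper names below are hypothetical and use only the Python standard library; this is not the code path of the StateFun SDK or of the protobuf library.

```python
from typing import List, Tuple


def encode_varint(value: int) -> bytes:
    """Encode a non-negative integer as a base-128 varint."""
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_field(field_number: int, payload: bytes) -> bytes:
    """Encode one length-delimited field (wire type 2)."""
    tag = (field_number << 3) | 2
    return encode_varint(tag) + encode_varint(len(payload)) + payload


def read_varint(buf: bytes, pos: int) -> Tuple[int, int]:
    """Decode one varint starting at pos; return (value, next_pos)."""
    result = 0
    shift = 0
    while True:
        if pos >= len(buf):
            raise ValueError("Truncated message: varint cut off")
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def split_length_delimited(buf: bytes) -> List[bytes]:
    """Split a buffer that consists only of length-delimited fields."""
    fields = []
    pos = 0
    while pos < len(buf):
        tag, pos = read_varint(buf, pos)
        if tag & 0x07 != 2:
            raise ValueError("unexpected wire type %d" % (tag & 0x07))
        length, pos = read_varint(buf, pos)
        if pos + length > len(buf):
            raise ValueError(
                "Truncated message: field declares %d bytes, only %d left"
                % (length, len(buf) - pos)
            )
        fields.append(buf[pos:pos + length])
        pos += length
    return fields


# A batch of four entries parses; the same batch cut off mid-entry does not.
batch = b"".join(encode_field(1, b"invocation-%d" % i) for i in range(4))
print(len(split_length_delimited(batch)))
try:
    split_length_delimited(batch[:-5])
except ValueError as err:
    print(err)
```

The point of the sketch is that the first three entries of a truncated batch still decode cleanly, which matches the printout above: the damage only shows up at the entry where the bytes run out.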

Either the sender or the receiver (or something in between) seems to be
truncating some (not all) messages at some random point in the payload.
The source code in both Flink SDKs looks to be correct. We temporarily
solved this by setting the "maxNumBatchRequests" parameter in the
external function definition really low. But this is not an ideal
solution as we believe this adds considerable communication overhead
between the Java and the Python Functions.
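As a rough back-of-the-envelope model of that overhead, and assuming (a simplification, not a description of StateFun's actual batching logic) that every HTTP request carries a full batch, the number of round trips grows inversely with the batch limit. The function name and the message count below are made up for illustration.

```python
import math


def requests_needed(num_messages: int, max_batch: int) -> int:
    """Round trips needed if every request carries at most max_batch messages."""
    return math.ceil(num_messages / max_batch)


# Hypothetical workload of 100,000 messages for one function address.
for max_batch in (1000, 100, 10):
    print(max_batch, requests_needed(100_000, max_batch))
```

In practice batches are only filled while an earlier request is still in flight, so the real request count depends on load and latency; the sketch only shows the direction of the trade-off.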

The Stateful Functions version is 2.2.2, running on Java 8. The Java app as
well as the external Python workers are deployed in the same Kubernetes cluster.


Has anyone ever seen this problem before?

Best regards

Jan

--
neuland  – Büro für Informatik GmbH
Konsul-Smidt-Str. 8g, 28217 Bremen

Telefon (0421) 380107 57
Fax (0421) 380107 99
https://www.neuland-bfi.de

https://twitter.com/neuland
https://facebook.com/neulandbfi
https://xing.com/company/neulandbfi


Geschäftsführer: Thomas Gebauer, Jan Zander
Registergericht: Amtsgericht Bremen, HRB 23395 HB
USt-ID. DE 246585501

Re: [Statefun] Truncated Messages in Python workers

Stephan Ewen
Thanks for reporting this, it looks indeed like a potential bug.

I filed this Jira for it: https://issues.apache.org/jira/browse/FLINK-22729

Could you share (here or in Jira) what the stack on the Python worker side is (for example, which HTTP server)? Do you know if the message truncation happens reliably at a certain message size?


On Wed, May 19, 2021 at 2:12 PM Jan Brusch <[hidden email]> wrote:
[...]

Re: [Statefun] Truncated Messages in Python workers

Igal Shilman
Hi Jan,

I haven't stumbled upon this but I will try to reconstruct that scenario with a stress test and report back.

Can you share a little bit about your environment? For example, do you use gunicorn, nginx, aiohttp, or perhaps Flask?

I'd suggest checking the request size limit parameters on that stack (nginx / gunicorn, Python app server).
On the worker side, there should be a detailed log message that also prints the request size; it may be useful to correlate that number with any limits.
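One way to do that correlation, sketched under the assumption that the worker runs behind a WSGI server (the thread does not confirm this): compare the declared Content-Length with the number of bytes actually read before handing the body to the SDK. The function name is hypothetical and nothing here is part of the StateFun Python SDK.

```python
import io


def read_body_checked(environ: dict) -> bytes:
    """Read a WSGI request body and fail loudly if it is shorter than declared."""
    declared = int(environ.get("CONTENT_LENGTH") or 0)
    body = environ["wsgi.input"].read(declared)
    if len(body) != declared:
        raise ValueError(
            "request body incomplete: got %d of %d declared bytes"
            % (len(body), declared)
        )
    return body


# Simulated requests: one complete, one cut off after 5 of 10 bytes.
complete = {"CONTENT_LENGTH": "10", "wsgi.input": io.BytesIO(b"0123456789")}
cut_off = {"CONTENT_LENGTH": "10", "wsgi.input": io.BytesIO(b"01234")}
print(len(read_body_checked(complete)))
try:
    read_body_checked(cut_off)
except ValueError as err:
    print(err)
```

If the two numbers differ, the body was cut short before it reached the application; if they match and parsing still fails, the sender declared and sent a short body. Requests using chunked transfer encoding carry no Content-Length, so this check does not apply to them.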

Thanks,
Igal.



On Thu, May 20, 2021 at 4:49 PM Stephan Ewen <[hidden email]> wrote:
[...]

Re: [Statefun] Truncated Messages in Python workers

Igal Shilman
Hi again,

One more thing to mention: it could also be the case that StateFun hits a write timeout while trying to write the accumulated batch to the remote function (when the remote functions are overloaded).
The requests are retried automatically, but you may still want to bump these timeouts [1].




On Fri, May 21, 2021 at 10:53 AM Igal Shilman <[hidden email]> wrote:
[...]