OOM issues with Python Objects

OOM issues with Python Objects

Kevin Lam
Hi all,

I've encountered an interesting issue: I observe an OOM in my Flink application when I use a DataStream of plain Python objects, but when I make that object a subclass of pyflink.common.types.Row and provide TypeInformation, there is no issue.

In the OOM scenario, no elements get processed: the application runs without printing any output and eventually crashes with java.lang.OutOfMemoryError: Java heap space.

Any insights into why this might be happening? Appreciate any help/suggestions.
I've included some code that illustrates the two situations below [0].

Thanks in advance! 

[0]: 

Code Snippet A: OOM scenario

class InputWrapper:
    """Helper class, used to make streams of the same type"""

    def __init__(self, key: str, contents: Row = None):
        self.key = key
        self.contents = contents

x_ds = x_ds.map(
    lambda d: InputWrapper(key=d['key'], contents=d))
y_ds = y_ds.map(
    lambda o: InputWrapper(key=o['key'], contents=o))
union = x_ds.union(y_ds)
union.print()

Code Snippet B: Working scenario

class InputWrapper(Row):
    """Helper class, used to make streams of the same type"""

    def __init__(self, key: str, contents: Row = None):
        super().__init__(key, contents)

x_ds = x_ds.map(
    lambda d: InputWrapper(key=d['key'], contents=d),
    output_type=InputWrapperTypeInfo())
y_ds = y_ds.map(
    lambda o: InputWrapper(key=o['key'], contents=o),
    output_type=InputWrapperTypeInfo())
union = x_ds.union(y_ds)
union.print()



Re: OOM issues with Python Objects

Arvid Heise-4
Hi Kevin,

I suspect that this is because Row is not supported as a Python field [1]; it's supposed to be a dict that is mapped to a Row by Flink.
Maybe it runs into some infinite loop while trying to serialize, hence the OOM.

Subclassing Row might be an undocumented feature.

I'm also pulling in Dian who knows more about PyFlink.


On Fri, Mar 19, 2021 at 3:55 PM Kevin Lam <[hidden email]> wrote:



Re: OOM issues with Python Objects

Kevin Lam
Thanks for the response Arvid! Point of clarification: things do NOT OOM when I use the Row subclass. Instead, the code that doesn't use the Row subclass (i.e., the simple Python class) is the code that OOMs.



On Mon, Mar 22, 2021 at 10:24 AM Arvid Heise <[hidden email]> wrote:



Re: OOM issues with Python Objects

Arvid Heise-4
Hi Kevin,

Yes, I understood that, but then your Python class contains a Row field for which no mapping exists. I'm assuming PyFlink tries to do a deep conversion and fails, ending up in some infinite loop.

On Mon, Mar 22, 2021 at 3:48 PM Kevin Lam <[hidden email]> wrote:



Re: OOM issues with Python Objects

Dian Fu
Hi Kevin,

Is it possible to provide a simple example to reproduce this issue? 

PS: It will use pickle to perform the serialization/deserialization if you don't specify the type info.

Regards,
Dian


On Mon, Mar 22, 2021 at 10:55 PM Arvid Heise <[hidden email]> wrote:



Re: OOM issues with Python Objects

Kevin Lam
Hi Dian,

I have unit tests for which both sets of code (Row subclass vs. custom Python class) pass. The OOM occurs when reading a large amount of data from a Kafka topic.

At the moment I don't have a simple example that reproduces the issue; I'll let you know.

On Tue, Mar 23, 2021 at 2:17 AM Dian Fu <[hidden email]> wrote: