OOM issues with Python Objects
Posted by
Kevin Lam on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/OOM-issues-with-Python-Objects-tp42396.html
Hi all,
I've run into an interesting problem: my Flink application hits an OOM when I use a DataStream of plain Python objects, but when I make that Python object a subclass of pyflink.common.types.Row and provide TypeInformation, the problem goes away.
In the OOM scenario, no elements are processed: the application runs without printing any output and eventually crashes with java.lang.OutOfMemoryError: Java heap space.
Any insights into why this might be happening? Appreciate any help/suggestions.
I've included some code that illustrates the two situations below [0].
Thanks in advance!
[0]:
Code Snippet A: OOM scenario
from pyflink.common.types import Row

class InputWrapper:
    """Helper class, used to make streams of the same type"""

    def __init__(self, key: str, contents: Row = None):
        self.key = key
        self.contents = contents

# No output_type is passed to map() in this scenario.
x_ds = x_ds.map(
    lambda d: InputWrapper(key=d['key'], contents=d))
y_ds = y_ds.map(
    lambda o: InputWrapper(key=o['key'], contents=o))

union = x_ds.union(y_ds)
union.print()
Code Snippet B: Working scenario
from pyflink.common.types import Row

class InputWrapper(Row):
    """Helper class, used to make streams of the same type"""

    def __init__(self, key: str, contents: Row = None):
        super().__init__(key, contents)

# InputWrapperTypeInfo is the TypeInformation for InputWrapper (definition not shown here).
x_ds = x_ds.map(
    lambda d: InputWrapper(key=d['key'], contents=d),
    output_type=InputWrapperTypeInfo())
y_ds = y_ds.map(
    lambda o: InputWrapper(key=o['key'], contents=o),
    output_type=InputWrapperTypeInfo())

union = x_ds.union(y_ds)
union.print()
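
A small standalone check that may help narrow this down (a sketch, not a diagnosis): as far as I understand, when no output_type is given, PyFlink falls back to pickling each Python object, so it can be worth comparing the pickled size of a wrapped element against the raw one. The record below is a made-up stand-in for an element of x_ds / y_ds, pickled_size is a hypothetical helper, and nothing here touches Flink itself.

```python
import pickle


class InputWrapper:
    """Plain Python wrapper, as in Snippet A (no Row base class)."""

    def __init__(self, key, contents=None):
        self.key = key
        self.contents = contents


def pickled_size(obj) -> int:
    """Return the number of bytes pickle produces for one object."""
    return len(pickle.dumps(obj))


# Hypothetical record standing in for one element of x_ds or y_ds.
record = {"key": "a", "value": 1}
wrapped = InputWrapper(key=record["key"], contents=record)

print("raw record:", pickled_size(record), "bytes")
print("wrapped   :", pickled_size(wrapped), "bytes")
```

If the wrapped size is only modestly larger than the raw record, per-element pickling overhead alone is unlikely to account for the heap exhaustion, and the cause probably lies elsewhere.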