Hi Kevin,

I suspect that this is because Row is not supported as a Python field [1]; it's supposed to be a dict that is mapped to a Row by Flink (a sketch of that approach is appended below the quoted message). Maybe it runs into some infinite loop while trying to serialize, and hence the OOM. Subclassing Row might be an undocumented feature.

I'm also pulling in Dian, who knows more about PyFlink.

On Fri, Mar 19, 2021 at 3:55 PM Kevin Lam <[hidden email]> wrote:

Hi all,

I've encountered an interesting issue: I observe an OOM in my Flink application when I use a DataStream of plain Python objects, but when I make that Python object a subclass of pyflink.common.types.Row and provide TypeInformation, there is no issue.

In the OOM scenario, no elements get processed; the application runs without printing any output and eventually crashes with java.lang.OutOfMemoryError: Java heap space.

Any insights into why this might be happening? I'd appreciate any help/suggestions.

I've included some code that illustrates the two situations below [0].

Thanks in advance!

[0]:

Code Snippet A: OOM scenario

class InputWrapper:
    """Helper class, used to make streams of the same type"""

    def __init__(self, key: str, contents: Row = None):
        self.key = key
        self.contents = contents

x_ds = x_ds.map(lambda d: InputWrapper(key=d['key'], contents=d))
y_ds = y_ds.map(lambda o: InputWrapper(key=o['key'], contents=o))
union = x_ds.union(y_ds)
union.print()

Code Snippet B: Working scenario

class InputWrapper(Row):
    """Helper class, used to make streams of the same type"""

    def __init__(self, key: str, contents: Row = None):
        super().__init__(key, contents)

x_ds = x_ds.map(lambda d: InputWrapper(key=d['key'], contents=d),
                output_type=InputWrapperTypeInfo())
y_ds = y_ds.map(lambda o: InputWrapper(key=o['key'], contents=o),
                output_type=InputWrapperTypeInfo())
union = x_ds.union(y_ds)
union.print()
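A minimal sketch of the dict-to-Row approach suggested in the reply above, using a plain Row together with Types.ROW_NAMED instead of the custom InputWrapper/InputWrapperTypeInfo pair. The field layout and the use of PICKLED_BYTE_ARRAY for the contents field are assumptions, since the original message does not show how InputWrapperTypeInfo is defined:

from pyflink.common import Row
from pyflink.common.typeinfo import Types

# Assumed row layout: a string key plus the original element carried as
# pickled Python data (the real schema in Kevin's job may differ).
input_wrapper_type = Types.ROW_NAMED(
    ['key', 'contents'],
    [Types.STRING(), Types.PICKLED_BYTE_ARRAY()])

# Map to a plain Row and declare the output type explicitly, so the
# unioned stream has a known Row type instead of an arbitrary Python class.
x_ds = x_ds.map(
    lambda d: Row(key=d['key'], contents=d),
    output_type=input_wrapper_type)
y_ds = y_ds.map(
    lambda o: Row(key=o['key'], contents=o),
    output_type=input_wrapper_type)
union = x_ds.union(y_ds)
union.print()

With an explicit output_type, PyFlink can choose a Row coder for the mapped elements rather than falling back to generic pickling of a custom class, which is where the serialization trouble described above appears to start.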