Hi all,

I've run into an interesting issue: my Flink application hits an OOM when I use a DataStream of plain Python objects, but when I make that Python class a subclass of pyflink.common.types.Row and provide TypeInformation, there is no issue.

In the OOM scenario, no elements get processed. The application runs without printing any output and eventually crashes with:

    java.lang.OutOfMemoryError: Java heap space

Any insights into why this might be happening? I'd appreciate any help or suggestions.

I've included some code below that illustrates the two situations [0].

Thanks in advance!

[0]:

Code Snippet A: OOM scenario

    from pyflink.common import Row

    class InputWrapper:
        """Helper class, used to make streams of the same type"""

        def __init__(self, key: str, contents: Row = None):
            self.key = key
            self.contents = contents

    # Wrap both streams in the same plain Python class (no output_type given)
    x_ds = x_ds.map(lambda d: InputWrapper(key=d['key'], contents=d))
    y_ds = y_ds.map(lambda o: InputWrapper(key=o['key'], contents=o))

    union = x_ds.union(y_ds)
    union.print()

Code Snippet B: Working scenario

    from pyflink.common import Row

    class InputWrapper(Row):
        """Helper class, used to make streams of the same type"""

        def __init__(self, key: str, contents: Row = None):
            super().__init__(key, contents)

    # Same wrapping, but the class extends Row and output_type is provided
    x_ds = x_ds.map(lambda d: InputWrapper(key=d['key'], contents=d),
                    output_type=InputWrapperTypeInfo())
    y_ds = y_ds.map(lambda o: InputWrapper(key=o['key'], contents=o),
                    output_type=InputWrapperTypeInfo())

    union = x_ds.union(y_ds)
    union.print()