Declaring and using ROW data type in PyFlink DataStream

Declaring and using ROW data type in PyFlink DataStream

meneldor
Hello,

What is the correct way to use Python dicts as the ROW type in PyFlink? I'm trying this:
output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'],
                                   [Types.STRING(), Types.STRING(), Types.LONG()])

class MyProcessFunction(KeyedProcessFunction):
    def process_element(self, value, ctx: 'KeyedProcessFunction.Context', out: Collector):
        result = {"id": ctx.get_current_key()[0], "data": "some_string", "timestamp": 111111111111}
        out.collect(result)
        current_watermark = ctx.timer_service().current_watermark()
        ctx.timer_service().register_event_time_timer(current_watermark + 1500)

    def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext', out: 'Collector'):
        logging.info(timestamp)
        out.collect("On timer timestamp: " + str(timestamp))

ds.key_by(MyKeySelector(), key_type_info=Types.TUPLE([Types.STRING(), Types.STRING()])) \
    .process(MyProcessFunction(), output_type=output_type_info)

I hardcoded the values in MyProcessFunction to be sure the input data isn't corrupting the fields. So the data is correct, but PyFlink throws an exception:
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
at org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoMask(MaskUtils.java:73)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:202)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:213)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.emitResult(PythonKeyedProcessOperator.java:253)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.emitResults(AbstractPythonFunctionOperator.java:266)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:293)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:285)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:134)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1211)
... 10 more
However, it works with primitive types like Types.STRING(). According to the documentation, the ROW type corresponds to Python's dict type.

Regards

Re: Declaring and using ROW data type in PyFlink DataStream

Shuiqiang Chen
Hi meneldor,

The main cause of the error is a bug in `ctx.timer_service().current_watermark()`. At the beginning of the stream, when the first record comes into KeyedProcessFunction.process_element(), the current watermark is Long.MIN_VALUE on the Java side, but on the Python side it becomes Long.MAX_VALUE, which is 9223372036854775807.
>>> ctx.timer_service().register_event_time_timer(current_watermark + 1500)
Here, 9223372036854775807 + 1500 is 9223372036854777307, which Python happily handles as an arbitrary-precision integer but which overflows the Long value in Java when the registered timer is deserialized. I will create an issue to fix the bug.
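Until that is fixed, a defensive workaround on the user side could look like this (a minimal sketch of my own, not an official API; it simply refuses to register a timer while the watermark is still at the sentinel value):

JAVA_MAX_LONG = 9223372036854775807  # Long.MAX_VALUE on the Java side

def safe_register(timer_service, delay_ms=1500):
    # Skip registration while the watermark is uninitialized, so that
    # watermark + delay_ms cannot overflow a Java long.
    wm = timer_service.current_watermark()
    if wm < JAVA_MAX_LONG - delay_ms:
        timer_service.register_event_time_timer(wm + delay_ms)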

Returning to your initial question: in PyFlink you can create Row-typed data as below:

>>> row_data = Row(id='my id', data='some data', timestamp=1111)
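Row also works positionally, in which case the values are matched to the declared fields by position (a quick sketch; `Row` here is pyflink.common.types.Row):

from pyflink.common.types import Row

named = Row(id='my id', data='some data', timestamp=1111)
positional = Row('my id', 'some data', 1111)  # fields matched by position
print(named.id, positional[0])  # both print 'my id'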

Which release version of Flink is the code snippet you provided based on? The latest API for KeyedProcessFunction.process_element() and KeyedProcessFunction.on_timer() no longer provides a `Collector` to collect output data; instead you use `yield`, which is a more Pythonic approach.

Please refer to the following code:
def keyed_process_function_example():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    env.get_config().set_auto_watermark_interval(2000)
    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    data_stream = env.from_collection([(1, 'hello', '1603708211000'),
                                       (2, 'hi', '1603708224000'),
                                       (3, 'hello', '1603708226000'),
                                       (4, 'hi', '1603708289000')],
                                      type_info=Types.ROW([Types.INT(), Types.STRING(), Types.STRING()]))

    class MyTimestampAssigner(TimestampAssigner):

        def extract_timestamp(self, value, record_timestamp) -> int:
            return int(value[2])

    class MyProcessFunction(KeyedProcessFunction):

        def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
            yield Row(id=ctx.get_current_key()[1], data='some_string', timestamp=11111111)
            # current_watermark = ctx.timer_service().current_watermark()
            ctx.timer_service().register_event_time_timer(ctx.timestamp() + 1500)

        def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
            yield Row(id=ctx.get_current_key()[1], data='current on timer timestamp: ' + str(timestamp),
                      timestamp=timestamp)

    output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'],
                                       [Types.STRING(), Types.STRING(), Types.INT()])
    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
        .with_timestamp_assigner(MyTimestampAssigner())
    data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
        .key_by(lambda x: (x[0], x[1]), key_type_info=Types.TUPLE([Types.INT(), Types.STRING()])) \
        .process(MyProcessFunction(), output_type=output_type_info).print()
    env.execute('test keyed process function')
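To try the example, it can be invoked from a standard main guard:

if __name__ == '__main__':
    keyed_process_function_example()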

Best,
Shuiqiang

Re: Declaring and using ROW data type in PyFlink DataStream

meneldor
Thank you for the answer Shuiqiang!
I'm using the latest apache-flink version:
Requirement already up-to-date: apache-flink in ./venv/lib/python3.7/site-packages (1.12.0)
However, the method signature still uses a collector:

[screenshot of the method signature, which still takes a Collector parameter]
I'm using the setup-pyflink-virtual-env.sh shell script from the docs (which uses pip).

Regards 

Re: Declaring and using ROW data type in PyFlink DataStream

Xingbo Huang
Hi meneldor,

I guess Shuiqiang is not using PyFlink 1.12.0 to develop the example. The signature of the `process_element` method has been changed in the new version [1]. In PyFlink 1.12.0, you can use `Collector.collect` to send out your results.

[1] https://issues.apache.org/jira/browse/FLINK-20647
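For completeness, the 1.12.0 Collector-based style would look roughly like this (a sketch, assuming `KeyedProcessFunction`, `Collector`, and `Row` are imported as in the snippets above):

class MyProcessFunction(KeyedProcessFunction):
    def process_element(self, value, ctx: 'KeyedProcessFunction.Context', out: Collector):
        # 1.12.0: output goes through the Collector rather than yield
        out.collect(Row(ctx.get_current_key()[0], 'some_string', 111111111111))

    def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext', out: Collector):
        out.collect(Row('on timer', str(timestamp), timestamp))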

Best,
Xingbo

meneldor <[hidden email]> 于2021年1月15日周五 上午1:20写道:
Thank you for the answer Shuiqiang!
Im using the last apache-flink version:
Requirement already up-to-date: apache-flink in ./venv/lib/python3.7/site-packages (1.12.0)
however the method signature is using a collector:

image.png
 Im using the setup-pyflink-virtual-env.sh shell script from the docs(which uses pip).

Regards 

On Thu, Jan 14, 2021 at 6:47 PM Shuiqiang Chen <[hidden email]> wrote:
Hi meneldor,

The main cause of the error is that there is a bug in `ctx.timer_service().current_watermark()`. At the beginning the stream, when the first record come into the KeyedProcessFunction.process_element() , the current_watermark will be the Long.MIN_VALUE at Java side, while at the Python side, it becomes LONG.MAX_VALUE which is 9223372036854775807.  
>>> ctx.timer_service().register_event_time_timer(current_watermark + 1500)
Here, 9223372036854775807 + 1500 is 9223372036854777307 which will be automatically converted to a long interger in python but will cause Long value overflow in Java when deserializing the registered timer value. I will craete a issue to fix the bug.

Let’s return to your initial question, at PyFlink you could create a Row Type data as bellow: 

>>> row_data = Row(id=‘my id’, data=’some data’, timestamp=1111)

And I wonder which release version of flink the code snippet you provided based on? The latest API for KeyedProcessFunction.process_element() and KeyedProcessFunction.on_timer() will not provid a `collector` to collect output data but use `yield` which is a more pythonic approach. 

Please refer to the following code:
def keyed_process_function_example():
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.get_config().set_auto_watermark_interval(2000)
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
data_stream = env.from_collection([(1, 'hello', '1603708211000'),
(2, 'hi', '1603708224000'),
(3, 'hello', '1603708226000'),
(4, 'hi', '1603708289000')],
type_info=Types.ROW([Types.INT(), Types.STRING(), Types.STRING()]))

class MyTimestampAssigner(TimestampAssigner):

def extract_timestamp(self, value, record_timestamp) -> int:
return int(value[2])

class MyProcessFunction(KeyedProcessFunction):

def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
yield Row(id=ctx.get_current_key()[1], data='some_string', timestamp=11111111)
# current_watermark = ctx.timer_service().current_watermark()
ctx.timer_service().register_event_time_timer(ctx.timestamp() + 1500)

def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
yield Row(id=ctx.get_current_key()[1], data='current on timer timestamp: ' + str(timestamp),
timestamp=timestamp)

output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'], [Types.STRING(), Types.STRING(), Types.INT()])
watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
.with_timestamp_assigner(MyTimestampAssigner())
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
.key_by(lambda x: (x[0], x[1]), key_type_info=Types.TUPLE([Types.INT(), Types.STRING()])) \
.process(MyProcessFunction(), output_type=output_type_info).print()
env.execute('test keyed process function')

Best,
Shuiqiang





meneldor <[hidden email]> 于2021年1月14日周四 下午10:45写道:
Hello,

What is the correct way to use Python dict's as ROW type in pyflink? Im trying this:
output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp' ],
[Types.STRING(), Types.STRING(), Types.LONG() ])
class MyProcessFunction(KeyedProcessFunction):
def process_element(self, value, ctx: 'KeyedProcessFunction.Context', out: Collector):
result = {"id": ctx.get_current_key()[0], "data": "some_string", "timestamp": 111111111111}
out.collect(result)
current_watermark = ctx.timer_service().current_watermark()
ctx.timer_service().register_event_time_timer(current_watermark + 1500)

def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext', out: 'Collector'):
logging.info(timestamp)
out.collect("On timer timestamp: " + str(timestamp))
ds.key_by(MyKeySelector(), key_type_info=Types.TUPLE([Types.STRING(), Types.STRING()])) \
.process(MyProcessFunction(), output_type=output_type_info)

I just hardcoded the values in MyProcessFunction to be sure that the input data doesnt mess the fields. So the data is correct but PyFlink trews an exception:
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
at org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoMask(MaskUtils.java:73)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:202)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:213)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.emitResult(PythonKeyedProcessOperator.java:253)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.emitResults(AbstractPythonFunctionOperator.java:266)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:293)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:285)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:134)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1211)
... 10 more
However it works with primitive types like Types.STRING(). According to the documentation the ROW type corresponds to the python's dict type.

Regards
Reply | Threaded
Open this post in threaded view
|

Re: Declaring and using ROW data type in PyFlink DataStream

meneldor
I imported pyflink.common.types.Row and used it as Shuiqiang suggested, but now Java throws a memory exception:

Caused by: TimerException{java.lang.OutOfMemoryError: Java heap space}
... 11 more
Caused by: java.lang.OutOfMemoryError: Java heap space
at org.apache.flink.table.runtime.util.SegmentsUtil.allocateReuseChars(SegmentsUtil.java:91)
at org.apache.flink.table.runtime.util.StringUtf8Utils.decodeUTF8(StringUtf8Utils.java:127)
at org.apache.flink.table.runtime.typeutils.serializers.python.StringSerializer.deserialize(StringSerializer.java:90)
at org.apache.flink.table.runtime.typeutils.serializers.python.StringSerializer.deserialize(StringSerializer.java:41)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:213)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:213)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.emitResult(PythonKeyedProcessOperator.java:253)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.emitResults(AbstractPythonFunctionOperator.java:266)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:293)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:285)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:134)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator$$Lambda$670/579781231.onProcessingTime(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1211)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$17(StreamTask.java:1202)
at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$844/2129217743.run(Unknown Source)

Regards 

Re: Declaring and using ROW data type in PyFlink DataStream

Shuiqiang Chen
Hi meneldor, Xingbo,

Sorry for the late reply. 

Thanks a lot for Xingbo’s clarification.

According to the stacktrace of the exception, could you check whether the result data matches the specified return type? BTW, please share your code if that's OK; it will help with debugging.

Best,
Shuiqiang

Re: Declaring and using ROW data type in PyFlink DataStream

meneldor
Hi, here is the code. The input is JSON data from Maxwell CDC:

env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.get_config().set_auto_watermark_interval(2000)
env.set_parallelism(1)

device_type_info = Types.ROW_NAMED(
    ['commit', 'ts', 'type', 'data', 'old'],
    [Types.BOOLEAN(),
     Types.LONG(),
     Types.STRING(),
     Types.ROW_NAMED(['id', 'tp', 'device_ts', 'account'],
                     [Types.STRING(), Types.STRING(), Types.LONG(), Types.STRING()]),
     Types.ROW_NAMED(['id', 'tp', 'device_ts', 'account'],
                     [Types.STRING(), Types.STRING(), Types.LONG(), Types.STRING()])])
output_type_info = Types.ROW_NAMED(
    ['id', 'tp', 'account', 'device_ts'],
    [Types.STRING(),
     Types.STRING(),
     Types.INT(),
     Types.STRING(),
     Types.LONG()])

device_row_schema = JsonRowDeserializationSchema.builder().type_info(device_type_info).build()

class KafkaRowTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value: Any, record_timestamp: int):
        return int(value[3][2])

class MyKeySelector(KeySelector):
    def get_key(self, value):
        return (str(value[3][0]), str(value[3][1]))

class MyProcessFunction(KeyedProcessFunction):
    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        yield types.Row(id=ctx.get_current_key()[0], tp=ctx.get_current_key()[1],
                        account="TEST", device_ts=value[3][2])
        ctx.timer_service().register_event_time_timer(ctx.timestamp() + 1500)

    def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
        yield types.Row(id=ctx.get_current_key()[0], tp=ctx.get_current_key()[1],
                        account="TEST", device_ts=1111111111, timestamp=timestamp)

device_consumer = FlinkKafkaConsumer("device", device_row_schema, {'bootstrap.servers': 'localhost:9092'})
device_consumer.set_start_from_earliest()
watermark_strategy = WatermarkStrategy.for_monotonous_timestamps().with_timestamp_assigner(KafkaRowTimestampAssigner())

device_ds = env.add_source(device_consumer)
device_ds.assign_timestamps_and_watermarks(watermark_strategy) \
    .key_by(MyKeySelector(), key_type_info=Types.TUPLE([Types.STRING(), Types.STRING()])) \
    .process(MyProcessFunction(), output_type=output_type_info)

job_client = env.execute_async('Device enrichment Job')
job_client.get_job_execution_result().result()

This is the input data: {"commit": true, "ts": 1610546861, "type": "update", "data": {"id": "id2", "tp": "B", "device_ts": 1610546861, "account": "279"}, "old": {}}
1) If I change the output type to STRING() and return a str from process_element, everything is OK, but I need to use JsonRowSerializationSchema on that data later.
2) I'm not sure what to return in on_timer, as it's missing the value argument that process_element has.

Regards

Re: Declaring and using ROW data type in PyFlink DataStream

Shuiqiang Chen
Hi meneldor, 

Actually, the return type of on_timer() must be the same as that of process_element(). It seems the value yielded by process_element() is missing the `timestamp` field. Also, the `output_type_info` has four field names but five field types. Could you align them?

Best,
Shuiqiang

Re: Declaring and using ROW data type in PyFlink DataStream

meneldor
Actually the output_type_info is OK; it was a copy/paste typo. I changed the function to:
class MyProcessFunction(KeyedProcessFunction):
    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        yield types.Row(id=ctx.get_current_key()[0], tp=ctx.get_current_key()[1], account="TEST",
                        device_ts=value[3][2], timestamp=ctx.timestamp())
        ctx.timer_service().register_event_time_timer(ctx.timestamp() + 1500)

    def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
        yield types.Row(id=ctx.get_current_key()[0], tp=ctx.get_current_key()[1], account="TEST",
                        device_ts=1111111111, timestamp=timestamp)
And the type to:
output_type_info = Types.ROW_NAMED(['id', 'tp', 'account', 'device_ts', 'timestamp'],
                                   [Types.STRING(), Types.STRING(), Types.STRING(), Types.LONG(), Types.LONG()])
I can't return the same data in on_timer() because there is no value parameter; that's why I hardcoded device_ts. However, the exception persists.
I am not sure whether the timestamp field should be included in output_type_info as I did now.
Regards


Re: Declaring and using ROW data type in PyFlink DataStream

Xingbo Huang
Hi Shuiqiang, meneldor,

1. In fact, there is a problem with using a Python named `Row` as the return value of a user-defined function in PyFlink.

When serializing Row data, the serializer of each field follows the declared order of the Row fields. But the fields of a Python named `Row` are sorted by name, a design intended to make comparing named Rows and computing hash values easier. This can lead to serialization/deserialization errors, because the correspondence between serializers and fields ends up wrong. It is for performance reasons that serializers are not matched up by field name, although named `Row` support could be achieved at the expense of a little performance for ease of use. For the current example, I suggest returning a list or a normal Row instead of a named Row.
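For example, a plain positional Row whose values follow the declared order of output_type_info sidesteps the sorting issue entirely (a sketch based on the code earlier in this thread; `Row` is pyflink.common.types.Row):

class MyProcessFunction(KeyedProcessFunction):
    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        # Positional Row: values follow output_type_info's declared order
        # ['id', 'tp', 'account', 'device_ts', 'timestamp'], so the serializers line up.
        yield Row(ctx.get_current_key()[0], ctx.get_current_key()[1], "TEST",
                  value[3][2], ctx.timestamp())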


2. In PyFlink 1.12.0, the method signature of `on_timer` should be `def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext', out: Collector)` [1]. If you want to send data in `on_timer`, you can use `Collector.collect`, e.g.:
def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext', out: Collector):
    out.collect(Row('a', 'b', 'c'))

3. >>> I am not sure if the timestamp field should be included in output_type_info as i did now.

If you return data with a timestamp field, `output_type_info` needs to include that field. For example, the data returned in your example contains `timestamp`, so your `output_type_info` needs to carry that field's information.



Best,
Xingbo

Re: Declaring and using ROW data type in PyFlink DataStream

meneldor
Thank you Xingbo

1. I will try to use a normal list instead of a named Row. Thanks!
2. There is a new 1.12.1 version of PyFlink which uses process_element(self, value, ctx: 'KeyedProcessFunction.Context').

And what about on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext')? Can I access the value in the ctx, for example, as in process_element()?

Thank you!



Re: Declaring and using ROW data type in PyFlink DataStream

Xingbo Huang
Hi meneldor,

1. Yes. Although release 1.12.1 has not been officially released, it is indeed available for download on PyPI.
In PyFlink 1.12.1, you only need to `yield` your output in `on_timer`.
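For example, a minimal sketch against the 1.12.1 signatures (the Row values are placeholders, not the exact data from this thread):

from pyflink.common import Row
from pyflink.datastream.functions import KeyedProcessFunction

class MyTimerFunction(KeyedProcessFunction):
    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        # Register an event-time timer relative to the current record's timestamp.
        ctx.timer_service().register_event_time_timer(ctx.timestamp() + 1500)
        yield Row('some_id', 'some_tp', 'TEST', 1111111111, ctx.timestamp())

    def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
        # No Collector parameter in 1.12.1; the output record is simply yielded.
        yield Row('some_id', 'some_tp', 'TEST', 1111111111, timestamp)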

2. Whenever an element arrives, your `process_element` method is invoked, so you can access the `value` parameter there directly. The firing of the `on_timer` method depends on the timer you register, as in your example `ctx.timer_service().register_event_time_timer(current_watermark + 1500)`. You might need state access [1], which will be supported in release-1.13; at that point you will be able to read your state in `on_timer` and conveniently control the output.


[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
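To sketch what that could look like once FLIP-153 lands (the state API below follows the FLIP and is an assumption until release-1.13 is out; storing the element timestamp is just an illustration):

from pyflink.common.typeinfo import Types
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor

class StatefulFunction(KeyedProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        # Keyed state shared between process_element and on_timer.
        self.last_ts = runtime_context.get_state(
            ValueStateDescriptor('last_ts', Types.LONG()))

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        self.last_ts.update(ctx.timestamp())  # remember something per key
        ctx.timer_service().register_event_time_timer(ctx.timestamp() + 1500)

    def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
        # Read the stored value back for the current key when the timer fires.
        yield (ctx.get_current_key(), self.last_ts.value(), timestamp)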

Best,
Xingbo



Re: Declaring and using ROW data type in PyFlink DataStream

meneldor
Thank you Xingbo!

Do you plan to implement CoProcess functions too? Right now I can't find a convenient method to connect and merge two streams.

Regards



Re: Declaring and using ROW data type in PyFlink DataStream

Xingbo Huang
Hi meneldor,

Yes. As this is the first version of the Python DataStream API, release-1.12 does not yet cover all scenarios. In release-1.13 we will extend the Python DataStream API to cover most scenarios, and CoProcessFunction will certainly be among them.
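Just to sketch the shape it will most likely take (the class and method names below mirror the Java API and are assumptions until release-1.13 is out):

from pyflink.datastream.functions import CoProcessFunction

class MergeStreams(CoProcessFunction):
    def process_element1(self, value, ctx: 'CoProcessFunction.Context'):
        # Invoked for each element of the first stream.
        yield ('left', value)

    def process_element2(self, value, ctx: 'CoProcessFunction.Context'):
        # Invoked for each element of the second stream.
        yield ('right', value)

# Expected usage: ds1.connect(ds2).process(MergeStreams(), output_type=...)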

Best,
Xingbo
