PyFlink DataStream union type mismatch

Wouter Zorgdrager
Dear all,

I'm having trouble combining two data streams with the union operator in PyFlink. My code basically looks like this:

```python
from pyflink.common.typeinfo import Types

# operator_stream carries (str, Event) tuples; Event is an arbitrary Python object.
init_stream = (operator_stream
    .filter(lambda r: r[0] is None)
    .map(init_operator, Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
    .key_by(lambda x: x[0], Types.STRING())
)

stateful_operator_stream = (operator_stream
    .filter(lambda r: r[0] is not None)
    .map(lambda x: (x[0], x[1]), Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
    .key_by(lambda x: x[0], Types.STRING())
)

print(init_stream)
print(init_stream.get_type())

print(stateful_operator_stream.get_type())
print(stateful_operator_stream)

# Parentheses are needed for the chained call to span several lines.
final_operator_stream = (init_stream
    .union(stateful_operator_stream)
    .process(stateful_operator)
)
```


In short, I have a data stream (operator_stream) of type Tuple[str, Event], which I define as a tuple of Types.STRING() and Types.PICKLED_BYTE_ARRAY().
When calling the union operator, I get an error showing a type mismatch between the two streams:

```
py4j.protocol.Py4JJavaError: An error occurred while calling o732.union.
: java.lang.IllegalArgumentException: Cannot union streams of different types: Java Tuple2<String, PickledByteArrayTypeInfo> and Row(f0: String, f1: Java Tuple2<String, PickledByteArrayTypeInfo>)
at org.apache.flink.streaming.api.datastream.DataStream.union(DataStream.java:238)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)
```

However, when I print the types of both data streams, they look identical:

```
<pyflink.datastream.data_stream.KeyedStream object at 0x7f1539877320>
RowTypeInfo(f0: String, f1: TupleTypeInfo(String, PickledByteArrayTypeInfo))
RowTypeInfo(f0: String, f1: TupleTypeInfo(String, PickledByteArrayTypeInfo))
<pyflink.datastream.data_stream.KeyedStream object at 0x7f1539884f98>
```

Any thoughts? Thanks in advance!

Regards,
Wouter

Re: PyFlink DataStream union type mismatch

Dian Fu
Hi Wouter,

1) The exception looks like a bug. I have filed a ticket for it: https://issues.apache.org/jira/browse/FLINK-22733

2) For your use case, I think you should do it as follows:
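```python
from pyflink.common.typeinfo import Types

init_stream = (operator_stream
    .filter(lambda r: r[0] is None)
    .map(init_operator, Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
)

stateful_operator_stream = (operator_stream
    .filter(lambda r: r[0] is not None)
    .map(lambda x: (x[0], x[1]), Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
)

# Union the two un-keyed streams first, then key the result; stateful
# operators (e.g. .process(stateful_operator)) can then run on the KeyedStream.
keyed_stream = init_stream.union(stateful_operator_stream).key_by(lambda x: x[0], Types.STRING())
```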

The reason is that `union` turns a `KeyedStream` into a plain `DataStream`, and stateful operations can no longer be performed on a `DataStream`. Keying after the union avoids this.
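To make the ordering concrete, here is a pure-Python toy model (no PyFlink required) of union and key_by. It assumes, based only on the printed RowTypeInfo(f0: String, f1: TupleTypeInfo(...)), that keying wraps each record as a (key, record) pair; the helpers union, key_by and process_per_key are hypothetical stand-ins, not PyFlink APIs. The two shapes it produces match the two types named in the error message (Tuple2 versus Row(f0: String, f1: Tuple2)), which is consistent with, but does not prove, one side of the union being seen before the key wrapping; FLINK-22733 tracks the actual cause.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, Iterable, List, Tuple


def union(*streams: Iterable[Any]) -> List[Any]:
    """Merge several streams of same-shaped records into one stream."""
    merged: List[Any] = []
    for stream in streams:
        merged.extend(stream)
    return merged


def key_by(records: Iterable[Any], key_selector: Callable[[Any], Any]) -> List[Tuple[Any, Any]]:
    """Toy model of keying: wrap every record as (key, record).

    Assumption: this mirrors the RowTypeInfo(f0: String, f1: ...) printed
    in the thread; it is not PyFlink's actual implementation.
    """
    return [(key_selector(r), r) for r in records]


def process_per_key(keyed: Iterable[Tuple[Any, Any]]) -> Dict[Any, List[Any]]:
    """Stand-in for a stateful operator: keep a list of payloads per key."""
    state: Dict[Any, List[Any]] = defaultdict(list)
    for key, record in keyed:
        state[key].append(record[1])
    return dict(state)


# Hypothetical (key, payload) records standing in for (str, Event) tuples.
init_records = [("a", "init-a"), ("b", "init-b")]
stateful_records = [("a", "event-1"), ("b", "event-2"), ("a", "event-3")]

# Union the plain streams first, then key the merged stream.
state = process_per_key(key_by(union(init_records, stateful_records), lambda x: x[0]))
print(state)  # {'a': ['init-a', 'event-1', 'event-3'], 'b': ['init-b', 'event-2']}

# Keying first changes the record shape, so a plain record and a keyed
# record no longer have the same type.
plain = init_records[0]
wrapped = key_by(init_records, lambda x: x[0])[0]
print(plain, wrapped)  # ('a', 'init-a') ('a', ('a', 'init-a'))
```

Because all records pass through a single key_by after the union, every record for a given key reaches the same per-key state, regardless of which input stream it came from.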

Regards,
Dian

On 21 May 2021, at 12:38 AM, Wouter Zorgdrager <[hidden email]> wrote:

Re: PyFlink DataStream union type mismatch

Wouter Zorgdrager
Hi Dian, all,

Thanks, that indeed solved my problem. I have two more questions (I'm not sure whether it is better practice to start a new thread on the mailing list or to reuse this one):

1. I'm seeing very high latency (multiple seconds per message) in a job with multiple operators and very low throughput. I suspect this is because messages are bundled until either a size threshold or a time threshold is met, and in a low-throughput scenario only the time threshold is ever reached.
This is also the impression I get from the configuration page [1]. However, these configuration options seem to target the Table API, and it is unclear to me how to set them for the DataStream API. To be clear, this is in PyFlink.

2. I'm using the JVM Kafka consumer and producer in my Python job, so I had to add the flink-sql-connector-kafka jar to my Flink environment. I did this by downloading the jar from Maven and putting it under 'venv/pyflink/lib'. Is there an easier way? I'm not particularly fond of manually changing my venv.
I tried `stream_execution_environment.add_jars`, but without success: I still got a ClassNotFoundException.
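The bundling behaviour suspected in point 1 can be illustrated with a small pure-Python simulation. It is a toy model, not Flink's implementation: it assumes a bundle is flushed once it holds max_size records or once max_time_ms has passed since its first record arrived, loosely mirroring Flink's python.fn-execution.bundle.size and python.fn-execution.bundle.time options. The function name bundle_latencies is hypothetical.

```python
from typing import List


def bundle_latencies(arrival_times_ms: List[int], max_size: int, max_time_ms: int) -> List[int]:
    """Simulate size- or time-triggered bundling; return per-record latency in ms.

    Toy model: a bundle is flushed as soon as it holds max_size records, or
    max_time_ms after its first record arrived, whichever comes first.
    """
    latencies: List[int] = []
    bundle: List[int] = []
    bundle_start = 0

    def flush(at_ms: int) -> None:
        # Every buffered record is emitted at the flush time.
        latencies.extend(at_ms - t for t in bundle)
        bundle.clear()

    for t in sorted(arrival_times_ms):
        # The time trigger of a pending bundle fires before this record arrives.
        if bundle and t - bundle_start >= max_time_ms:
            flush(bundle_start + max_time_ms)
        if not bundle:
            bundle_start = t
        bundle.append(t)
        # The size trigger fires as soon as the bundle is full.
        if len(bundle) >= max_size:
            flush(t)

    # Whatever is left is flushed by the time trigger.
    if bundle:
        flush(bundle_start + max_time_ms)
    return latencies


# Low throughput: one record every 5 s, so only the time trigger ever fires.
print(bundle_latencies([0, 5000, 10000], max_size=1000, max_time_ms=1000))  # [1000, 1000, 1000]

# High throughput: 10 records within 10 ms, so the size trigger fires first.
print(bundle_latencies(list(range(10)), max_size=5, max_time_ms=1000))  # [4, 3, 2, 1, 0, 4, 3, 2, 1, 0]
```

In the low-throughput case every record waits for the full time limit, so in this model it is the time threshold, not the size threshold, that bounds latency. If several Python operators each buffer independently (an assumption), those waits would add up along the pipeline.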

Hope you can help. As always, thanks a lot!

Regards,
Wouter



On Fri, 21 May 2021 at 05:25, Dian Fu <[hidden email]> wrote:

Re: PyFlink DataStream union type mismatch

Dian Fu
Hi Wouter,

1. These configuration options work for both the Python Table API and the Python DataStream API. The documentation reads as if they only target the Python Table API because that was the only Python API supported when the documentation was written. I will follow up to improve that paragraph.

2. The recommended way is to use `stream_execution_environment.add_jars`. Could you share a code snippet?
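For point 2, one frequent cause of a ClassNotFoundException despite calling add_jars (an assumption here, not confirmed in this thread) is passing plain filesystem paths: the PyFlink documentation shows add_jars being called with URLs such as file:///my/jar/path/connector.jar. The sketch below, with a hypothetical helper to_jar_urls and a hypothetical jar path, turns local paths into absolute file:// URLs using only the standard library; the PyFlink call itself is left as a comment so the snippet runs without PyFlink installed.

```python
from pathlib import Path
from typing import List


def to_jar_urls(*jar_paths: str) -> List[str]:
    """Convert local jar paths into absolute file:// URLs."""
    # Path.as_uri() requires an absolute path, hence expanduser() + resolve().
    return [Path(p).expanduser().resolve().as_uri() for p in jar_paths]


# Hypothetical location of the downloaded Kafka SQL connector jar.
urls = to_jar_urls("lib/flink-sql-connector-kafka.jar")
print(urls)

# With PyFlink installed, the URLs would be registered like this:
#   from pyflink.datastream import StreamExecutionEnvironment
#   env = StreamExecutionEnvironment.get_execution_environment()
#   env.add_jars(*urls)
```

Resolving the path up front also makes the job independent of the working directory it is launched from.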

Regards,
Dian

On 23 May 2021, at 7:26 PM, Wouter Zorgdrager <[hidden email]> wrote:
