Dear all,
I'm having trouble unifying two data streams using the union operator in PyFlink. My code basically looks like this:

```
init_stream = (operator_stream
    .filter(lambda r: r[0] is None)
    .map(init_operator, Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
    .key_by(lambda x: x[0], Types.STRING())
)

stateful_operator_stream = (operator_stream
    .filter(lambda r: r[0] is not None)
    .map(lambda x: (x[0], x[1]), Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
    .key_by(lambda x: x[0], Types.STRING())
)

print(init_stream)
print(init_stream.get_type())
print(stateful_operator_stream.get_type())
print(stateful_operator_stream)

final_operator_stream = (init_stream
    .union(stateful_operator_stream)
    .process(stateful_operator)
)
```

Running this raises an exception.
Hi Wouter,
1) For the exception: it seems to be a bug. I have filed a ticket for it: https://issues.apache.org/jira/browse/FLINK-22733

2) Regarding your requirement, I guess you should do it as follows:

```
init_stream = (operator_stream
    .filter(lambda r: r[0] is None)
    .map(init_operator, Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
)

stateful_operator_stream = (operator_stream
    .filter(lambda r: r[0] is not None)
    .map(lambda x: (x[0], x[1]), Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
)

init_stream.union(stateful_operator_stream).key_by(lambda x: x[0], Types.STRING())
```

The reason is that `union` turns a `KeyedStream` back into a `DataStream`, and you cannot perform keyed stateful operations on a `DataStream`. Applying `key_by` after the union gives you a `KeyedStream` again, so the stateful `process` can follow it.

Regards,
Dian
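For illustration, here is a minimal end-to-end sketch of the corrected pipeline. It assumes `stateful_operator` is a `KeyedProcessFunction`; the counting logic, class name, and output type below are hypothetical, while `operator_stream`, `init_stream`, `stateful_operator_stream`, and the tuple layout come from the thread:

```
from pyflink.common import Types
from pyflink.datastream import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor


class StatefulOperator(KeyedProcessFunction):
    """Hypothetical stateful operator: counts the events seen per key."""

    def open(self, runtime_context: RuntimeContext):
        # Keyed state is only available on a KeyedStream, which is why
        # key_by must come after union rather than before it.
        self.count = runtime_context.get_state(
            ValueStateDescriptor("count", Types.LONG()))

    def process_element(self, value, ctx):
        current = self.count.value() or 0
        self.count.update(current + 1)
        yield value[0], current + 1


final_operator_stream = (init_stream
    .union(stateful_operator_stream)
    .key_by(lambda x: x[0], Types.STRING())
    .process(StatefulOperator(), Types.TUPLE([Types.STRING(), Types.LONG()]))
)
```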
Hi Dian, all,

Thanks, that indeed solved my problem. I have two more questions (I'm not sure whether it is better practice to send a new email to the mailing list or to reuse a thread):

1. I noticed very high latency (multiple seconds per message) for a job with multiple operators and very low throughput. I suspect this is because messages are bundled until either a size threshold or a time threshold is met, and in a low-throughput scenario only the time threshold ever fires. This matches my reading of the configuration page [1]. However, these configuration values seem to target the Table API, and it is unclear to me how to configure them for the DataStream API. To be clear, this is in PyFlink.

2. I'm using the JVM Kafka consumer and producer for my Python job, so I had to add the flink-sql-connector-kafka jar to my Flink environment. I did this by downloading the jar from Maven and putting it under 'venv/pyflink/lib'. Is there an easier way? I'm not a fan of manually changing my venv. I tried stream_execution_environment.add_jars, but that was unsuccessful: I still got a ClassNotFoundException.

Hope you can help. As always, thanks a lot!

Regards,
Wouter

On Fri, 21 May 2021 at 05:25, Dian Fu <[hidden email]> wrote:
Hi Wouter,
1. These configuration options work for both the Python Table API and the Python DataStream API. The documentation reads as if it targets only the Table API because only the Python Table API was supported when that page was added; I will follow up to improve the paragraph.

2. The recommended way is `stream_execution_environment.add_jars`. Could you share some code snippets so we can see why it failed?

Regards,
Dian
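To make both answers concrete, here is a hedged sketch. The two bundle options are documented Flink configuration keys; passing a `Configuration` to `get_execution_environment` only works on recent PyFlink versions (1.15+), while on older versions the same keys can be set in `flink-conf.yaml`. Note that `add_jars` expects URLs rather than bare paths, so a missing `file://` scheme is a common cause of `ClassNotFoundException`; the jar name, version, and path below are placeholders:

```
from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment

# A bundle is flushed when either the size or the time threshold is hit.
# In a low-throughput job only the time threshold fires, so lowering it
# reduces per-message latency (defaults: size 100000, time 1000 ms).
config = Configuration()
config.set_integer("python.fn-execution.bundle.size", 1000)
config.set_integer("python.fn-execution.bundle.time", 100)

# Passing a Configuration here requires a recent PyFlink (1.15+);
# on older versions, set the same keys in flink-conf.yaml instead.
env = StreamExecutionEnvironment.get_execution_environment(config)

# add_jars expects URLs, not plain filesystem paths; note the file://
# scheme. The jar name and version below are placeholders.
env.add_jars("file:///path/to/flink-sql-connector-kafka_2.12-1.13.1.jar")
```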