I am trying to get a simple streaming job running in PyFlink and to understand the new 1.11 API. I just want to read from and write to Kafka topics.

Previously I was using t_env.execute("jobname"), register_table_source() and register_table_sink(), but in 1.11 all three are deprecated, and the deprecation warnings name execute_sql() as the replacement.

    exec_env = StreamExecutionEnvironment.get_execution_environment()
    ddl_source = f"""

This gives me the error:

    java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph.

I am aware this is lazily evaluated, so is there some equivalent SQL statement for t_env.execute() that I should be calling?

Thanks,
Manas
Hi Manas,

I tested your code and did not get any errors. Because execute_sql() is an asynchronous method, you need to wait for the job through the returned TableResult. You can try the following code:

    from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
    from pyflink.table import StreamTableEnvironment, EnvironmentSettings


    def test():
        exec_env = StreamExecutionEnvironment.get_execution_environment()
        exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
        t_env = StreamTableEnvironment.create(
            exec_env,
            environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
        )

        INPUT_TABLE = "test"
        INPUT_TOPIC = "test"
        # Note: 2181 is ZooKeeper's default port; Kafka brokers usually listen on 9092.
        LOCAL_KAFKA = "localhost:2181"
        OUTPUT_TABLE = "test_output"
        OUTPUT_TOPIC = "test_output"

        # Source table backed by the input Kafka topic, with an event-time watermark.
        ddl_source = f"""
            CREATE TABLE {INPUT_TABLE} (
                `monitorId` VARCHAR,
                `time_st` TIMESTAMP(3),
                WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND,
                `data` DOUBLE
            ) WITH (
                'connector' = 'kafka',
                'topic' = '{INPUT_TOPIC}',
                'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
                'properties.group.id' = 'myGroup',
                'format' = 'json'
            )
        """

        # Sink table backed by the output Kafka topic.
        ddl_sink = f"""
            CREATE TABLE {OUTPUT_TABLE} (
                `monitorId` VARCHAR,
                `max` DOUBLE
            ) WITH (
                'connector' = 'kafka',
                'topic' = '{OUTPUT_TOPIC}',
                'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
                'format' = 'json'
            )
        """

        t_env.execute_sql(ddl_source)
        t_env.execute_sql(ddl_sink)

        # The INSERT submits the job and returns a TableResult immediately.
        result = t_env.execute_sql(f"""
            INSERT INTO {OUTPUT_TABLE}
            SELECT monitorId, data
            FROM {INPUT_TABLE}
        """)
        # Block until the submitted job finishes.
        result.get_job_client().get_job_execution_result().result()


    if __name__ == '__main__':
        test()

Best,
Xingbo

On Tue, Jul 14, 2020 at 3:31 PM Manas Kale <[hidden email]> wrote:
Thank you for the quick reply, Xingbo!

Is there a documentation page or example that I can refer to in the future for the latest PyFlink 1.11 API? I couldn't find anything related to awaiting asynchronous results.

Thanks,
Manas

On Tue, Jul 14, 2020 at 1:29 PM Xingbo Huang <[hidden email]> wrote:
Hi Manas,

I have created an issue [1] to add the related documentation.

[1] https://issues.apache.org/jira/browse/FLINK-18598

Best,
Xingbo

On Tue, Jul 14, 2020 at 4:15 PM Manas Kale <[hidden email]> wrote:
Thank you Xingbo, this will certainly help!

On Wed, Jul 15, 2020 at 7:39 AM Xingbo Huang <[hidden email]> wrote: