pyFlink 1.11 streaming job example

pyFlink 1.11 streaming job example

Manas Kale
Hi,
I am trying to get a simple streaming job running in PyFlink and to understand the new 1.11 API. I just want to read from and write to Kafka topics.
Previously I was using t_env.execute("jobname"), register_table_source() and register_table_sink(), but in 1.11 all three were deprecated, with the deprecation warnings pointing to execute_sql() as the replacement.
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

# INPUT_TABLE, INPUT_TOPIC, OUTPUT_TABLE, OUTPUT_TOPIC and LOCAL_KAFKA are
# constants defined elsewhere in my script
exec_env = StreamExecutionEnvironment.get_execution_environment()
exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
t_env = StreamTableEnvironment.create(
    exec_env,
    environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
)

ddl_source = f"""
    CREATE TABLE {INPUT_TABLE} (
        `monitorId` VARCHAR,
        `time_st` TIMESTAMP(3),
        WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND,
        `data` DOUBLE
    ) WITH (
        'connector' = 'kafka',
        'topic' = '{INPUT_TOPIC}',
        'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
        'properties.group.id' = 'myGroup',
        'format' = 'json'
    )
"""

ddl_sink = f"""
    CREATE TABLE {OUTPUT_TABLE} (
        `monitorId` VARCHAR,
        `max` DOUBLE
    ) WITH (
        'connector' = 'kafka',
        'topic' = '{OUTPUT_TOPIC}',
        'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
        'format' = 'json'
    )
"""

t_env.execute_sql(ddl_source)
t_env.execute_sql(ddl_sink)

t_env.execute_sql(f"""
    INSERT INTO {OUTPUT_TABLE}
    SELECT monitorId, data
    FROM {INPUT_TABLE}
""")

This gives me the error:
java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph.

I am aware this is lazily evaluated, so is there some SQL-statement equivalent of t_env.execute() that I should be calling?

Thanks,
Manas

Re: pyFlink 1.11 streaming job example

Xingbo Huang
Hi Manas,

I tested your code and did not get any errors on my side. Because execute_sql() is an asynchronous method, you need to wait on the TableResult it returns. You can try the following code:


from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, EnvironmentSettings


def test():
    exec_env = StreamExecutionEnvironment.get_execution_environment()
    exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    t_env = StreamTableEnvironment.create(exec_env,
                                          environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
                                          )

    INPUT_TABLE = "test"
    INPUT_TOPIC = "test"
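    # note: 2181 is ZooKeeper's default port; a Kafka broker usually listens on 9092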
    LOCAL_KAFKA = "localhost:2181"
    OUTPUT_TABLE = "test_output"
    OUTPUT_TOPIC = "test_output"
    ddl_source = f"""              
        CREATE TABLE {INPUT_TABLE} (
            `monitorId` VARCHAR,
            `time_st` TIMESTAMP(3),
            WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND,
            `data` DOUBLE
        ) WITH (
            'connector' = 'kafka',
            'topic' = '{INPUT_TOPIC}',
            'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
            'properties.group.id' = 'myGroup',
            'format' = 'json'
        )
    """

    ddl_sink = f"""
        CREATE TABLE {OUTPUT_TABLE} (
            `monitorId` VARCHAR,
            `max` DOUBLE
        ) WITH (
            'connector' = 'kafka',
            'topic' = '{OUTPUT_TOPIC}',
            'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
            'format' = 'json'
        )
    """
    t_env.execute_sql(ddl_source)
    t_env.execute_sql(ddl_sink)

    result = t_env.execute_sql(f"""
        INSERT INTO {OUTPUT_TABLE}
        SELECT monitorId, data
        FROM {INPUT_TABLE}
    """)
    result.get_job_client().get_job_execution_result().result()


if __name__ == '__main__':
    test()
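
By the way, if you ever need to run several INSERT statements as one job, 1.11 also added create_statement_set(). Here is a minimal sketch of that pattern, reusing the tables above (the exact calls reflect my reading of the 1.11 API, so please double-check them):

    # bundle several INSERTs into a single job via a StatementSet (1.11 API)
    stmt_set = t_env.create_statement_set()
    stmt_set.add_insert_sql(f"INSERT INTO {OUTPUT_TABLE} SELECT monitorId, data FROM {INPUT_TABLE}")
    result = stmt_set.execute()  # StatementSet.execute() also returns a TableResult
    result.get_job_client().get_job_execution_result().result()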


Best,
Xingbo

Re: pyFlink 1.11 streaming job example

Manas Kale
Thank you for the quick reply, Xingbo!
Is there a documentation page or example that I can refer to in the future for the latest PyFlink 1.11 API? I couldn't find anything related to awaiting asynchronous results.

Thanks,
Manas

Re: pyFlink 1.11 streaming job example

Xingbo Huang
Hi Manas,

I have created an issue [1] to add the related documentation.

[1] https://issues.apache.org/jira/browse/FLINK-18598
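
In the meantime, the key point is to hold on to the TableResult returned by execute_sql(). A small sketch of the JobClient surface as I understand the 1.11 API (method names and table names below are assumptions/placeholders, not confirmed against the docs):

    result = t_env.execute_sql("INSERT INTO sink_table SELECT * FROM source_table")
    job_client = result.get_job_client()
    if job_client is not None:  # None for statements that do not launch a job, e.g. DDL
        print(job_client.get_job_id())                  # id of the submitted job
        job_client.get_job_execution_result().result()  # block until the job finishes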

Best,
Xingbo

Re: pyFlink 1.11 streaming job example

Manas Kale
Thank you, Xingbo, this will certainly help!
