The question is cross posted on Stack Overflow https://stackoverflow.com/questions/67195207/flink-not-able-to-sink-a-stream-into-csv.
I am trying to sink a stream into the filesystem in CSV format using PyFlink, however it does not work.

```python
# stream_to_csv.py
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)

table_env.execute_sql("""
    CREATE TABLE datagen (
        id INT,
        data STRING
    ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '1'
    )
""")

table_env.execute_sql("""
    CREATE TABLE print (
        id INT,
        data STRING
    ) WITH (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/tmp/output'
    )
""")

table_env.execute_sql("""
    INSERT INTO print
    SELECT id, data
    FROM datagen
""").wait()
```

To run the script:

```
$ python stream_to_csv.py
```

I expect records to go to the /tmp/output folder, however that doesn't happen.

```
$ ls /tmp/output
(nothing shown here)
```

Is there anything I'm missing?

Best,
Yik San
Hi Yik San,
You need to set a rolling policy for the filesystem connector. You could refer to the Rolling Policy section [1] for more details.

Actually there is output: if you execute the command `ls -la /tmp/output/`, you will see several hidden files named ".part-xxx". For your job, you need to set `execution.checkpointing.interval` in the configuration and `sink.rolling-policy.rollover-interval` in the properties of the filesystem connector. The job will look like the following:

```python
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)

# Part files are only committed (made visible) on checkpoint,
# so checkpointing must be enabled for the filesystem sink.
table_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "10s")

table_env.execute_sql("""
    CREATE TABLE datagen (
        id INT,
        data STRING
    ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '1'
    )
""")

table_env.execute_sql("""
    CREATE TABLE print (
        id INT,
        data STRING
    ) WITH (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/tmp/output',
        'sink.rolling-policy.rollover-interval' = '10s'
    )
""")

table_env.execute_sql("""
    INSERT INTO print
    SELECT id, data
    FROM datagen
""").wait()
```

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html#rolling-policy
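To illustrate why the output directory looked empty, here is a small sketch (the `.part-demo` file name is a made-up stand-in for the real in-progress part file names): uncommitted part files are dot-prefixed, so a plain `ls` hides them while `ls -a` reveals them.

```shell
# Simulate the sink directory with a hypothetical in-progress part file
mkdir -p /tmp/output
touch /tmp/output/.part-demo-0-0.inprogress

# A plain `ls` skips dot-prefixed files, so the directory looks empty
ls /tmp/output/

# `ls -a` shows the hidden in-progress file
ls -a /tmp/output/
```

Once checkpointing is enabled and the rolling policy triggers, these files are renamed to visible, committed part files.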
Hi Dian,

Thanks for your help, again!

Best,
Yik San

On Wed, Apr 21, 2021 at 8:39 PM Dian Fu <[hidden email]> wrote: