Flink: Not able to sink a stream into csv

Flink: Not able to sink a stream into csv

Yik San Chan
The question is cross-posted on Stack Overflow: https://stackoverflow.com/questions/67195207/flink-not-able-to-sink-a-stream-into-csv.

I am trying to sink a stream to the filesystem in CSV format using PyFlink, but it does not work.

```python
# stream_to_csv.py
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)

table_env.execute_sql("""
    CREATE TABLE datagen (
        id INT,
        data STRING
    ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '1'
    )
""")

table_env.execute_sql("""
    CREATE TABLE print (
        id INT,
        data STRING
    ) WITH (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/tmp/output'
    )
""")

table_env.execute_sql("""
INSERT INTO print
SELECT id, data
FROM datagen
""").wait()
```

To run the script:

```
$ python stream_to_csv.py
```

I expect records to be written to the /tmp/output folder, but that doesn't happen.

```
$ ls /tmp/output
(nothing shown here)
```

Is there anything I missed?

Best,
Yik San

Re: Flink: Not able to sink a stream into csv

Dian Fu
Hi Yik San,

You need to set the rolling policy for the filesystem connector. You can refer to the Rolling Policy section of the documentation [1] for more details.

Actually there is output: if you run `ls -la /tmp/output/`, you will see several hidden files named `.part-xxx`.
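For example, you would see something like the following (a sketch of the listing; `<uuid>` stands for generated identifiers, and the exact names will differ per run):

```
$ ls -a /tmp/output
.  ..  .part-<uuid>-0-0.inprogress.<uuid>
```

The dot-prefixed files are in-progress part files, which a plain `ls` hides.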

For your job, you need to set `execution.checkpointing.interval` in the configuration and `sink.rolling-policy.rollover-interval` in the properties of the filesystem connector.


The job will look like the following:
```python
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)
# part files are only committed when a checkpoint completes, so checkpointing must be enabled
table_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "10s")

table_env.execute_sql("""
    CREATE TABLE datagen (
        id INT,
        data STRING
    ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '1'
    )
""")

table_env.execute_sql("""
    CREATE TABLE print (
        id INT,
        data STRING
    ) WITH (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/tmp/output',
        -- roll to a new part file every 10 seconds
        'sink.rolling-policy.rollover-interval' = '10s'
    )
""")

table_env.execute_sql("""
INSERT INTO print
SELECT id, data
FROM datagen
""").wait()
```
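With `execution.checkpointing.interval` set to 10s, each completed checkpoint commits the rolled part files, so after the job has run for a short while a plain `ls` should show finalized (non-hidden) files. A sketch of a verification run (file names are illustrative):

```
$ python stream_to_csv.py &
$ sleep 30
$ ls /tmp/output
part-<uuid>-0-0  part-<uuid>-0-1
```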

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html#rolling-policy
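Besides `sink.rolling-policy.rollover-interval`, the filesystem connector's rolling policy can also roll on file size (`sink.rolling-policy.file-size`), and `sink.rolling-policy.check-interval` controls how often the time-based policy is checked; see [1]. A minimal sketch of a sink DDL combining these options (the table name `sized_sink` and the option values are illustrative):

```python
table_env.execute_sql("""
    CREATE TABLE sized_sink (
        id INT,
        data STRING
    ) WITH (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/tmp/output',
        'sink.rolling-policy.file-size' = '1MB',         -- roll when a part file reaches 1 MB
        'sink.rolling-policy.rollover-interval' = '10s', -- roll at least every 10 seconds
        'sink.rolling-policy.check-interval' = '1s'      -- how often the time-based policy is checked
    )
""")
```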



Re: Flink: Not able to sink a stream into csv

Yik San Chan
Hi Dian,

Thanks for your help, again!

Best,
Yik San
