Why does the Flink FileSystem sink split output into multiple files?

Yik San Chan
This question is cross-posted on Stack Overflow: https://stackoverflow.com/questions/66634813/why-does-flink-filesystem-sink-splits-into-multiple-files.

I want to use Flink to read from an input file, do some aggregation, and write the result to an output file. The job runs in batch mode. See `wordcount.py` below:

```python
from pyflink.table import EnvironmentSettings, BatchTableEnvironment

# https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html

env_settings = EnvironmentSettings.new_instance().in_batch_mode().build()
table_env = BatchTableEnvironment.create(environment_settings=env_settings)

my_source_ddl = """
    create table mySource (
        word VARCHAR
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/tmp/input'
    )
"""

my_sink_ddl = """
    create table mySink (
        word VARCHAR,
        `count` BIGINT
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/tmp/output'
    )
"""

transform_dml = """
INSERT INTO mySink
SELECT word, COUNT(1) FROM mySource GROUP BY word
"""

table_env.execute_sql(my_source_ddl)
table_env.execute_sql(my_sink_ddl)
table_env.execute_sql(transform_dml).wait()

# before run: echo -e  "flink\npyflink\nflink" > /tmp/input
# after run: cat /tmp/output
```

Before running `python wordcount.py`, I run `echo -e "flink\npyflink\nflink" > /tmp/input` so that /tmp/input contains data. However, after the run, /tmp/output contains two files:

```
> ls /tmp/output
part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-6-file-0 part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-7-file-0
> cat /tmp/output/part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-6-file-0
pyflink,1
> cat /tmp/output/part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-7-file-0
flink,2
```

I expected a single file, /tmp/output, with the content:

```
pyflink,1
flink,2
```

I wrote the program above by adapting the one below, which does produce a single file, /tmp/output:

```python
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit

# https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .field('word', DataTypes.STRING())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field_delimiter('\t')
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

tab = t_env.from_path('mySource')
tab.group_by(tab.word) \
   .select(tab.word, lit(1).count) \
   .execute_insert('mySink').wait()
```

Running this version generates a single file, /tmp/output. Note that it does not use commas as the delimiter:

```
> cat /tmp/output
flink 2
pyflink 1
```

Any idea why? Thanks!

Best,
Yik San Chan

Re: Why does the Flink FileSystem sink split output into multiple files?

David Anderson

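The first time, you ran the job without specifying the parallelism, so it used the default parallelism, which is greater than 1 (probably 4 or 8, depending on how many cores your computer has). Each parallel instance of the sink writes its own part file, and only the instances that actually received a word wrote one, which is why you got two files for your two distinct words.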
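
A minimal sketch of forcing a single writer in the DDL-based `wordcount.py`, assuming PyFlink 1.12 with the Blink planner, where the `table.exec.resource.default-parallelism` option sets the default parallelism of the job's operators (please check the configuration reference for your version). The helper name `use_single_writer` is hypothetical; it only calls `get_config().get_configuration().set_string(...)` on the table environment, which plays the role that `exec_env.set_parallelism(1)` plays in the older program.

```python
def use_single_writer(table_env):
    """Ask the planner to run every operator, including the filesystem sink,
    with one parallel instance, so only one sink instance writes output.

    Assumption: `table.exec.resource.default-parallelism` is honoured by the
    Blink planner in your Flink version (it is documented for 1.12).
    """
    config = table_env.get_config().get_configuration()
    config.set_string("table.exec.resource.default-parallelism", "1")
    return table_env


# Usage in wordcount.py, before running the INSERT statement:
# table_env = BatchTableEnvironment.create(environment_settings=env_settings)
# use_single_writer(table_env)
# table_env.execute_sql(transform_dml).wait()
```

Note that the filesystem connector treats `'path'` as a directory, so even with one writer you should expect a single part file inside /tmp/output rather than a plain file named /tmp/output; the plain file in the second program comes from the legacy `OldCsv` sink.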

Flink is designed to be scalable, and to achieve that, parallel instances of an operator, such as a sink, are decoupled from one another. Imagine, for example, a large cluster with hundreds or thousands of nodes. For this to work well, each instance needs to write to its own file.
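
To make the "one file per parallel instance" behaviour concrete, here is a toy model in plain Python, not Flink code. It routes each word to one of `parallelism` sink instances by hashing the key (using `zlib.crc32` as a stand-in; Flink's real key-group assignment uses a different hash), counts per instance, and creates a part file only for instances that receive at least one row. The function name and part-file names are hypothetical.

```python
import zlib
from collections import Counter, defaultdict


def simulate_parallel_sink(words, parallelism):
    """Toy model of GROUP BY word followed by a parallel file sink.

    Returns {part_file_name: [lines]}, one entry per sink instance that
    received data. Instances that see no rows produce no file in this model.
    """
    per_instance = defaultdict(Counter)
    for word in words:
        # Stand-in for Flink's key partitioning: the same word always lands
        # on the same instance, so each word's count is complete in one place.
        instance = zlib.crc32(word.encode("utf-8")) % parallelism
        per_instance[instance][word] += 1

    files = {}
    for instance, counts in sorted(per_instance.items()):
        name = f"part-task-{instance}-file-0"
        files[name] = [f"{w},{c}" for w, c in sorted(counts.items())]
    return files


words = ["flink", "pyflink", "flink"]
print(simulate_parallel_sink(words, parallelism=8))  # one or two files, depending on the hash
print(simulate_parallel_sink(words, parallelism=1))  # always exactly one file
```

Raising `parallelism` never splits one word's count across files; it only spreads different words over more files, which matches the two part files in the question.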

The commas were changed to tabs because you specified `.field_delimiter('\t')`.
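
In the DDL version, the `csv` format writes commas by default, and the delimiter is controlled by the format's `csv.field-delimiter` option. A minimal sketch, assuming Flink 1.12's CSV format options, with a hypothetical helper `sink_ddl` that builds the `mySink` DDL from the question with an explicit delimiter:

```python
def sink_ddl(path="/tmp/output", delimiter=","):
    """Build the mySink DDL from the question with an explicit CSV delimiter.

    The delimiter must be a single character; passing a tab character embeds
    it directly in the SQL string literal.
    """
    if len(delimiter) != 1:
        raise ValueError("csv.field-delimiter must be a single character")
    return f"""
    create table mySink (
        word VARCHAR,
        `count` BIGINT
    ) with (
        'connector' = 'filesystem',
        'path' = '{path}',
        'format' = 'csv',
        'csv.field-delimiter' = '{delimiter}'
    )
"""


# A comma (the default) matches the expected "flink,2" output;
# sink_ddl(delimiter="\t") should reproduce the tab-separated legacy output.
# table_env.execute_sql(sink_ddl())
```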


Regards,

David

(Replying to Yik San Chan's message of Mon, Mar 15, 2021 at 9:49 AM.)

Re: Why does the Flink FileSystem sink split output into multiple files?

Yik San Chan
Thank you, it works.

Best,
Yik San Chan

(Replying to David Anderson's message of Mon, Mar 15, 2021 at 5:30 PM.)