The question is cross-posted on StackOverflow https://stackoverflow.com/questions/66634813/why-does-flink-filesystem-sink-splits-into-multiple-files.
I want to use Flink to read from an input file, do some aggregation, and write the result to an output file. The job runs in batch mode. See `wordcount.py` below:

```python
from pyflink.table import EnvironmentSettings, BatchTableEnvironment

# https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html
env_settings = EnvironmentSettings.new_instance().in_batch_mode().build()
table_env = BatchTableEnvironment.create(environment_settings=env_settings)

my_source_ddl = """
create table mySource (
    word VARCHAR
) with (
    'connector' = 'filesystem',
    'format' = 'csv',
    'path' = '/tmp/input'
)
"""

my_sink_ddl = """
create table mySink (
    word VARCHAR,
    `count` BIGINT
) with (
    'connector' = 'filesystem',
    'format' = 'csv',
    'path' = '/tmp/output'
)
"""

transform_dml = """
INSERT INTO mySink
SELECT word, COUNT(1) FROM mySource GROUP BY word
"""

table_env.execute_sql(my_source_ddl)
table_env.execute_sql(my_sink_ddl)
table_env.execute_sql(transform_dml).wait()

# before run: echo -e "flink\npyflink\nflink" > /tmp/input
# after run: cat /tmp/output
```

Before running `python wordcount.py`, I run `echo -e "flink\npyflink\nflink" > /tmp/input` to make sure data exists in /tmp/input. However, after the run, there are two files in /tmp/output:

```
> ls /tmp/output
part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-6-file-0
part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-7-file-0
> cat /tmp/output/part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-6-file-0
pyflink,1
> cat /tmp/output/part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-7-file-0
flink,2
```

While I expect a single file /tmp/output with content:

```
pyflink,1
flink,2
```

Actually, I got the above Python program by adjusting the one below, which does produce the single file /tmp/output.
```python
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit

# https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html
exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .field('word', DataTypes.STRING())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field_delimiter('\t')
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

tab = t_env.from_path('mySource')
tab.group_by(tab.word) \
   .select(tab.word, lit(1).count) \
   .execute_insert('mySink').wait()
```

Running this version generates a single /tmp/output. Note that it is tab-delimited rather than comma-delimited:

```
> cat /tmp/output
flink	2
pyflink	1
```

Any idea why? Thanks!

Best,
Yik San Chan
The first time, you ran it without having specified the parallelism, and so you got the default parallelism -- which is greater than 1 (probably 4 or 8, depending on how many cores your computer has). Flink is designed to be scalable, and to achieve that, parallel instances of an operator, such as a sink, are decoupled from one another. Imagine, for example, a large cluster with hundreds or thousands of nodes. For this to work well, each instance needs to write to its own file.

The commas were changed to tabs because you specified `field_delimiter('\t')` in the sink of the second version.

Regards,
David

On Mon, Mar 15, 2021 at 9:49 AM Yik San Chan <[hidden email]> wrote:
Thank you, it works.

Best,
Yik San Chan

On Mon, Mar 15, 2021 at 5:30 PM David Anderson <[hidden email]> wrote: