Flink 1.13 and CSV (batch) writing

classic Classic list List threaded Threaded
15 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink 1.13 and CSV (batch) writing

Flavio Pompermaier
Hi to all,
I'm testing writing to a CSV using Flink 1.13 and I get the following error:

The matching candidates:
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
Unsupported property keys:
format.quote-character

I create the table env using this:

final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()//
        .useBlinkPlanner()//
        // .inBatchMode()//
        .inStreamingMode()//
        .build();
    final TableEnvironment tableEnv = TableEnvironment.create(envSettings);

The error is the same both with inBatchMode and inStreamingMode.
Is this really not supported or am I using the wrong API?

Best,
Flavio
Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.13 and CSV (batch) writing

Flavio Pompermaier
Any help here? Moreover if I use the DataStream APIs there's no left/right outer join yet..are those meant to be added in Flink 1.13 or 1.14?

On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier <[hidden email]> wrote:
Hi to all,
I'm testing writing to a CSV using Flink 1.13 and I get the following error:

The matching candidates:
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
Unsupported property keys:
format.quote-character

I create the table env using this:

final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()//
        .useBlinkPlanner()//
        // .inBatchMode()//
        .inStreamingMode()//
        .build();
    final TableEnvironment tableEnv = TableEnvironment.create(envSettings);

The error is the same both with inBatchMode and inStreamingMode.
Is this really not supported or am I using the wrong API?

Best,
Flavio
Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.13 and CSV (batch) writing

Till Rohrmann
Hi Flavio,

I tried to execute the code snippet you have provided and I could not reproduce the problem.

Concretely I am running this code:

final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();
final TableEnvironment tableEnv = TableEnvironment.create(envSettings);

tableEnv.fromValues("foobar").execute().await();

Am I missing something? Maybe you can share a minimal but fully working example where the problem occurs. Thanks a lot.

Cheers,
Till

On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier <[hidden email]> wrote:
Any help here? Moreover if I use the DataStream APIs there's no left/right outer join yet..are those meant to be added in Flink 1.13 or 1.14?

On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier <[hidden email]> wrote:
Hi to all,
I'm testing writing to a CSV using Flink 1.13 and I get the following error:

The matching candidates:
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
Unsupported property keys:
format.quote-character

I create the table env using this:

final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()//
        .useBlinkPlanner()//
        // .inBatchMode()//
        .inStreamingMode()//
        .build();
    final TableEnvironment tableEnv = TableEnvironment.create(envSettings);

The error is the same both with inBatchMode and inStreamingMode.
Is this really not supported or am I using the wrong API?

Best,
Flavio
Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.13 and CSV (batch) writing

Flavio Pompermaier
Hi Till,
since I was using the same WITH-clause both for reading and writing I discovered that overwrite is actually supported in the Sinks, while in the Sources an exception is thrown (I was thinking that those properties were simply ignored).
However the quote-character is not supported in the sinks: is this a bug or is it the intended behaviour?. 
Here is a minimal example that reproduce the problem (put in the /tmp/test.csv something like '1,hello' or '2,hi').

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkCsvTest {
  public static void main(String[] args) throws Exception {
    final EnvironmentSettings envSettings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
    // ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
    final String tableInName = "testTableIn";
    final String createInTableDdl = getSourceDdl(tableInName, "/tmp/test.csv"); //

    final String tableOutName = "testTableOut";
    final String createOutTableDdl = getSinkDdl(tableOutName, "/tmp/test-out.csv"); //
    tableEnv.executeSql(createInTableDdl);
    tableEnv.executeSql(createOutTableDdl);

    Table tableIn = tableEnv.from(tableInName);
    Table tableOut = tableEnv.from(tableOutName);
    tableIn.insertInto(tableOutName);
    // tableEnv.toDataSet(table, Row.class).print();
    tableEnv.execute("TEST read/write");

  }

  private static String getSourceDdl(String tableName, String filePath) {
    return "CREATE TABLE " + tableName + " (\n" + //
        " `id` BIGINT,\n" + //
        " `name` STRING) WITH (\n" + //
        " 'connector.type' = 'filesystem',\n" + //
        " 'connector.property-version' = '1',\n" + //
        " 'connector.path' = '" + filePath + "',\n" + //
        " 'format.type' = 'csv',\n" + //
        " 'format.field-delimiter' = ',',\n" + //
 //       " 'format.write-mode' = 'OVERWRITE',\n" + // NOT SUPPORTED
        " 'format.property-version' = '1',\n" + //
        " 'format.quote-character' = '\"',\n" + //
        " 'format.ignore-first-line' = 'false'" + //
        ")";
  }

  private static String getSinkDdl(String tableName, String filePath) {
    return "CREATE TABLE " + tableName + " (\n" + //
        " `id` BIGINT,\n" + //
        " `name` STRING) WITH (\n" + //
        " 'connector.type' = 'filesystem',\n" + //
        " 'connector.property-version' = '1',\n" + //
        " 'connector.path' = '" + filePath + "',\n" + //
        " 'format.type' = 'csv',\n" + //
        " 'format.field-delimiter' = ',',\n" + //
        " 'format.num-files' = '1',\n" + //
        " 'format.write-mode' = 'OVERWRITE',\n" + // SUPPORTED (sinks only)
        " 'format.quote-character' = '\"',\n" + // NOT SUPPORTED
        " 'format.property-version' = '1'\n" + //
        ")";
  }
}

Thanks for the support,
Flavio


On Thu, Apr 8, 2021 at 7:05 PM Till Rohrmann <[hidden email]> wrote:
Hi Flavio,

I tried to execute the code snippet you have provided and I could not reproduce the problem.

Concretely I am running this code:

final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();
final TableEnvironment tableEnv = TableEnvironment.create(envSettings);

tableEnv.fromValues("foobar").execute().await();

Am I missing something? Maybe you can share a minimal but fully working example where the problem occurs. Thanks a lot.

Cheers,
Till

On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier <[hidden email]> wrote:
Any help here? Moreover if I use the DataStream APIs there's no left/right outer join yet..are those meant to be added in Flink 1.13 or 1.14?

On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier <[hidden email]> wrote:
Hi to all,
I'm testing writing to a CSV using Flink 1.13 and I get the following error:

The matching candidates:
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
Unsupported property keys:
format.quote-character

I create the table env using this:

final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()//
        .useBlinkPlanner()//
        // .inBatchMode()//
        .inStreamingMode()//
        .build();
    final TableEnvironment tableEnv = TableEnvironment.create(envSettings);

The error is the same both with inBatchMode and inStreamingMode.
Is this really not supported or am I using the wrong API?

Best,
Flavio
Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.13 and CSV (batch) writing

Kurt Young
Hi Flavio, 

We would recommend you to use new table source & sink interfaces, which have different 
property keys compared to the old ones, e.g. 'connector' v.s. 'connector.type'.

You can follow the 1.12 doc [1] to define your csv table, everything should work just fine. 

Flink SQL> set table.dml-sync=true;

[INFO] Session property has been set.


Flink SQL> select * from csv;

+----------------------+----------------------+

|                   id |                 name |

+----------------------+----------------------+

|                    3 |                    c |

+----------------------+----------------------+

Received a total of 1 row


Flink SQL> insert overwrite csv values(4, 'd');

[INFO] Submitting SQL update statement to the cluster...

[INFO] Execute statement in sync mode. Please wait for the execution finish...

[INFO] Complete execution of the SQL update statement.


Flink SQL> select * from csv;

+----------------------+----------------------+

|                   id |                 name |

+----------------------+----------------------+

|                    4 |                    d |

+----------------------+----------------------+

Received a total of 1 row



Best,
Kurt


On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier <[hidden email]> wrote:
Hi Till,
since I was using the same WITH-clause both for reading and writing I discovered that overwrite is actually supported in the Sinks, while in the Sources an exception is thrown (I was thinking that those properties were simply ignored).
However the quote-character is not supported in the sinks: is this a bug or is it the intended behaviour?. 
Here is a minimal example that reproduce the problem (put in the /tmp/test.csv something like '1,hello' or '2,hi').

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkCsvTest {
  public static void main(String[] args) throws Exception {
    final EnvironmentSettings envSettings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
    // ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
    final String tableInName = "testTableIn";
    final String createInTableDdl = getSourceDdl(tableInName, "/tmp/test.csv"); //

    final String tableOutName = "testTableOut";
    final String createOutTableDdl = getSinkDdl(tableOutName, "/tmp/test-out.csv"); //
    tableEnv.executeSql(createInTableDdl);
    tableEnv.executeSql(createOutTableDdl);

    Table tableIn = tableEnv.from(tableInName);
    Table tableOut = tableEnv.from(tableOutName);
    tableIn.insertInto(tableOutName);
    // tableEnv.toDataSet(table, Row.class).print();
    tableEnv.execute("TEST read/write");

  }

  private static String getSourceDdl(String tableName, String filePath) {
    return "CREATE TABLE " + tableName + " (\n" + //
        " `id` BIGINT,\n" + //
        " `name` STRING) WITH (\n" + //
        " 'connector.type' = 'filesystem',\n" + //
        " 'connector.property-version' = '1',\n" + //
        " 'connector.path' = '" + filePath + "',\n" + //
        " 'format.type' = 'csv',\n" + //
        " 'format.field-delimiter' = ',',\n" + //
 //       " 'format.write-mode' = 'OVERWRITE',\n" + // NOT SUPPORTED
        " 'format.property-version' = '1',\n" + //
        " 'format.quote-character' = '\"',\n" + //
        " 'format.ignore-first-line' = 'false'" + //
        ")";
  }

  private static String getSinkDdl(String tableName, String filePath) {
    return "CREATE TABLE " + tableName + " (\n" + //
        " `id` BIGINT,\n" + //
        " `name` STRING) WITH (\n" + //
        " 'connector.type' = 'filesystem',\n" + //
        " 'connector.property-version' = '1',\n" + //
        " 'connector.path' = '" + filePath + "',\n" + //
        " 'format.type' = 'csv',\n" + //
        " 'format.field-delimiter' = ',',\n" + //
        " 'format.num-files' = '1',\n" + //
        " 'format.write-mode' = 'OVERWRITE',\n" + // SUPPORTED (sinks only)
        " 'format.quote-character' = '\"',\n" + // NOT SUPPORTED
        " 'format.property-version' = '1'\n" + //
        ")";
  }
}

Thanks for the support,
Flavio


On Thu, Apr 8, 2021 at 7:05 PM Till Rohrmann <[hidden email]> wrote:
Hi Flavio,

I tried to execute the code snippet you have provided and I could not reproduce the problem.

Concretely I am running this code:

final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();
final TableEnvironment tableEnv = TableEnvironment.create(envSettings);

tableEnv.fromValues("foobar").execute().await();

Am I missing something? Maybe you can share a minimal but fully working example where the problem occurs. Thanks a lot.

Cheers,
Till

On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier <[hidden email]> wrote:
Any help here? Moreover if I use the DataStream APIs there's no left/right outer join yet..are those meant to be added in Flink 1.13 or 1.14?

On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier <[hidden email]> wrote:
Hi to all,
I'm testing writing to a CSV using Flink 1.13 and I get the following error:

The matching candidates:
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
Unsupported property keys:
format.quote-character

I create the table env using this:

final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()//
        .useBlinkPlanner()//
        // .inBatchMode()//
        .inStreamingMode()//
        .build();
    final TableEnvironment tableEnv = TableEnvironment.create(envSettings);

The error is the same both with inBatchMode and inStreamingMode.
Is this really not supported or am I using the wrong API?

Best,
Flavio
Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.13 and CSV (batch) writing

Kurt Young
My DDL is:

CREATE TABLE csv (
       id BIGINT,
       name STRING
) WITH (
       'connector' = 'filesystem',
       'path' = '.....',
       'format' = 'csv'
);

Best,
Kurt


On Fri, Apr 9, 2021 at 10:00 AM Kurt Young <[hidden email]> wrote:
Hi Flavio, 

We would recommend you to use new table source & sink interfaces, which have different 
property keys compared to the old ones, e.g. 'connector' v.s. 'connector.type'.

You can follow the 1.12 doc [1] to define your csv table, everything should work just fine. 

Flink SQL> set table.dml-sync=true;

[INFO] Session property has been set.


Flink SQL> select * from csv;

+----------------------+----------------------+

|                   id |                 name |

+----------------------+----------------------+

|                    3 |                    c |

+----------------------+----------------------+

Received a total of 1 row


Flink SQL> insert overwrite csv values(4, 'd');

[INFO] Submitting SQL update statement to the cluster...

[INFO] Execute statement in sync mode. Please wait for the execution finish...

[INFO] Complete execution of the SQL update statement.


Flink SQL> select * from csv;

+----------------------+----------------------+

|                   id |                 name |

+----------------------+----------------------+

|                    4 |                    d |

+----------------------+----------------------+

Received a total of 1 row



Best,
Kurt


On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier <[hidden email]> wrote:
Hi Till,
since I was using the same WITH-clause both for reading and writing I discovered that overwrite is actually supported in the Sinks, while in the Sources an exception is thrown (I was thinking that those properties were simply ignored).
However the quote-character is not supported in the sinks: is this a bug or is it the intended behaviour?. 
Here is a minimal example that reproduce the problem (put in the /tmp/test.csv something like '1,hello' or '2,hi').

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkCsvTest {
  public static void main(String[] args) throws Exception {
    final EnvironmentSettings envSettings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
    // ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
    final String tableInName = "testTableIn";
    final String createInTableDdl = getSourceDdl(tableInName, "/tmp/test.csv"); //

    final String tableOutName = "testTableOut";
    final String createOutTableDdl = getSinkDdl(tableOutName, "/tmp/test-out.csv"); //
    tableEnv.executeSql(createInTableDdl);
    tableEnv.executeSql(createOutTableDdl);

    Table tableIn = tableEnv.from(tableInName);
    Table tableOut = tableEnv.from(tableOutName);
    tableIn.insertInto(tableOutName);
    // tableEnv.toDataSet(table, Row.class).print();
    tableEnv.execute("TEST read/write");

  }

  private static String getSourceDdl(String tableName, String filePath) {
    return "CREATE TABLE " + tableName + " (\n" + //
        " `id` BIGINT,\n" + //
        " `name` STRING) WITH (\n" + //
        " 'connector.type' = 'filesystem',\n" + //
        " 'connector.property-version' = '1',\n" + //
        " 'connector.path' = '" + filePath + "',\n" + //
        " 'format.type' = 'csv',\n" + //
        " 'format.field-delimiter' = ',',\n" + //
 //       " 'format.write-mode' = 'OVERWRITE',\n" + // NOT SUPPORTED
        " 'format.property-version' = '1',\n" + //
        " 'format.quote-character' = '\"',\n" + //
        " 'format.ignore-first-line' = 'false'" + //
        ")";
  }

  private static String getSinkDdl(String tableName, String filePath) {
    return "CREATE TABLE " + tableName + " (\n" + //
        " `id` BIGINT,\n" + //
        " `name` STRING) WITH (\n" + //
        " 'connector.type' = 'filesystem',\n" + //
        " 'connector.property-version' = '1',\n" + //
        " 'connector.path' = '" + filePath + "',\n" + //
        " 'format.type' = 'csv',\n" + //
        " 'format.field-delimiter' = ',',\n" + //
        " 'format.num-files' = '1',\n" + //
        " 'format.write-mode' = 'OVERWRITE',\n" + // SUPPORTED (sinks only)
        " 'format.quote-character' = '\"',\n" + // NOT SUPPORTED
        " 'format.property-version' = '1'\n" + //
        ")";
  }
}

Thanks for the support,
Flavio


On Thu, Apr 8, 2021 at 7:05 PM Till Rohrmann <[hidden email]> wrote:
Hi Flavio,

I tried to execute the code snippet you have provided and I could not reproduce the problem.

Concretely I am running this code:

final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();
final TableEnvironment tableEnv = TableEnvironment.create(envSettings);

tableEnv.fromValues("foobar").execute().await();

Am I missing something? Maybe you can share a minimal but fully working example where the problem occurs. Thanks a lot.

Cheers,
Till

On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier <[hidden email]> wrote:
Any help here? Moreover if I use the DataStream APIs there's no left/right outer join yet..are those meant to be added in Flink 1.13 or 1.14?

On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier <[hidden email]> wrote:
Hi to all,
I'm testing writing to a CSV using Flink 1.13 and I get the following error:

The matching candidates:
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
Unsupported property keys:
format.quote-character

I create the table env using this:

final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()//
        .useBlinkPlanner()//
        // .inBatchMode()//
        .inStreamingMode()//
        .build();
    final TableEnvironment tableEnv = TableEnvironment.create(envSettings);

The error is the same both with inBatchMode and inStreamingMode.
Is this really not supported or am I using the wrong API?

Best,
Flavio
Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.13 and CSV (batch) writing

Flavio Pompermaier
Thanks Kurt, now it works. However I can't find a way to skip the CSV header..before there was  "format.ignore-first-line" but now I can't find another way to skip it.
I could set csv.ignore-parse-errors to true but then I can't detect other parsing errors, otherwise I need to manually transofrm the header into a comment adding the # character at the start of the line..
How can I solve that?

On Fri, Apr 9, 2021 at 4:07 AM Kurt Young <[hidden email]> wrote:
My DDL is:

CREATE TABLE csv (
       id BIGINT,
       name STRING
) WITH (
       'connector' = 'filesystem',
       'path' = '.....',
       'format' = 'csv'
);

Best,
Kurt


On Fri, Apr 9, 2021 at 10:00 AM Kurt Young <[hidden email]> wrote:
Hi Flavio, 

We would recommend you to use new table source & sink interfaces, which have different 
property keys compared to the old ones, e.g. 'connector' v.s. 'connector.type'.

You can follow the 1.12 doc [1] to define your csv table, everything should work just fine. 

Flink SQL> set table.dml-sync=true;

[INFO] Session property has been set.


Flink SQL> select * from csv;

+----------------------+----------------------+

|                   id |                 name |

+----------------------+----------------------+

|                    3 |                    c |

+----------------------+----------------------+

Received a total of 1 row


Flink SQL> insert overwrite csv values(4, 'd');

[INFO] Submitting SQL update statement to the cluster...

[INFO] Execute statement in sync mode. Please wait for the execution finish...

[INFO] Complete execution of the SQL update statement.


Flink SQL> select * from csv;

+----------------------+----------------------+

|                   id |                 name |

+----------------------+----------------------+

|                    4 |                    d |

+----------------------+----------------------+

Received a total of 1 row



Best,
Kurt


On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier <[hidden email]> wrote:
Hi Till,
since I was using the same WITH-clause both for reading and writing I discovered that overwrite is actually supported in the Sinks, while in the Sources an exception is thrown (I was thinking that those properties were simply ignored).
However the quote-character is not supported in the sinks: is this a bug or is it the intended behaviour?. 
Here is a minimal example that reproduce the problem (put in the /tmp/test.csv something like '1,hello' or '2,hi').

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkCsvTest {
  public static void main(String[] args) throws Exception {
    final EnvironmentSettings envSettings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
    // ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
    final String tableInName = "testTableIn";
    final String createInTableDdl = getSourceDdl(tableInName, "/tmp/test.csv"); //

    final String tableOutName = "testTableOut";
    final String createOutTableDdl = getSinkDdl(tableOutName, "/tmp/test-out.csv"); //
    tableEnv.executeSql(createInTableDdl);
    tableEnv.executeSql(createOutTableDdl);

    Table tableIn = tableEnv.from(tableInName);
    Table tableOut = tableEnv.from(tableOutName);
    tableIn.insertInto(tableOutName);
    // tableEnv.toDataSet(table, Row.class).print();
    tableEnv.execute("TEST read/write");

  }

  private static String getSourceDdl(String tableName, String filePath) {
    return "CREATE TABLE " + tableName + " (\n" + //
        " `id` BIGINT,\n" + //
        " `name` STRING) WITH (\n" + //
        " 'connector.type' = 'filesystem',\n" + //
        " 'connector.property-version' = '1',\n" + //
        " 'connector.path' = '" + filePath + "',\n" + //
        " 'format.type' = 'csv',\n" + //
        " 'format.field-delimiter' = ',',\n" + //
 //       " 'format.write-mode' = 'OVERWRITE',\n" + // NOT SUPPORTED
        " 'format.property-version' = '1',\n" + //
        " 'format.quote-character' = '\"',\n" + //
        " 'format.ignore-first-line' = 'false'" + //
        ")";
  }

  private static String getSinkDdl(String tableName, String filePath) {
    return "CREATE TABLE " + tableName + " (\n" + //
        " `id` BIGINT,\n" + //
        " `name` STRING) WITH (\n" + //
        " 'connector.type' = 'filesystem',\n" + //
        " 'connector.property-version' = '1',\n" + //
        " 'connector.path' = '" + filePath + "',\n" + //
        " 'format.type' = 'csv',\n" + //
        " 'format.field-delimiter' = ',',\n" + //
        " 'format.num-files' = '1',\n" + //
        " 'format.write-mode' = 'OVERWRITE',\n" + // SUPPORTED (sinks only)
        " 'format.quote-character' = '\"',\n" + // NOT SUPPORTED
        " 'format.property-version' = '1'\n" + //
        ")";
  }
}

Thanks for the support,
Flavio


On Thu, Apr 8, 2021 at 7:05 PM Till Rohrmann <[hidden email]> wrote:
Hi Flavio,

I tried to execute the code snippet you have provided and I could not reproduce the problem.

Concretely I am running this code:

final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();
final TableEnvironment tableEnv = TableEnvironment.create(envSettings);

tableEnv.fromValues("foobar").execute().await();

Am I missing something? Maybe you can share a minimal but fully working example where the problem occurs. Thanks a lot.

Cheers,
Till

On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier <[hidden email]> wrote:
Any help here? Moreover if I use the DataStream APIs there's no left/right outer join yet..are those meant to be added in Flink 1.13 or 1.14?

On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier <[hidden email]> wrote:
Hi to all,
I'm testing writing to a CSV using Flink 1.13 and I get the following error:

The matching candidates:
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
Unsupported property keys:
format.quote-character

I create the table env using this:

final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()//
        .useBlinkPlanner()//
        // .inBatchMode()//
        .inStreamingMode()//
        .build();
    final TableEnvironment tableEnv = TableEnvironment.create(envSettings);

The error is the same both with inBatchMode and inStreamingMode.
Is this really not supported or am I using the wrong API?

Best,
Flavio
Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.13 and CSV (batch) writing

Flavio Pompermaier
And another thing: in my csv I added ',bye' (to test null Long values) but I get a parse error..if I add  'csv.null-literal' = '' it seems to work..is that the right way to solve this problem?

On Fri, Apr 9, 2021 at 10:13 AM Flavio Pompermaier <[hidden email]> wrote:
Thanks Kurt, now it works. However I can't find a way to skip the CSV header..before there was  "format.ignore-first-line" but now I can't find another way to skip it.
I could set csv.ignore-parse-errors to true but then I can't detect other parsing errors, otherwise I need to manually transofrm the header into a comment adding the # character at the start of the line..
How can I solve that?

On Fri, Apr 9, 2021 at 4:07 AM Kurt Young <[hidden email]> wrote:
My DDL is:

CREATE TABLE csv (
       id BIGINT,
       name STRING
) WITH (
       'connector' = 'filesystem',
       'path' = '.....',
       'format' = 'csv'
);

Best,
Kurt


On Fri, Apr 9, 2021 at 10:00 AM Kurt Young <[hidden email]> wrote:
Hi Flavio, 

We would recommend you to use new table source & sink interfaces, which have different 
property keys compared to the old ones, e.g. 'connector' v.s. 'connector.type'.

You can follow the 1.12 doc [1] to define your csv table, everything should work just fine. 

Flink SQL> set table.dml-sync=true;

[INFO] Session property has been set.


Flink SQL> select * from csv;

+----------------------+----------------------+

|                   id |                 name |

+----------------------+----------------------+

|                    3 |                    c |

+----------------------+----------------------+

Received a total of 1 row


Flink SQL> insert overwrite csv values(4, 'd');

[INFO] Submitting SQL update statement to the cluster...

[INFO] Execute statement in sync mode. Please wait for the execution finish...

[INFO] Complete execution of the SQL update statement.


Flink SQL> select * from csv;

+----------------------+----------------------+

|                   id |                 name |

+----------------------+----------------------+

|                    4 |                    d |

+----------------------+----------------------+

Received a total of 1 row



Best,
Kurt


On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier <[hidden email]> wrote:
Hi Till,
since I was using the same WITH-clause both for reading and writing I discovered that overwrite is actually supported in the Sinks, while in the Sources an exception is thrown (I was thinking that those properties were simply ignored).
However the quote-character is not supported in the sinks: is this a bug or is it the intended behaviour?. 
Here is a minimal example that reproduce the problem (put in the /tmp/test.csv something like '1,hello' or '2,hi').

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkCsvTest {
  public static void main(String[] args) throws Exception {
    final EnvironmentSettings envSettings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
    // ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
    final String tableInName = "testTableIn";
    final String createInTableDdl = getSourceDdl(tableInName, "/tmp/test.csv"); //

    final String tableOutName = "testTableOut";
    final String createOutTableDdl = getSinkDdl(tableOutName, "/tmp/test-out.csv"); //
    tableEnv.executeSql(createInTableDdl);
    tableEnv.executeSql(createOutTableDdl);

    Table tableIn = tableEnv.from(tableInName);
    Table tableOut = tableEnv.from(tableOutName);
    tableIn.insertInto(tableOutName);
    // tableEnv.toDataSet(table, Row.class).print();
    tableEnv.execute("TEST read/write");

  }

  private static String getSourceDdl(String tableName, String filePath) {
    return "CREATE TABLE " + tableName + " (\n" + //
        " `id` BIGINT,\n" + //
        " `name` STRING) WITH (\n" + //
        " 'connector.type' = 'filesystem',\n" + //
        " 'connector.property-version' = '1',\n" + //
        " 'connector.path' = '" + filePath + "',\n" + //
        " 'format.type' = 'csv',\n" + //
        " 'format.field-delimiter' = ',',\n" + //
 //       " 'format.write-mode' = 'OVERWRITE',\n" + // NOT SUPPORTED
        " 'format.property-version' = '1',\n" + //
        " 'format.quote-character' = '\"',\n" + //
        " 'format.ignore-first-line' = 'false'" + //
        ")";
  }

  private static String getSinkDdl(String tableName, String filePath) {
    return "CREATE TABLE " + tableName + " (\n" + //
        " `id` BIGINT,\n" + //
        " `name` STRING) WITH (\n" + //
        " 'connector.type' = 'filesystem',\n" + //
        " 'connector.property-version' = '1',\n" + //
        " 'connector.path' = '" + filePath + "',\n" + //
        " 'format.type' = 'csv',\n" + //
        " 'format.field-delimiter' = ',',\n" + //
        " 'format.num-files' = '1',\n" + //
        " 'format.write-mode' = 'OVERWRITE',\n" + // SUPPORTED (sinks only)
        " 'format.quote-character' = '\"',\n" + // NOT SUPPORTED
        " 'format.property-version' = '1'\n" + //
        ")";
  }
}

Thanks for the support,
Flavio


On Thu, Apr 8, 2021 at 7:05 PM Till Rohrmann <[hidden email]> wrote:
Hi Flavio,

I tried to execute the code snippet you have provided and I could not reproduce the problem.

Concretely I am running this code:

final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();
final TableEnvironment tableEnv = TableEnvironment.create(envSettings);

tableEnv.fromValues("foobar").execute().await();

Am I missing something? Maybe you can share a minimal but fully working example where the problem occurs. Thanks a lot.

Cheers,
Till

On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier <[hidden email]> wrote:
Any help here? Moreover if I use the DataStream APIs there's no left/right outer join yet..are those meant to be added in Flink 1.13 or 1.14?

On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier <[hidden email]> wrote:
Hi to all,
I'm testing writing to a CSV using Flink 1.13 and I get the following error:

The matching candidates:
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
Unsupported property keys:
format.quote-character

I create the table env using this:

final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()//
        .useBlinkPlanner()//
        // .inBatchMode()//
        .inStreamingMode()//
        .build();
    final TableEnvironment tableEnv = TableEnvironment.create(envSettings);

The error is the same both with inBatchMode and inStreamingMode.
Is this really not supported or am I using the wrong API?

Best,
Flavio
Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.13 and CSV (batch) writing

Kurt Young
`format.ignore-first-line` is unfortunately a regression compared to the old one. 
I've created a ticket [1] to track this but according to current design, it seems not easy to do.

Regarding null values, I'm not sure if I understand the issue you had. What do you mean by
using ',bye' to test null Long values?


Best,
Kurt


On Fri, Apr 9, 2021 at 4:46 PM Flavio Pompermaier <[hidden email]> wrote:
And another thing: in my csv I added ',bye' (to test null Long values) but I get a parse error..if I add  'csv.null-literal' = '' it seems to work..is that the right way to solve this problem?

On Fri, Apr 9, 2021 at 10:13 AM Flavio Pompermaier <[hidden email]> wrote:
Thanks Kurt, now it works. However I can't find a way to skip the CSV header..before there was  "format.ignore-first-line" but now I can't find another way to skip it.
I could set csv.ignore-parse-errors to true but then I can't detect other parsing errors, otherwise I need to manually transofrm the header into a comment adding the # character at the start of the line..
How can I solve that?

On Fri, Apr 9, 2021 at 4:07 AM Kurt Young <[hidden email]> wrote:
My DDL is:

CREATE TABLE csv (
       id BIGINT,
       name STRING
) WITH (
       'connector' = 'filesystem',
       'path' = '.....',
       'format' = 'csv'
);

Best,
Kurt


On Fri, Apr 9, 2021 at 10:00 AM Kurt Young <[hidden email]> wrote:
Hi Flavio, 

We would recommend you to use new table source & sink interfaces, which have different 
property keys compared to the old ones, e.g. 'connector' v.s. 'connector.type'.

You can follow the 1.12 doc [1] to define your csv table, everything should work just fine. 

Flink SQL> set table.dml-sync=true;

[INFO] Session property has been set.


Flink SQL> select * from csv;

+----------------------+----------------------+

|                   id |                 name |

+----------------------+----------------------+

|                    3 |                    c |

+----------------------+----------------------+

Received a total of 1 row


Flink SQL> insert overwrite csv values(4, 'd');

[INFO] Submitting SQL update statement to the cluster...

[INFO] Execute statement in sync mode. Please wait for the execution finish...

[INFO] Complete execution of the SQL update statement.


Flink SQL> select * from csv;

+----------------------+----------------------+

|                   id |                 name |

+----------------------+----------------------+

|                    4 |                    d |

+----------------------+----------------------+

Received a total of 1 row



Best,
Kurt


On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier <[hidden email]> wrote:
Hi Till,
since I was using the same WITH-clause both for reading and writing I discovered that overwrite is actually supported in the Sinks, while in the Sources an exception is thrown (I was thinking that those properties were simply ignored).
However the quote-character is not supported in the sinks: is this a bug or is it the intended behaviour?. 
Here is a minimal example that reproduce the problem (put in the /tmp/test.csv something like '1,hello' or '2,hi').

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkCsvTest {
  public static void main(String[] args) throws Exception {
    final EnvironmentSettings envSettings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
    // ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
    final String tableInName = "testTableIn";
    final String createInTableDdl = getSourceDdl(tableInName, "/tmp/test.csv"); //

    final String tableOutName = "testTableOut";
    final String createOutTableDdl = getSinkDdl(tableOutName, "/tmp/test-out.csv"); //
    tableEnv.executeSql(createInTableDdl);
    tableEnv.executeSql(createOutTableDdl);

    Table tableIn = tableEnv.from(tableInName);
    Table tableOut = tableEnv.from(tableOutName);
    tableIn.insertInto(tableOutName);
    // tableEnv.toDataSet(table, Row.class).print();
    tableEnv.execute("TEST read/write");

  }

  private static String getSourceDdl(String tableName, String filePath) {
    return "CREATE TABLE " + tableName + " (\n" + //
        " `id` BIGINT,\n" + //
        " `name` STRING) WITH (\n" + //
        " 'connector.type' = 'filesystem',\n" + //
        " 'connector.property-version' = '1',\n" + //
        " 'connector.path' = '" + filePath + "',\n" + //
        " 'format.type' = 'csv',\n" + //
        " 'format.field-delimiter' = ',',\n" + //
 //       " 'format.write-mode' = 'OVERWRITE',\n" + // NOT SUPPORTED
        " 'format.property-version' = '1',\n" + //
        " 'format.quote-character' = '\"',\n" + //
        " 'format.ignore-first-line' = 'false'" + //
        ")";
  }

  private static String getSinkDdl(String tableName, String filePath) {
    return "CREATE TABLE " + tableName + " (\n" + //
        " `id` BIGINT,\n" + //
        " `name` STRING) WITH (\n" + //
        " 'connector.type' = 'filesystem',\n" + //
        " 'connector.property-version' = '1',\n" + //
        " 'connector.path' = '" + filePath + "',\n" + //
        " 'format.type' = 'csv',\n" + //
        " 'format.field-delimiter' = ',',\n" + //
        " 'format.num-files' = '1',\n" + //
        " 'format.write-mode' = 'OVERWRITE',\n" + // SUPPORTED (sinks only)
        " 'format.quote-character' = '\"',\n" + // NOT SUPPORTED
        " 'format.property-version' = '1'\n" + //
        ")";
  }
}

Thanks for the support,
Flavio


On Thu, Apr 8, 2021 at 7:05 PM Till Rohrmann <[hidden email]> wrote:
Hi Flavio,

I tried to execute the code snippet you have provided and I could not reproduce the problem.

Concretely I am running this code:

final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();
final TableEnvironment tableEnv = TableEnvironment.create(envSettings);

tableEnv.fromValues("foobar").execute().await();

Am I missing something? Maybe you can share a minimal but fully working example where the problem occurs. Thanks a lot.

Cheers,
Till

On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier <[hidden email]> wrote:
Any help here? Moreover if I use the DataStream APIs there's no left/right outer join yet..are those meant to be added in Flink 1.13 or 1.14?

On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier <[hidden email]> wrote:
Hi to all,
I'm testing writing to a CSV using Flink 1.13 and I get the following error:

The matching candidates:
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
Unsupported property keys:
format.quote-character

I create the table env using this:

final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()//
        .useBlinkPlanner()//
        // .inBatchMode()//
        .inStreamingMode()//
        .build();
    final TableEnvironment tableEnv = TableEnvironment.create(envSettings);

The error is the same both with inBatchMode and inStreamingMode.
Is this really not supported or am I using the wrong API?

Best,
Flavio
Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.13 and CSV (batch) writing

Flavio Pompermaier
In my real CSV I have LONG columns that can contain null values. In that case I get a parse exception (and I would like to avoid to read it as a string).
The ',bye' is just the way you can test that in my example (add that line to the input csv).
If I use  'csv.null-literal' = '' it seems to work but, is it a workaround or it is the right solution?

Another big problem I'm having with the new APIs is that if I use 
    TableEnvironment tableEnv = TableEnvironment.create(envSettings);
then I can't convert a table to a datastream..I need to use 
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, envSettings);
but in that case I can't use inBatchMode..

On Fri, Apr 9, 2021 at 11:44 AM Kurt Young <[hidden email]> wrote:
`format.ignore-first-line` is unfortunately a regression compared to the old one. 
I've created a ticket [1] to track this but according to current design, it seems not easy to do.

Regarding null values, I'm not sure if I understand the issue you had. What do you mean by
using ',bye' to test null Long values?


Best,
Kurt


On Fri, Apr 9, 2021 at 4:46 PM Flavio Pompermaier <[hidden email]> wrote:
And another thing: in my csv I added ',bye' (to test null Long values) but I get a parse error..if I add  'csv.null-literal' = '' it seems to work..is that the right way to solve this problem?

On Fri, Apr 9, 2021 at 10:13 AM Flavio Pompermaier <[hidden email]> wrote:
Thanks Kurt, now it works. However I can't find a way to skip the CSV header..before there was  "format.ignore-first-line" but now I can't find another way to skip it.
I could set csv.ignore-parse-errors to true but then I can't detect other parsing errors, otherwise I need to manually transofrm the header into a comment adding the # character at the start of the line..
How can I solve that?

On Fri, Apr 9, 2021 at 4:07 AM Kurt Young <[hidden email]> wrote:
My DDL is:

CREATE TABLE csv (
       id BIGINT,
       name STRING
) WITH (
       'connector' = 'filesystem',
       'path' = '.....',
       'format' = 'csv'
);

Best,
Kurt


On Fri, Apr 9, 2021 at 10:00 AM Kurt Young <[hidden email]> wrote:
Hi Flavio, 

We would recommend you to use new table source & sink interfaces, which have different 
property keys compared to the old ones, e.g. 'connector' v.s. 'connector.type'.

You can follow the 1.12 doc [1] to define your csv table, everything should work just fine. 

Flink SQL> set table.dml-sync=true;

[INFO] Session property has been set.


Flink SQL> select * from csv;

+----------------------+----------------------+

|                   id |                 name |

+----------------------+----------------------+

|                    3 |                    c |

+----------------------+----------------------+

Received a total of 1 row


Flink SQL> insert overwrite csv values(4, 'd');

[INFO] Submitting SQL update statement to the cluster...

[INFO] Execute statement in sync mode. Please wait for the execution finish...

[INFO] Complete execution of the SQL update statement.


Flink SQL> select * from csv;

+----------------------+----------------------+

|                   id |                 name |

+----------------------+----------------------+

|                    4 |                    d |

+----------------------+----------------------+

Received a total of 1 row



Best,
Kurt


On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier <[hidden email]> wrote:
Hi Till,
since I was using the same WITH-clause both for reading and writing I discovered that overwrite is actually supported in the Sinks, while in the Sources an exception is thrown (I was thinking that those properties were simply ignored).
However the quote-character is not supported in the sinks: is this a bug or is it the intended behaviour?. 
Here is a minimal example that reproduce the problem (put in the /tmp/test.csv something like '1,hello' or '2,hi').

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkCsvTest {
  public static void main(String[] args) throws Exception {
    final EnvironmentSettings envSettings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
    // ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
    final String tableInName = "testTableIn";
    final String createInTableDdl = getSourceDdl(tableInName, "/tmp/test.csv"); //

    final String tableOutName = "testTableOut";
    final String createOutTableDdl = getSinkDdl(tableOutName, "/tmp/test-out.csv"); //
    tableEnv.executeSql(createInTableDdl);
    tableEnv.executeSql(createOutTableDdl);

    Table tableIn = tableEnv.from(tableInName);
    Table tableOut = tableEnv.from(tableOutName);
    tableIn.insertInto(tableOutName);
    // tableEnv.toDataSet(table, Row.class).print();
    tableEnv.execute("TEST read/write");

  }

  private static String getSourceDdl(String tableName, String filePath) {
    return "CREATE TABLE " + tableName + " (\n" + //
        " `id` BIGINT,\n" + //
        " `name` STRING) WITH (\n" + //
        " 'connector.type' = 'filesystem',\n" + //
        " 'connector.property-version' = '1',\n" + //
        " 'connector.path' = '" + filePath + "',\n" + //
        " 'format.type' = 'csv',\n" + //
        " 'format.field-delimiter' = ',',\n" + //
 //       " 'format.write-mode' = 'OVERWRITE',\n" + // NOT SUPPORTED
        " 'format.property-version' = '1',\n" + //
        " 'format.quote-character' = '\"',\n" + //
        " 'format.ignore-first-line' = 'false'" + //
        ")";
  }

  private static String getSinkDdl(String tableName, String filePath) {
    return "CREATE TABLE " + tableName + " (\n" + //
        " `id` BIGINT,\n" + //
        " `name` STRING) WITH (\n" + //
        " 'connector.type' = 'filesystem',\n" + //
        " 'connector.property-version' = '1',\n" + //
        " 'connector.path' = '" + filePath + "',\n" + //
        " 'format.type' = 'csv',\n" + //
        " 'format.field-delimiter' = ',',\n" + //
        " 'format.num-files' = '1',\n" + //
        " 'format.write-mode' = 'OVERWRITE',\n" + // SUPPORTED (sinks only)
        " 'format.quote-character' = '\"',\n" + // NOT SUPPORTED
        " 'format.property-version' = '1'\n" + //
        ")";
  }
}

Thanks for the support,
Flavio


On Thu, Apr 8, 2021 at 7:05 PM Till Rohrmann <[hidden email]> wrote:
Hi Flavio,

I tried to execute the code snippet you have provided and I could not reproduce the problem.

Concretely I am running this code:

final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();
final TableEnvironment tableEnv = TableEnvironment.create(envSettings);

tableEnv.fromValues("foobar").execute().await();

Am I missing something? Maybe you can share a minimal but fully working example where the problem occurs. Thanks a lot.

Cheers,
Till

On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier <[hidden email]> wrote:
Any help here? Moreover if I use the DataStream APIs there's no left/right outer join yet..are those meant to be added in Flink 1.13 or 1.14?

On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier <[hidden email]> wrote:
Hi to all,
I'm testing writing to a CSV using Flink 1.13 and I get the following error:

The matching candidates:
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
Unsupported property keys:
format.quote-character

I create the table env using this:

final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()//
        .useBlinkPlanner()//
        // .inBatchMode()//
        .inStreamingMode()//
        .build();
    final TableEnvironment tableEnv = TableEnvironment.create(envSettings);

The error is the same both with inBatchMode and inStreamingMode.
Is this really not supported or am I using the wrong API?

Best,
Flavio
Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.13 and CSV (batch) writing

Kurt Young
Converting from table to DataStream in batch mode is indeed a problem now. But I think this will 
be improved soon. 

Best,
Kurt


On Fri, Apr 9, 2021 at 6:14 PM Flavio Pompermaier <[hidden email]> wrote:
In my real CSV I have LONG columns that can contain null values. In that case I get a parse exception (and I would like to avoid to read it as a string).
The ',bye' is just the way you can test that in my example (add that line to the input csv).
If I use  'csv.null-literal' = '' it seems to work but, is it a workaround or it is the right solution?

Another big problem I'm having with the new APIs is that if I use 
    TableEnvironment tableEnv = TableEnvironment.create(envSettings);
then I can't convert a table to a datastream..I need to use 
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, envSettings);
but in that case I can't use inBatchMode..

On Fri, Apr 9, 2021 at 11:44 AM Kurt Young <[hidden email]> wrote:
`format.ignore-first-line` is unfortunately a regression compared to the old one. 
I've created a ticket [1] to track this but according to current design, it seems not easy to do.

Regarding null values, I'm not sure if I understand the issue you had. What do you mean by
using ',bye' to test null Long values?


Best,
Kurt


On Fri, Apr 9, 2021 at 4:46 PM Flavio Pompermaier <[hidden email]> wrote:
And another thing: in my csv I added ',bye' (to test null Long values) but I get a parse error..if I add  'csv.null-literal' = '' it seems to work..is that the right way to solve this problem?

On Fri, Apr 9, 2021 at 10:13 AM Flavio Pompermaier <[hidden email]> wrote:
Thanks Kurt, now it works. However I can't find a way to skip the CSV header..before there was  "format.ignore-first-line" but now I can't find another way to skip it.
I could set csv.ignore-parse-errors to true but then I can't detect other parsing errors, otherwise I need to manually transofrm the header into a comment adding the # character at the start of the line..
How can I solve that?

On Fri, Apr 9, 2021 at 4:07 AM Kurt Young <[hidden email]> wrote:
My DDL is:

CREATE TABLE csv (
       id BIGINT,
       name STRING
) WITH (
       'connector' = 'filesystem',
       'path' = '.....',
       'format' = 'csv'
);

Best,
Kurt


On Fri, Apr 9, 2021 at 10:00 AM Kurt Young <[hidden email]> wrote:
Hi Flavio, 

We would recommend you to use new table source & sink interfaces, which have different 
property keys compared to the old ones, e.g. 'connector' v.s. 'connector.type'.

You can follow the 1.12 doc [1] to define your csv table, everything should work just fine. 

Flink SQL> set table.dml-sync=true;

[INFO] Session property has been set.


Flink SQL> select * from csv;

+----------------------+----------------------+

|                   id |                 name |

+----------------------+----------------------+

|                    3 |                    c |

+----------------------+----------------------+

Received a total of 1 row


Flink SQL> insert overwrite csv values(4, 'd');

[INFO] Submitting SQL update statement to the cluster...

[INFO] Execute statement in sync mode. Please wait for the execution finish...

[INFO] Complete execution of the SQL update statement.


Flink SQL> select * from csv;

+----------------------+----------------------+

|                   id |                 name |

+----------------------+----------------------+

|                    4 |                    d |

+----------------------+----------------------+

Received a total of 1 row



Best,
Kurt


On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier <[hidden email]> wrote:
Hi Till,
since I was using the same WITH-clause both for reading and writing I discovered that overwrite is actually supported in the Sinks, while in the Sources an exception is thrown (I was thinking that those properties were simply ignored).
However the quote-character is not supported in the sinks: is this a bug or is it the intended behaviour?. 
Here is a minimal example that reproduce the problem (put in the /tmp/test.csv something like '1,hello' or '2,hi').

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkCsvTest {
  public static void main(String[] args) throws Exception {
    final EnvironmentSettings envSettings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
    // ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
    final String tableInName = "testTableIn";
    final String createInTableDdl = getSourceDdl(tableInName, "/tmp/test.csv"); //

    final String tableOutName = "testTableOut";
    final String createOutTableDdl = getSinkDdl(tableOutName, "/tmp/test-out.csv"); //
    tableEnv.executeSql(createInTableDdl);
    tableEnv.executeSql(createOutTableDdl);

    Table tableIn = tableEnv.from(tableInName);
    Table tableOut = tableEnv.from(tableOutName);
    tableIn.insertInto(tableOutName);
    // tableEnv.toDataSet(table, Row.class).print();
    tableEnv.execute("TEST read/write");

  }

  private static String getSourceDdl(String tableName, String filePath) {
    return "CREATE TABLE " + tableName + " (\n" + //
        " `id` BIGINT,\n" + //
        " `name` STRING) WITH (\n" + //
        " 'connector.type' = 'filesystem',\n" + //
        " 'connector.property-version' = '1',\n" + //
        " 'connector.path' = '" + filePath + "',\n" + //
        " 'format.type' = 'csv',\n" + //
        " 'format.field-delimiter' = ',',\n" + //
 //       " 'format.write-mode' = 'OVERWRITE',\n" + // NOT SUPPORTED
        " 'format.property-version' = '1',\n" + //
        " 'format.quote-character' = '\"',\n" + //
        " 'format.ignore-first-line' = 'false'" + //
        ")";
  }

  private static String getSinkDdl(String tableName, String filePath) {
    return "CREATE TABLE " + tableName + " (\n" + //
        " `id` BIGINT,\n" + //
        " `name` STRING) WITH (\n" + //
        " 'connector.type' = 'filesystem',\n" + //
        " 'connector.property-version' = '1',\n" + //
        " 'connector.path' = '" + filePath + "',\n" + //
        " 'format.type' = 'csv',\n" + //
        " 'format.field-delimiter' = ',',\n" + //
        " 'format.num-files' = '1',\n" + //
        " 'format.write-mode' = 'OVERWRITE',\n" + // SUPPORTED (sinks only)
        " 'format.quote-character' = '\"',\n" + // NOT SUPPORTED
        " 'format.property-version' = '1'\n" + //
        ")";
  }
}

Thanks for the support,
Flavio


On Thu, Apr 8, 2021 at 7:05 PM Till Rohrmann <[hidden email]> wrote:
Hi Flavio,

I tried to execute the code snippet you have provided and I could not reproduce the problem.

Concretely I am running this code:

final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();
final TableEnvironment tableEnv = TableEnvironment.create(envSettings);

tableEnv.fromValues("foobar").execute().await();

Am I missing something? Maybe you can share a minimal but fully working example where the problem occurs. Thanks a lot.

Cheers,
Till

On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier <[hidden email]> wrote:
Any help here? Moreover if I use the DataStream APIs there's no left/right outer join yet..are those meant to be added in Flink 1.13 or 1.14?

On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier <[hidden email]> wrote:
Hi to all,
I'm testing writing to a CSV using Flink 1.13 and I get the following error:

The matching candidates:
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
Unsupported property keys:
format.quote-character

I create the table env using this:

final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()//
        .useBlinkPlanner()//
        // .inBatchMode()//
        .inStreamingMode()//
        .build();
    final TableEnvironment tableEnv = TableEnvironment.create(envSettings);

The error is the same both with inBatchMode and inStreamingMode.
Is this really not supported or am I using the wrong API?

Best,
Flavio
Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.13 and CSV (batch) writing

Flavio Pompermaier
That's absolutely useful. IMHO also join should work without windows/triggers and left/right outer joins should be easier in order to really migrate legacy code.
Also reduceGroup would help but less urgent.
I hope that my feedback as Flink user could be useful.

Best,
Flavio

On Fri, Apr 9, 2021 at 12:38 PM Kurt Young <[hidden email]> wrote:
Converting from table to DataStream in batch mode is indeed a problem now. But I think this will 
be improved soon. 

Best,
Kurt


On Fri, Apr 9, 2021 at 6:14 PM Flavio Pompermaier <[hidden email]> wrote:
In my real CSV I have LONG columns that can contain null values. In that case I get a parse exception (and I would like to avoid to read it as a string).
The ',bye' is just the way you can test that in my example (add that line to the input csv).
If I use  'csv.null-literal' = '' it seems to work but, is it a workaround or it is the right solution?

Another big problem I'm having with the new APIs is that if I use 
    TableEnvironment tableEnv = TableEnvironment.create(envSettings);
then I can't convert a table to a datastream..I need to use 
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, envSettings);
but in that case I can't use inBatchMode..

On Fri, Apr 9, 2021 at 11:44 AM Kurt Young <[hidden email]> wrote:
`format.ignore-first-line` is unfortunately a regression compared to the old one. 
I've created a ticket [1] to track this but according to current design, it seems not easy to do.

Regarding null values, I'm not sure if I understand the issue you had. What do you mean by
using ',bye' to test null Long values?


Best,
Kurt


On Fri, Apr 9, 2021 at 4:46 PM Flavio Pompermaier <[hidden email]> wrote:
And another thing: in my csv I added ',bye' (to test null Long values) but I get a parse error..if I add  'csv.null-literal' = '' it seems to work..is that the right way to solve this problem?

On Fri, Apr 9, 2021 at 10:13 AM Flavio Pompermaier <[hidden email]> wrote:
Thanks Kurt, now it works. However I can't find a way to skip the CSV header..before there was  "format.ignore-first-line" but now I can't find another way to skip it.
I could set csv.ignore-parse-errors to true but then I can't detect other parsing errors, otherwise I need to manually transofrm the header into a comment adding the # character at the start of the line..
How can I solve that?

On Fri, Apr 9, 2021 at 4:07 AM Kurt Young <[hidden email]> wrote:
My DDL is:

CREATE TABLE csv (
       id BIGINT,
       name STRING
) WITH (
       'connector' = 'filesystem',
       'path' = '.....',
       'format' = 'csv'
);

Best,
Kurt


On Fri, Apr 9, 2021 at 10:00 AM Kurt Young <[hidden email]> wrote:
Hi Flavio, 

We would recommend you to use new table source & sink interfaces, which have different 
property keys compared to the old ones, e.g. 'connector' v.s. 'connector.type'.

You can follow the 1.12 doc [1] to define your csv table, everything should work just fine. 

Flink SQL> set table.dml-sync=true;

[INFO] Session property has been set.


Flink SQL> select * from csv;

+----------------------+----------------------+

|                   id |                 name |

+----------------------+----------------------+

|                    3 |                    c |

+----------------------+----------------------+

Received a total of 1 row


Flink SQL> insert overwrite csv values(4, 'd');

[INFO] Submitting SQL update statement to the cluster...

[INFO] Execute statement in sync mode. Please wait for the execution finish...

[INFO] Complete execution of the SQL update statement.


Flink SQL> select * from csv;

+----------------------+----------------------+

|                   id |                 name |

+----------------------+----------------------+

|                    4 |                    d |

+----------------------+----------------------+

Received a total of 1 row



Best,
Kurt


On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier <[hidden email]> wrote:
Hi Till,
since I was using the same WITH-clause both for reading and writing I discovered that overwrite is actually supported in the Sinks, while in the Sources an exception is thrown (I was thinking that those properties were simply ignored).
However the quote-character is not supported in the sinks: is this a bug or is it the intended behaviour?. 
Here is a minimal example that reproduce the problem (put in the /tmp/test.csv something like '1,hello' or '2,hi').

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkCsvTest {
  public static void main(String[] args) throws Exception {
    final EnvironmentSettings envSettings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
    // ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
    final String tableInName = "testTableIn";
    final String createInTableDdl = getSourceDdl(tableInName, "/tmp/test.csv"); //

    final String tableOutName = "testTableOut";
    final String createOutTableDdl = getSinkDdl(tableOutName, "/tmp/test-out.csv"); //
    tableEnv.executeSql(createInTableDdl);
    tableEnv.executeSql(createOutTableDdl);

    Table tableIn = tableEnv.from(tableInName);
    Table tableOut = tableEnv.from(tableOutName);
    tableIn.insertInto(tableOutName);
    // tableEnv.toDataSet(table, Row.class).print();
    tableEnv.execute("TEST read/write");

  }

  private static String getSourceDdl(String tableName, String filePath) {
    return "CREATE TABLE " + tableName + " (\n" + //
        " `id` BIGINT,\n" + //
        " `name` STRING) WITH (\n" + //
        " 'connector.type' = 'filesystem',\n" + //
        " 'connector.property-version' = '1',\n" + //
        " 'connector.path' = '" + filePath + "',\n" + //
        " 'format.type' = 'csv',\n" + //
        " 'format.field-delimiter' = ',',\n" + //
 //       " 'format.write-mode' = 'OVERWRITE',\n" + // NOT SUPPORTED
        " 'format.property-version' = '1',\n" + //
        " 'format.quote-character' = '\"',\n" + //
        " 'format.ignore-first-line' = 'false'" + //
        ")";
  }

  private static String getSinkDdl(String tableName, String filePath) {
    return "CREATE TABLE " + tableName + " (\n" + //
        " `id` BIGINT,\n" + //
        " `name` STRING) WITH (\n" + //
        " 'connector.type' = 'filesystem',\n" + //
        " 'connector.property-version' = '1',\n" + //
        " 'connector.path' = '" + filePath + "',\n" + //
        " 'format.type' = 'csv',\n" + //
        " 'format.field-delimiter' = ',',\n" + //
        " 'format.num-files' = '1',\n" + //
        " 'format.write-mode' = 'OVERWRITE',\n" + // SUPPORTED (sinks only)
        " 'format.quote-character' = '\"',\n" + // NOT SUPPORTED
        " 'format.property-version' = '1'\n" + //
        ")";
  }
}

Thanks for the support,
Flavio


On Thu, Apr 8, 2021 at 7:05 PM Till Rohrmann <[hidden email]> wrote:
Hi Flavio,

I tried to execute the code snippet you have provided and I could not reproduce the problem.

Concretely I am running this code:

final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();
final TableEnvironment tableEnv = TableEnvironment.create(envSettings);

tableEnv.fromValues("foobar").execute().await();

Am I missing something? Maybe you can share a minimal but fully working example where the problem occurs. Thanks a lot.

Cheers,
Till

On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier <[hidden email]> wrote:
Any help here? Moreover if I use the DataStream APIs there's no left/right outer join yet..are those meant to be added in Flink 1.13 or 1.14?

On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier <[hidden email]> wrote:
Hi to all,
I'm testing writing to a CSV using Flink 1.13 and I get the following error:

The matching candidates:
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
Unsupported property keys:
format.quote-character

I create the table env using this:

final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()//
        .useBlinkPlanner()//
        // .inBatchMode()//
        .inStreamingMode()//
        .build();
    final TableEnvironment tableEnv = TableEnvironment.create(envSettings);

The error is the same both with inBatchMode and inStreamingMode.
Is this really not supported or am I using the wrong API?

Best,
Flavio
Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.13 and CSV (batch) writing

Kurt Young
Thanks for the suggestions Flavio. Join without window & left outer join already worked in Table API & SQL.
And for reduceGroup, you can try either user defined aggregate function or use table aggregate which is
available in Table API now. I'm wondering whether these can meet your requirement, or you have other 
use cases only feasible with DataStream.

Best,
Kurt


On Fri, Apr 9, 2021 at 7:41 PM Flavio Pompermaier <[hidden email]> wrote:
That's absolutely useful. IMHO also join should work without windows/triggers and left/right outer joins should be easier in order to really migrate legacy code.
Also reduceGroup would help but less urgent.
I hope that my feedback as Flink user could be useful.

Best,
Flavio

On Fri, Apr 9, 2021 at 12:38 PM Kurt Young <[hidden email]> wrote:
Converting from table to DataStream in batch mode is indeed a problem now. But I think this will 
be improved soon. 

Best,
Kurt


On Fri, Apr 9, 2021 at 6:14 PM Flavio Pompermaier <[hidden email]> wrote:
In my real CSV I have LONG columns that can contain null values. In that case I get a parse exception (and I would like to avoid to read it as a string).
The ',bye' is just the way you can test that in my example (add that line to the input csv).
If I use  'csv.null-literal' = '' it seems to work but, is it a workaround or it is the right solution?

Another big problem I'm having with the new APIs is that if I use 
    TableEnvironment tableEnv = TableEnvironment.create(envSettings);
then I can't convert a table to a datastream..I need to use 
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, envSettings);
but in that case I can't use inBatchMode..

On Fri, Apr 9, 2021 at 11:44 AM Kurt Young <[hidden email]> wrote:
`format.ignore-first-line` is unfortunately a regression compared to the old one. 
I've created a ticket [1] to track this but according to current design, it seems not easy to do.

Regarding null values, I'm not sure if I understand the issue you had. What do you mean by
using ',bye' to test null Long values?


Best,
Kurt


On Fri, Apr 9, 2021 at 4:46 PM Flavio Pompermaier <[hidden email]> wrote:
And another thing: in my csv I added ',bye' (to test null Long values) but I get a parse error..if I add  'csv.null-literal' = '' it seems to work..is that the right way to solve this problem?

On Fri, Apr 9, 2021 at 10:13 AM Flavio Pompermaier <[hidden email]> wrote:
Thanks Kurt, now it works. However I can't find a way to skip the CSV header..before there was  "format.ignore-first-line" but now I can't find another way to skip it.
I could set csv.ignore-parse-errors to true but then I can't detect other parsing errors, otherwise I need to manually transofrm the header into a comment adding the # character at the start of the line..
How can I solve that?

On Fri, Apr 9, 2021 at 4:07 AM Kurt Young <[hidden email]> wrote:
My DDL is:

CREATE TABLE csv (
       id BIGINT,
       name STRING
) WITH (
       'connector' = 'filesystem',
       'path' = '.....',
       'format' = 'csv'
);

Best,
Kurt


On Fri, Apr 9, 2021 at 10:00 AM Kurt Young <[hidden email]> wrote:
Hi Flavio, 

We would recommend you to use new table source & sink interfaces, which have different 
property keys compared to the old ones, e.g. 'connector' v.s. 'connector.type'.

You can follow the 1.12 doc [1] to define your csv table, everything should work just fine. 

Flink SQL> set table.dml-sync=true;

[INFO] Session property has been set.


Flink SQL> select * from csv;

+----------------------+----------------------+

|                   id |                 name |

+----------------------+----------------------+

|                    3 |                    c |

+----------------------+----------------------+

Received a total of 1 row


Flink SQL> insert overwrite csv values(4, 'd');

[INFO] Submitting SQL update statement to the cluster...

[INFO] Execute statement in sync mode. Please wait for the execution finish...

[INFO] Complete execution of the SQL update statement.


Flink SQL> select * from csv;

+----------------------+----------------------+

|                   id |                 name |

+----------------------+----------------------+

|                    4 |                    d |

+----------------------+----------------------+

Received a total of 1 row



Best,
Kurt


On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier <[hidden email]> wrote:
Hi Till,
since I was using the same WITH-clause both for reading and writing I discovered that overwrite is actually supported in the Sinks, while in the Sources an exception is thrown (I was thinking that those properties were simply ignored).
However the quote-character is not supported in the sinks: is this a bug or is it the intended behaviour?. 
Here is a minimal example that reproduce the problem (put in the /tmp/test.csv something like '1,hello' or '2,hi').

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkCsvTest {
  public static void main(String[] args) throws Exception {
    final EnvironmentSettings envSettings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
    // ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
    final String tableInName = "testTableIn";
    final String createInTableDdl = getSourceDdl(tableInName, "/tmp/test.csv"); //

    final String tableOutName = "testTableOut";
    final String createOutTableDdl = getSinkDdl(tableOutName, "/tmp/test-out.csv"); //
    tableEnv.executeSql(createInTableDdl);
    tableEnv.executeSql(createOutTableDdl);

    Table tableIn = tableEnv.from(tableInName);
    Table tableOut = tableEnv.from(tableOutName);
    tableIn.insertInto(tableOutName);
    // tableEnv.toDataSet(table, Row.class).print();
    tableEnv.execute("TEST read/write");

  }

  private static String getSourceDdl(String tableName, String filePath) {
    return "CREATE TABLE " + tableName + " (\n" + //
        " `id` BIGINT,\n" + //
        " `name` STRING) WITH (\n" + //
        " 'connector.type' = 'filesystem',\n" + //
        " 'connector.property-version' = '1',\n" + //
        " 'connector.path' = '" + filePath + "',\n" + //
        " 'format.type' = 'csv',\n" + //
        " 'format.field-delimiter' = ',',\n" + //
 //       " 'format.write-mode' = 'OVERWRITE',\n" + // NOT SUPPORTED
        " 'format.property-version' = '1',\n" + //
        " 'format.quote-character' = '\"',\n" + //
        " 'format.ignore-first-line' = 'false'" + //
        ")";
  }

  private static String getSinkDdl(String tableName, String filePath) {
    return "CREATE TABLE " + tableName + " (\n" + //
        " `id` BIGINT,\n" + //
        " `name` STRING) WITH (\n" + //
        " 'connector.type' = 'filesystem',\n" + //
        " 'connector.property-version' = '1',\n" + //
        " 'connector.path' = '" + filePath + "',\n" + //
        " 'format.type' = 'csv',\n" + //
        " 'format.field-delimiter' = ',',\n" + //
        " 'format.num-files' = '1',\n" + //
        " 'format.write-mode' = 'OVERWRITE',\n" + // SUPPORTED (sinks only)
        " 'format.quote-character' = '\"',\n" + // NOT SUPPORTED
        " 'format.property-version' = '1'\n" + //
        ")";
  }
}

Thanks for the support,
Flavio


On Thu, Apr 8, 2021 at 7:05 PM Till Rohrmann <[hidden email]> wrote:
Hi Flavio,

I tried to execute the code snippet you have provided and I could not reproduce the problem.

Concretely I am running this code:

final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();
final TableEnvironment tableEnv = TableEnvironment.create(envSettings);

tableEnv.fromValues("foobar").execute().await();

Am I missing something? Maybe you can share a minimal but fully working example where the problem occurs. Thanks a lot.

Cheers,
Till

On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier <[hidden email]> wrote:
Any help here? Moreover if I use the DataStream APIs there's no left/right outer join yet..are those meant to be added in Flink 1.13 or 1.14?

On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier <[hidden email]> wrote:
Hi to all,
I'm testing writing to a CSV using Flink 1.13 and I get the following error:

The matching candidates:
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
Unsupported property keys:
format.quote-character

I create the table env using this:

final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()//
        .useBlinkPlanner()//
        // .inBatchMode()//
        .inStreamingMode()//
        .build();
    final TableEnvironment tableEnv = TableEnvironment.create(envSettings);

The error is the same both with inBatchMode and inStreamingMode.
Is this really not supported or am I using the wrong API?

Best,
Flavio
Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.13 and CSV (batch) writing

Flavio Pompermaier
Thanks for the suggestions Kurt. Actually I could use Table Api I think, it's just that most of our Flink code use DataSet Api.

Il dom 11 apr 2021, 13:44 Kurt Young <[hidden email]> ha scritto:
Thanks for the suggestions Flavio. Join without window & left outer join already worked in Table API & SQL.
And for reduceGroup, you can try either user defined aggregate function or use table aggregate which is
available in Table API now. I'm wondering whether these can meet your requirement, or you have other 
use cases only feasible with DataStream.

Best,
Kurt


On Fri, Apr 9, 2021 at 7:41 PM Flavio Pompermaier <[hidden email]> wrote:
That's absolutely useful. IMHO also join should work without windows/triggers and left/right outer joins should be easier in order to really migrate legacy code.
Also reduceGroup would help but less urgent.
I hope that my feedback as Flink user could be useful.

Best,
Flavio

On Fri, Apr 9, 2021 at 12:38 PM Kurt Young <[hidden email]> wrote:
Converting from table to DataStream in batch mode is indeed a problem now. But I think this will 
be improved soon. 

Best,
Kurt


On Fri, Apr 9, 2021 at 6:14 PM Flavio Pompermaier <[hidden email]> wrote:
In my real CSV I have LONG columns that can contain null values. In that case I get a parse exception (and I would like to avoid to read it as a string).
The ',bye' is just the way you can test that in my example (add that line to the input csv).
If I use  'csv.null-literal' = '' it seems to work but, is it a workaround or it is the right solution?

Another big problem I'm having with the new APIs is that if I use 
    TableEnvironment tableEnv = TableEnvironment.create(envSettings);
then I can't convert a table to a datastream..I need to use 
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, envSettings);
but in that case I can't use inBatchMode..

On Fri, Apr 9, 2021 at 11:44 AM Kurt Young <[hidden email]> wrote:
`format.ignore-first-line` is unfortunately a regression compared to the old one. 
I've created a ticket [1] to track this but according to current design, it seems not easy to do.

Regarding null values, I'm not sure if I understand the issue you had. What do you mean by
using ',bye' to test null Long values?


Best,
Kurt


On Fri, Apr 9, 2021 at 4:46 PM Flavio Pompermaier <[hidden email]> wrote:
And another thing: in my csv I added ',bye' (to test null Long values) but I get a parse error..if I add  'csv.null-literal' = '' it seems to work..is that the right way to solve this problem?

On Fri, Apr 9, 2021 at 10:13 AM Flavio Pompermaier <[hidden email]> wrote:
Thanks Kurt, now it works. However I can't find a way to skip the CSV header..before there was  "format.ignore-first-line" but now I can't find another way to skip it.
I could set csv.ignore-parse-errors to true but then I can't detect other parsing errors, otherwise I need to manually transofrm the header into a comment adding the # character at the start of the line..
How can I solve that?

On Fri, Apr 9, 2021 at 4:07 AM Kurt Young <[hidden email]> wrote:
My DDL is:

CREATE TABLE csv (
       id BIGINT,
       name STRING
) WITH (
       'connector' = 'filesystem',
       'path' = '.....',
       'format' = 'csv'
);

Best,
Kurt


On Fri, Apr 9, 2021 at 10:00 AM Kurt Young <[hidden email]> wrote:
Hi Flavio, 

We would recommend you to use new table source & sink interfaces, which have different 
property keys compared to the old ones, e.g. 'connector' v.s. 'connector.type'.

You can follow the 1.12 doc [1] to define your csv table, everything should work just fine. 

Flink SQL> set table.dml-sync=true;

[INFO] Session property has been set.


Flink SQL> select * from csv;

+----------------------+----------------------+

|                   id |                 name |

+----------------------+----------------------+

|                    3 |                    c |

+----------------------+----------------------+

Received a total of 1 row


Flink SQL> insert overwrite csv values(4, 'd');

[INFO] Submitting SQL update statement to the cluster...

[INFO] Execute statement in sync mode. Please wait for the execution finish...

[INFO] Complete execution of the SQL update statement.


Flink SQL> select * from csv;

+----------------------+----------------------+

|                   id |                 name |

+----------------------+----------------------+

|                    4 |                    d |

+----------------------+----------------------+

Received a total of 1 row



Best,
Kurt


On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier <[hidden email]> wrote:
Hi Till,
since I was using the same WITH-clause both for reading and writing I discovered that overwrite is actually supported in the Sinks, while in the Sources an exception is thrown (I was thinking that those properties were simply ignored).
However the quote-character is not supported in the sinks: is this a bug or is it the intended behaviour?. 
Here is a minimal example that reproduce the problem (put in the /tmp/test.csv something like '1,hello' or '2,hi').

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkCsvTest {
  public static void main(String[] args) throws Exception {
    final EnvironmentSettings envSettings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
    // ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
    final String tableInName = "testTableIn";
    final String createInTableDdl = getSourceDdl(tableInName, "/tmp/test.csv"); //

    final String tableOutName = "testTableOut";
    final String createOutTableDdl = getSinkDdl(tableOutName, "/tmp/test-out.csv"); //
    tableEnv.executeSql(createInTableDdl);
    tableEnv.executeSql(createOutTableDdl);

    Table tableIn = tableEnv.from(tableInName);
    Table tableOut = tableEnv.from(tableOutName);
    tableIn.insertInto(tableOutName);
    // tableEnv.toDataSet(table, Row.class).print();
    tableEnv.execute("TEST read/write");

  }

  private static String getSourceDdl(String tableName, String filePath) {
    return "CREATE TABLE " + tableName + " (\n" + //
        " `id` BIGINT,\n" + //
        " `name` STRING) WITH (\n" + //
        " 'connector.type' = 'filesystem',\n" + //
        " 'connector.property-version' = '1',\n" + //
        " 'connector.path' = '" + filePath + "',\n" + //
        " 'format.type' = 'csv',\n" + //
        " 'format.field-delimiter' = ',',\n" + //
 //       " 'format.write-mode' = 'OVERWRITE',\n" + // NOT SUPPORTED
        " 'format.property-version' = '1',\n" + //
        " 'format.quote-character' = '\"',\n" + //
        " 'format.ignore-first-line' = 'false'" + //
        ")";
  }

  private static String getSinkDdl(String tableName, String filePath) {
    return "CREATE TABLE " + tableName + " (\n" + //
        " `id` BIGINT,\n" + //
        " `name` STRING) WITH (\n" + //
        " 'connector.type' = 'filesystem',\n" + //
        " 'connector.property-version' = '1',\n" + //
        " 'connector.path' = '" + filePath + "',\n" + //
        " 'format.type' = 'csv',\n" + //
        " 'format.field-delimiter' = ',',\n" + //
        " 'format.num-files' = '1',\n" + //
        " 'format.write-mode' = 'OVERWRITE',\n" + // SUPPORTED (sinks only)
        " 'format.quote-character' = '\"',\n" + // NOT SUPPORTED
        " 'format.property-version' = '1'\n" + //
        ")";
  }
}

Thanks for the support,
Flavio


On Thu, Apr 8, 2021 at 7:05 PM Till Rohrmann <[hidden email]> wrote:
Hi Flavio,

I tried to execute the code snippet you have provided and I could not reproduce the problem.

Concretely I am running this code:

final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();
final TableEnvironment tableEnv = TableEnvironment.create(envSettings);

tableEnv.fromValues("foobar").execute().await();

Am I missing something? Maybe you can share a minimal but fully working example where the problem occurs. Thanks a lot.

Cheers,
Till

On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier <[hidden email]> wrote:
Any help here? Moreover if I use the DataStream APIs there's no left/right outer join yet..are those meant to be added in Flink 1.13 or 1.14?

On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier <[hidden email]> wrote:
Hi to all,
I'm testing writing to a CSV using Flink 1.13 and I get the following error:

The matching candidates:
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
Unsupported property keys:
format.quote-character

I create the table env using this:

final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()//
        .useBlinkPlanner()//
        // .inBatchMode()//
        .inStreamingMode()//
        .build();
    final TableEnvironment tableEnv = TableEnvironment.create(envSettings);

The error is the same both with inBatchMode and inStreamingMode.
Is this really not supported or am I using the wrong API?

Best,
Flavio
Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.13 and CSV (batch) writing

Kurt Young
The Flink community has a plan to delete the DataSet API in the future, the requirements will be fulfilled by both
Table & DataStream API. It would be helpful to let us know what kind of functionality is missing in these two APIs.
If you have further information you want to share, please let us know. 

Best,
Kurt


On Sun, Apr 11, 2021 at 9:18 PM Flavio Pompermaier <[hidden email]> wrote:
Thanks for the suggestions Kurt. Actually I could use Table Api I think, it's just that most of our Flink code use DataSet Api.

Il dom 11 apr 2021, 13:44 Kurt Young <[hidden email]> ha scritto:
Thanks for the suggestions Flavio. Join without window & left outer join already worked in Table API & SQL.
And for reduceGroup, you can try either user defined aggregate function or use table aggregate which is
available in Table API now. I'm wondering whether these can meet your requirement, or you have other 
use cases only feasible with DataStream.

Best,
Kurt


On Fri, Apr 9, 2021 at 7:41 PM Flavio Pompermaier <[hidden email]> wrote:
That's absolutely useful. IMHO also join should work without windows/triggers and left/right outer joins should be easier in order to really migrate legacy code.
Also reduceGroup would help but less urgent.
I hope that my feedback as Flink user could be useful.

Best,
Flavio

On Fri, Apr 9, 2021 at 12:38 PM Kurt Young <[hidden email]> wrote:
Converting from table to DataStream in batch mode is indeed a problem now. But I think this will 
be improved soon. 

Best,
Kurt


On Fri, Apr 9, 2021 at 6:14 PM Flavio Pompermaier <[hidden email]> wrote:
In my real CSV I have LONG columns that can contain null values. In that case I get a parse exception (and I would like to avoid to read it as a string).
The ',bye' is just the way you can test that in my example (add that line to the input csv).
If I use  'csv.null-literal' = '' it seems to work but, is it a workaround or it is the right solution?

Another big problem I'm having with the new APIs is that if I use 
    TableEnvironment tableEnv = TableEnvironment.create(envSettings);
then I can't convert a table to a datastream..I need to use 
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, envSettings);
but in that case I can't use inBatchMode..

On Fri, Apr 9, 2021 at 11:44 AM Kurt Young <[hidden email]> wrote:
`format.ignore-first-line` is unfortunately a regression compared to the old one. 
I've created a ticket [1] to track this but according to current design, it seems not easy to do.

Regarding null values, I'm not sure if I understand the issue you had. What do you mean by
using ',bye' to test null Long values?


Best,
Kurt


On Fri, Apr 9, 2021 at 4:46 PM Flavio Pompermaier <[hidden email]> wrote:
And another thing: in my csv I added ',bye' (to test null Long values) but I get a parse error..if I add  'csv.null-literal' = '' it seems to work..is that the right way to solve this problem?

On Fri, Apr 9, 2021 at 10:13 AM Flavio Pompermaier <[hidden email]> wrote:
Thanks Kurt, now it works. However I can't find a way to skip the CSV header..before there was  "format.ignore-first-line" but now I can't find another way to skip it.
I could set csv.ignore-parse-errors to true but then I can't detect other parsing errors, otherwise I need to manually transofrm the header into a comment adding the # character at the start of the line..
How can I solve that?

On Fri, Apr 9, 2021 at 4:07 AM Kurt Young <[hidden email]> wrote:
My DDL is:

CREATE TABLE csv (
       id BIGINT,
       name STRING
) WITH (
       'connector' = 'filesystem',
       'path' = '.....',
       'format' = 'csv'
);

Best,
Kurt


On Fri, Apr 9, 2021 at 10:00 AM Kurt Young <[hidden email]> wrote:
Hi Flavio, 

We would recommend you to use new table source & sink interfaces, which have different 
property keys compared to the old ones, e.g. 'connector' v.s. 'connector.type'.

You can follow the 1.12 doc [1] to define your csv table, everything should work just fine. 

Flink SQL> set table.dml-sync=true;

[INFO] Session property has been set.


Flink SQL> select * from csv;

+----------------------+----------------------+

|                   id |                 name |

+----------------------+----------------------+

|                    3 |                    c |

+----------------------+----------------------+

Received a total of 1 row


Flink SQL> insert overwrite csv values(4, 'd');

[INFO] Submitting SQL update statement to the cluster...

[INFO] Execute statement in sync mode. Please wait for the execution finish...

[INFO] Complete execution of the SQL update statement.


Flink SQL> select * from csv;

+----------------------+----------------------+

|                   id |                 name |

+----------------------+----------------------+

|                    4 |                    d |

+----------------------+----------------------+

Received a total of 1 row



Best,
Kurt


On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier <[hidden email]> wrote:
Hi Till,
since I was using the same WITH-clause both for reading and writing I discovered that overwrite is actually supported in the Sinks, while in the Sources an exception is thrown (I was thinking that those properties were simply ignored).
However the quote-character is not supported in the sinks: is this a bug or is it the intended behaviour?. 
Here is a minimal example that reproduce the problem (put in the /tmp/test.csv something like '1,hello' or '2,hi').

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkCsvTest {
  public static void main(String[] args) throws Exception {
    final EnvironmentSettings envSettings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
    // ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
    final String tableInName = "testTableIn";
    final String createInTableDdl = getSourceDdl(tableInName, "/tmp/test.csv"); //

    final String tableOutName = "testTableOut";
    final String createOutTableDdl = getSinkDdl(tableOutName, "/tmp/test-out.csv"); //
    tableEnv.executeSql(createInTableDdl);
    tableEnv.executeSql(createOutTableDdl);

    Table tableIn = tableEnv.from(tableInName);
    Table tableOut = tableEnv.from(tableOutName);
    tableIn.insertInto(tableOutName);
    // tableEnv.toDataSet(table, Row.class).print();
    tableEnv.execute("TEST read/write");

  }

  private static String getSourceDdl(String tableName, String filePath) {
    return "CREATE TABLE " + tableName + " (\n" + //
        " `id` BIGINT,\n" + //
        " `name` STRING) WITH (\n" + //
        " 'connector.type' = 'filesystem',\n" + //
        " 'connector.property-version' = '1',\n" + //
        " 'connector.path' = '" + filePath + "',\n" + //
        " 'format.type' = 'csv',\n" + //
        " 'format.field-delimiter' = ',',\n" + //
 //       " 'format.write-mode' = 'OVERWRITE',\n" + // NOT SUPPORTED
        " 'format.property-version' = '1',\n" + //
        " 'format.quote-character' = '\"',\n" + //
        " 'format.ignore-first-line' = 'false'" + //
        ")";
  }

  private static String getSinkDdl(String tableName, String filePath) {
    return "CREATE TABLE " + tableName + " (\n" + //
        " `id` BIGINT,\n" + //
        " `name` STRING) WITH (\n" + //
        " 'connector.type' = 'filesystem',\n" + //
        " 'connector.property-version' = '1',\n" + //
        " 'connector.path' = '" + filePath + "',\n" + //
        " 'format.type' = 'csv',\n" + //
        " 'format.field-delimiter' = ',',\n" + //
        " 'format.num-files' = '1',\n" + //
        " 'format.write-mode' = 'OVERWRITE',\n" + // SUPPORTED (sinks only)
        " 'format.quote-character' = '\"',\n" + // NOT SUPPORTED
        " 'format.property-version' = '1'\n" + //
        ")";
  }
}

Thanks for the support,
Flavio


On Thu, Apr 8, 2021 at 7:05 PM Till Rohrmann <[hidden email]> wrote:
Hi Flavio,

I tried to execute the code snippet you have provided and I could not reproduce the problem.

Concretely I am running this code:

final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();
final TableEnvironment tableEnv = TableEnvironment.create(envSettings);

tableEnv.fromValues("foobar").execute().await();

Am I missing something? Maybe you can share a minimal but fully working example where the problem occurs. Thanks a lot.

Cheers,
Till

On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier <[hidden email]> wrote:
Any help here? Moreover if I use the DataStream APIs there's no left/right outer join yet..are those meant to be added in Flink 1.13 or 1.14?

On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier <[hidden email]> wrote:
Hi to all,
I'm testing writing to a CSV using Flink 1.13 and I get the following error:

The matching candidates:
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
Unsupported property keys:
format.quote-character

I create the table env using this:

final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()//
        .useBlinkPlanner()//
        // .inBatchMode()//
        .inStreamingMode()//
        .build();
    final TableEnvironment tableEnv = TableEnvironment.create(envSettings);

The error is the same both with inBatchMode and inStreamingMode.
Is this really not supported or am I using the wrong API?

Best,
Flavio