Writing Table API results to a csv file


chrisr123
I'm using Flink 1.4.0

I'm trying to save the results of a Table API query to a CSV file, but I'm
getting an error.
Here are the details:

My Input file looks like this:
id,species,color,weight,name
311,canine,golden,75,dog1
312,canine,brown,22,dog2
313,feline,gray,8,cat1

I run a query on this to select canines only, and I want to save this to a
csv file:

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        String inputPath = "location-of-source-file";
        CsvTableSource petsTableSource = CsvTableSource.builder()
                .path(inputPath)
                .ignoreFirstLine()
                .fieldDelimiter(",")
                .field("id", Types.INT())
                .field("species", Types.STRING())
                .field("color", Types.STRING())
                .field("weight", Types.DOUBLE())
                .field("name", Types.STRING())
                .build();

        // Register our table source
        tableEnv.registerTableSource("pets", petsTableSource);
        Table pets = tableEnv.scan("pets");

        Table counts = pets
                .groupBy("species")
                .select("species, species.count as count")
                .filter("species === 'canine'");

        DataSet<Row> result = tableEnv.toDataSet(counts, Row.class);
        result.print();

        // Write Results to File
        TableSink<Row> sink = new CsvTableSink("/home/hadoop/output/pets", ",");
        counts.writeToSink(sink);

When I run this, the result.print() call produces this output:

canine,2

but I do not see any results written to the file, and I see the error below.
How can I save the results I'm seeing in stdout to a CSV file?
Thanks!



2018-05-27 12:49:44,411 INFO  [main] typeutils.TypeExtractor
(TypeExtractor.java:1873) - class org.apache.flink.types.Row does not
contain a getter for field fields
2018-05-27 12:49:44,411 INFO  [main] typeutils.TypeExtractor
(TypeExtractor.java:1876) - class org.apache.flink.types.Row does not
contain a setter for field fields
2018-05-27 12:49:44,411 INFO  [main] typeutils.TypeExtractor
(TypeExtractor.java:1911) - class org.apache.flink.types.Row is not a valid
POJO type because not all fields are valid POJO fields.
2018-05-27 12:49:44,411 INFO  [main] typeutils.TypeExtractor
(TypeExtractor.java:1873) - class org.apache.flink.types.Row does not
contain a getter for field fields
2018-05-27 12:49:44,412 INFO  [main] typeutils.TypeExtractor
(TypeExtractor.java:1876) - class org.apache.flink.types.Row does not
contain a setter for field fields
2018-05-27 12:49:44,412 INFO  [main] typeutils.TypeExtractor
(TypeExtractor.java:1911) - class org.apache.flink.types.Row is not a valid
POJO type because not all fields are valid POJO fields.

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Writing Table API results to a csv file

Jörn Franke
Do you have the complete source?

I am missing an env.execute() at the end
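The missing call, appended after the sink is registered, would look roughly like this (the job name string here is only illustrative):

```java
// ... pipeline as in the original post ...
counts.writeToSink(sink);

// writeToSink() only registers the sink operator; this call actually
// submits the job, which is what produces the CSV output.
env.execute("WritePetsToCsv");
```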

> On 27. May 2018, at 18:55, chrisr123 <[hidden email]> wrote:

Re: Writing Table API results to a csv file

Fabian Hueske-2
Hi,

Jörn is probably right.

In contrast to print(), which immediately triggers execution, writeToSink() just appends a sink operator; the execution must then be triggered explicitly.

The INFO messages of the TypeExtractor are "just" telling you that Row cannot be used as a POJO type, but that's fine here.

Best, Fabian
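The lazy-execution model Fabian describes can be sketched in plain Java, with no Flink dependency (the class and method names below are made up for illustration): registering a sink only records work to do, and nothing runs until execute() is called on the environment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical miniature of Flink's batch execution model:
// addSink() is analogous to writeToSink() (it only registers work),
// and execute() is analogous to env.execute() (it runs everything).
public class LazyEnv {
    private final List<Runnable> sinks = new ArrayList<>();

    // Register a "sink": remember the work, but do not run it yet.
    public void addSink(Supplier<String> query, List<String> out) {
        sinks.add(() -> out.add(query.get()));
    }

    // Run everything that was registered.
    public void execute() {
        sinks.forEach(Runnable::run);
    }

    public static void main(String[] args) {
        LazyEnv env = new LazyEnv();
        List<String> file = new ArrayList<>();
        env.addSink(() -> "canine,2", file);
        System.out.println("before execute(): " + file); // still empty
        env.execute();
        System.out.println("after execute(): " + file);  // now holds the row
    }
}
```

In the Flink program above, this corresponds to calling env.execute(...) after counts.writeToSink(sink); without it the sink is registered but the job never runs.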



Re: Writing Table API results to a csv file

chrisr123
Fabian, Jörn:

Yes, that was indeed it.
When I added env.execute("MyApp"), it worked.
Thank you for your help.
-Chris




--
----------------------------------------------------------------------------------------------------------------------------------------
Simplicity is the ultimate sophistication
--Leonardo DaVinci