I'm using Flink 1.4.0
I'm trying to save the results of a Table API query to a CSV file, but I'm getting an error. Here are the details:

My Input file looks like this:

    id,species,color,weight,name
    311,canine,golden,75,dog1
    312,canine,brown,22,dog2
    313,feline,gray,8,cat1

I run a query on this to select canines only, and I want to save this to a csv file:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    String inputPath = "location-of-source-file";
    CsvTableSource petsTableSource = CsvTableSource.builder()
        .path(inputPath)
        .ignoreFirstLine()
        .fieldDelimiter(",")
        .field("id", Types.INT())
        .field("species", Types.STRING())
        .field("color", Types.STRING())
        .field("weight", Types.DOUBLE())
        .field("name", Types.STRING())
        .build();

    // Register our table source
    tableEnv.registerTableSource("pets", petsTableSource);
    Table pets = tableEnv.scan("pets");

    Table counts = pets
        .groupBy("species")
        .select("species, species.count as count")
        .filter("species === 'canine'");

    DataSet<Row> result = tableEnv.toDataSet(counts, Row.class);
    result.print();

    // Write Results to File
    TableSink<Row> sink = new CsvTableSink("/home/hadoop/output/pets", ",");
    counts.writeToSink(sink);

When I run this, I get the output from the result.print() call as this:

    canine,2

but I do not see any results written to the file, and I see the error below. How can I save the results I'm seeing in stdout to a CSV file? Thanks!

    2018-05-27 12:49:44,411 INFO [main] typeutils.TypeExtractor (TypeExtractor.java:1873) - class org.apache.flink.types.Row does not contain a getter for field fields
    2018-05-27 12:49:44,411 INFO [main] typeutils.TypeExtractor (TypeExtractor.java:1876) - class org.apache.flink.types.Row does not contain a setter for field fields
    2018-05-27 12:49:44,411 INFO [main] typeutils.TypeExtractor (TypeExtractor.java:1911) - class org.apache.flink.types.Row is not a valid POJO type because not all fields are valid POJO fields.
    2018-05-27 12:49:44,411 INFO [main] typeutils.TypeExtractor (TypeExtractor.java:1873) - class org.apache.flink.types.Row does not contain a getter for field fields
    2018-05-27 12:49:44,412 INFO [main] typeutils.TypeExtractor (TypeExtractor.java:1876) - class org.apache.flink.types.Row does not contain a setter for field fields
    2018-05-27 12:49:44,412 INFO [main] typeutils.TypeExtractor (TypeExtractor.java:1911) - class org.apache.flink.types.Row is not a valid POJO type because not all fields are valid POJO fields.

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Do you have the complete source?
I am missing an env.execute at the end.

> On 27. May 2018, at 18:55, chrisr123 <[hidden email]> wrote:
>
> How can I save the results I'm seeing in stdout to a CSV file?
Hi,

Jörn is probably right. In contrast to [...] The [...]

Best, Fabian

2018-05-27 19:51 GMT+02:00 Jörn Franke <[hidden email]>:
> Do you have the complete source?
Fabian, Jorn:

Yes, that was indeed it. When I added the env.execute("MyApp") it worked. Thank you for your help.

-Chris

On Mon, May 28, 2018 at 5:03 AM, Fabian Hueske <[hidden email]> wrote:
----------------------------------------------------------------------------------------------------------------------------------------
Simplicity is the ultimate sophistication --Leonardo DaVinci
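
For readers landing on this thread later: the behaviour Chris saw (print() shows the result, the CSV sink stays empty until env.execute is added) follows from batch plans being executed lazily. The plain-Java sketch below is only a toy model of that idea, not Flink code. The class LazyPlanSketch and its methods printLike, writeToSinkLike and execute are hypothetical names invented for illustration, and an in-memory list stands in for the output file.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Toy model of a lazily executed batch plan. Hypothetical names; not Flink code. */
final class LazyPlanSketch {
    /** Sinks that have been declared but not yet run. */
    private final List<Runnable> pendingSinks = new ArrayList<>();

    /** Eager action: evaluates the query immediately, in the spirit of print(). */
    <T> T printLike(Supplier<T> query) {
        return query.get();
    }

    /** Lazy action: only records the sink, in the spirit of writeToSink(). */
    <T> void writeToSinkLike(Supplier<T> query, List<T> output) {
        pendingSinks.add(() -> output.add(query.get()));
    }

    /** Runs every recorded sink, in the spirit of env.execute(). Returns how many ran. */
    int execute() {
        int count = pendingSinks.size();
        pendingSinks.forEach(Runnable::run);
        pendingSinks.clear();
        return count;
    }

    public static void main(String[] args) {
        LazyPlanSketch env = new LazyPlanSketch();
        List<String> file = new ArrayList<>();       // stands in for the CSV output
        Supplier<String> counts = () -> "canine,2";  // stands in for the query result

        System.out.println(env.printLike(counts));        // evaluated right away
        env.writeToSinkLike(counts, file);                // only registered
        System.out.println("before execute: " + file);    // still empty
        env.execute();                                    // now the sink runs
        System.out.println("after execute: " + file);
    }
}
```

In the program from the original post, the corresponding change is the one Chris reports: calling env.execute("MyApp") after counts.writeToSink(sink).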