Simple stdout sink for testing Table API?

chrisr123
Is there a simple way to output the first few rows of a Flink table to stdout
when developing an application? I just want to see the first 10-20 rows on
screen during development to make sure my logic is correct.
There doesn't seem to be anything like print(10) in the API to show the first
n rows.
Here is a simple sample program. For now I am writing to a CSV table sink for
testing.

// Get Customers
String customersPath = "input/customers.csv";
// id,first_name,last_name,email,address,city,state,zip
CsvTableSource customersTableSource = CsvTableSource.builder()
            .path(customersPath)
            .ignoreFirstLine()
            .fieldDelimiter(",")
            .field("id", Types.INT())
            .field("first_name", Types.STRING())
            .field("last_name", Types.STRING())
            .field("email", Types.STRING())
            .field("address", Types.STRING())
            .field("city", Types.STRING())
            .field("state", Types.STRING())
            .field("zip", Types.STRING())
            .build();


// Register our table source
tableEnv.registerTableSource("customers", customersTableSource);
Table customers = tableEnv.scan("customers");


// Perform Operations
// SELECT id,last_name
// FROM customers
// WHERE last_name <> 'foobar'
Table projection1 = customers
        .select("id,last_name")
        .filter("last_name !== 'foobar'");


// Write to Sinks
int parallelism = 1;
TableSink<Row> sink = new CsvTableSink("output/customers_out.csv", ",",
        parallelism, WriteMode.OVERWRITE);
projection1.writeToSink(sink);



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Simple stdout sink for testing Table API?

Hequn Cheng
Hi chrisr,

There does not seem to be a one-line way to do this. To print results on screen, you can use the DataStream.print() or DataSet.print() method, and to limit the output you can add a FilterFunction. The code looks like this:

Table projection1 = customers
        .select("id,last_name")
        .filter("last_name !== 'foobar'");
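
// Convert the table to an append-only stream and keep only the first 10 rows.
tableEnv.toAppendStream(projection1, Row.class).filter(new FilterFunction<Row>() {
  // Per-instance counter; it acts as a global limit only because
  // the filter runs with parallelism 1.
  private int count = 0;

  @Override
  public boolean filter(Row value) throws Exception {
    return count++ < 10;
  }
}).setParallelism(1).print();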

Hope this helps,
Hequn
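
The counting idea in the filter above can be tried out without a Flink cluster. The following is a minimal plain-Java sketch, not Flink API: the names FirstNDemo, FirstN and take, and the sample rows, are hypothetical and exist only for illustration. It shows that a stateful predicate with a counter lets exactly the first n elements through, provided a single instance sees every element in order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class FirstNDemo {

    /** Stateful predicate that accepts only the first `limit` elements it is asked about. */
    static final class FirstN<T> implements Predicate<T> {
        private final int limit;
        private int count = 0; // not thread-safe: one instance must see all elements

        FirstN(int limit) {
            this.limit = limit;
        }

        @Override
        public boolean test(T value) {
            // Same check as the FilterFunction in the reply: pass while count < limit.
            return count++ < limit;
        }
    }

    /** Runs the predicate over the rows in order and collects the ones that pass. */
    static <T> List<T> take(List<T> rows, int limit) {
        Predicate<T> firstN = new FirstN<>(limit);
        List<T> out = new ArrayList<>();
        for (T row : rows) {
            if (firstN.test(row)) {
                out.add(row);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical sample rows standing in for (id, last_name) results.
        List<String> rows = Arrays.asList("1,Smith", "2,Jones", "3,Lee", "4,Khan");
        take(rows, 2).forEach(System.out::println);
    }
}
```

The sketch also shows why setParallelism(1) matters in the Flink version: each parallel instance of a filter keeps its own counter, so with higher parallelism each instance would let up to 10 rows through. Note that a filter only drops the later rows; it does not stop the input from being read. For batch programs, the DataSet API's first(n) may be a simpler option and is worth checking for the Flink version in use.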

(In reply to chrisr123's message of Sun, Jun 24, 2018, 7:05 AM.)

Re: Simple stdout sink for testing Table API?

chrisr123