Add time attribute column to POJO stream

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Add time attribute column to POJO stream

Chris Miller-2
I'm having trouble dealing with a DataStream of POJOs. In particular, when I perform SQL operations on it I can't figure out the syntax for referring to individual fields within the POJO.

Below is an example that illustrates the problem and the various approaches I've tried. Can anyone please point me in the right direction?

import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

/**
 * Minimal reproduction: builds one stream of Tuple2 values and one stream of
 * FxRate objects, converts each to a Table, and prints the resulting schemas.
 * The FxRate attempts are each annotated with the ValidationException the
 * author reported for them.
 */
public class PojoTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

// Using tuples, this works as expected
List<Tuple2<String, Double>> tupleData = Arrays.asList(
new Tuple2<>("USD", 1.0),
new Tuple2<>("GBP", 1.3),
new Tuple2<>("EUR", 1.11));
DataStreamSource<Tuple2<String, Double>> tupleStream = streamEnv.fromCollection(tupleData);
tableEnv.fromDataStream(tupleStream, "currency, rate").printSchema();

// Using a DataStream of POJOs, how do I obtain an equivalent table to the above?
List<FxRate> pojoData = Arrays.asList(
new FxRate("USD", 1.0),
new FxRate("GBP", 1.3),
new FxRate("EUR", 1.11));
DataStreamSource<FxRate> pojoStream = streamEnv.fromCollection(pojoData);

// Registers the stream under a single field name, "fx"; the error quoted below
// reports the table's input field list as [fx].
Table pojoTable = tableEnv.fromDataStream(pojoStream, "fx");
pojoTable.printSchema();

// This fails with "ValidationException: Cannot resolve field [currency], input field list:[fx]"
pojoTable.select("currency, rate").printSchema();

// This fails with "ValidationException: Undefined function: currency"
pojoTable.select("fx.currency AS currency, fx.rate AS rate").printSchema();

// This fails with "ValidationException: Too many fields referenced from an atomic type"
tableEnv.fromDataStream(pojoStream, "currency, rate").printSchema();

// This fails with "ValidationException: Field reference expression expected"
tableEnv.fromDataStream(pojoStream, "fx.currency, fx.rate").printSchema();

streamEnv.execute();
}

/**
 * Holder for a currency code and its rate, exposed as public fields.
 * NOTE(review): the only constructor declared here takes two arguments;
 * there is no no-argument constructor.
 */
public static class FxRate {
public String currency;
public double rate;

/**
 * @param currency value stored in the {@code currency} field
 * @param rate value stored in the {@code rate} field
 */
public FxRate(String currency, double rate) {
this.currency = currency;
this.rate = rate;
}

/** Renders the object as {@code FxRate{currency='...', rate=...}}. */
@Override
public String toString() {
return "FxRate{currency='" + currency + '\'' + ", rate=" + rate + '}';
}
}
}
Reply | Threaded
Open this post in threaded view
|

Re: Add time attribute column to POJO stream

Jingsong Lee
Hi Chris,

First, FxRate is not a POJO: a POJO should have a constructor without arguments. Once it has one, you can read from a POJO DataStream directly.

Second, if you want to get a field from a POJO, please use the get function, like fx.get('currency'). If you have a field that is itself a POJO, you can use the same approach to get nested fields from it.

Best,
Jingsong Lee


On Wed, Dec 4, 2019 at 12:33 AM Chris Miller <[hidden email]> wrote:
I'm having trouble dealing with a DataStream of POJOs. In particular, when I perform SQL operations on it I can't figure out the syntax for referring to individual fields within the POJO.

Below is an example that illustrates the problem and the various approaches I've tried. Can anyone please point me in the right direction?

import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

/**
 * Minimal reproduction: builds one stream of Tuple2 values and one stream of
 * FxRate objects, converts each to a Table, and prints the resulting schemas.
 * The FxRate attempts are each annotated with the ValidationException the
 * author reported for them.
 */
public class PojoTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

// Using tuples, this works as expected
List<Tuple2<String, Double>> tupleData = Arrays.asList(
new Tuple2<>("USD", 1.0),
new Tuple2<>("GBP", 1.3),
new Tuple2<>("EUR", 1.11));
DataStreamSource<Tuple2<String, Double>> tupleStream = streamEnv.fromCollection(tupleData);
tableEnv.fromDataStream(tupleStream, "currency, rate").printSchema();

// Using a DataStream of POJOs, how do I obtain an equivalent table to the above?
List<FxRate> pojoData = Arrays.asList(
new FxRate("USD", 1.0),
new FxRate("GBP", 1.3),
new FxRate("EUR", 1.11));
DataStreamSource<FxRate> pojoStream = streamEnv.fromCollection(pojoData);

// Registers the stream under a single field name, "fx"; the error quoted below
// reports the table's input field list as [fx].
Table pojoTable = tableEnv.fromDataStream(pojoStream, "fx");
pojoTable.printSchema();

// This fails with "ValidationException: Cannot resolve field [currency], input field list:[fx]"
pojoTable.select("currency, rate").printSchema();

// This fails with "ValidationException: Undefined function: currency"
pojoTable.select("fx.currency AS currency, fx.rate AS rate").printSchema();

// This fails with "ValidationException: Too many fields referenced from an atomic type"
tableEnv.fromDataStream(pojoStream, "currency, rate").printSchema();

// This fails with "ValidationException: Field reference expression expected"
tableEnv.fromDataStream(pojoStream, "fx.currency, fx.rate").printSchema();

streamEnv.execute();
}

/**
 * Holder for a currency code and its rate, exposed as public fields.
 * NOTE(review): the only constructor declared here takes two arguments;
 * there is no no-argument constructor.
 */
public static class FxRate {
public String currency;
public double rate;

/**
 * @param currency value stored in the {@code currency} field
 * @param rate value stored in the {@code rate} field
 */
public FxRate(String currency, double rate) {
this.currency = currency;
this.rate = rate;
}

/** Renders the object as {@code FxRate{currency='...', rate=...}}. */
@Override
public String toString() {
return "FxRate{currency='" + currency + '\'' + ", rate=" + rate + '}';
}
}
}


--
Best, Jingsong Lee
Reply | Threaded
Open this post in threaded view
|

Re: Accessing fields in a POJO stream

Chris Miller-2
Thank you, adding the missing constructor has done the trick! (FWIW: my 'real' code is in Kotlin and I had a data class with no @JvmOverloads or empty secondary constructor).

I haven't seen the fx.get('currency') field access syntax anywhere in the documentation, do you happen to know where I can read about that? The only thing I found was https://ci.apache.org/projects/flink/flink-docs-stable/dev/api_concepts.html#define-keys-using-field-expressions which is why I was trying the "fx.currency" style syntax instead.

Many thanks for your help,
Chris


------ Original Message ------
From: "Jingsong Lee" <[hidden email]>
To: "Chris Miller" <[hidden email]>
Cc: "user" <[hidden email]>
Sent: 04/12/2019 03:41:05
Subject: Re: Add time attribute column to POJO stream

Hi Chris,

First, FxRate is not a POJO: a POJO should have a constructor without arguments. Once it has one, you can read from a POJO DataStream directly.

Second, if you want to get a field from a POJO, please use the get function, like fx.get('currency'). If you have a field that is itself a POJO, you can use the same approach to get nested fields from it.

Best,
Jingsong Lee


On Wed, Dec 4, 2019 at 12:33 AM Chris Miller <[hidden email]> wrote:
I'm having trouble dealing with a DataStream of POJOs. In particular, when I perform SQL operations on it I can't figure out the syntax for referring to individual fields within the POJO.

Below is an example that illustrates the problem and the various approaches I've tried. Can anyone please point me in the right direction?

import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

/**
 * Minimal reproduction: builds one stream of Tuple2 values and one stream of
 * FxRate objects, converts each to a Table, and prints the resulting schemas.
 * The FxRate attempts are each annotated with the ValidationException the
 * author reported for them.
 */
public class PojoTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

// Using tuples, this works as expected
List<Tuple2<String, Double>> tupleData = Arrays.asList(
new Tuple2<>("USD", 1.0),
new Tuple2<>("GBP", 1.3),
new Tuple2<>("EUR", 1.11));
DataStreamSource<Tuple2<String, Double>> tupleStream = streamEnv.fromCollection(tupleData);
tableEnv.fromDataStream(tupleStream, "currency, rate").printSchema();

// Using a DataStream of POJOs, how do I obtain an equivalent table to the above?
List<FxRate> pojoData = Arrays.asList(
new FxRate("USD", 1.0),
new FxRate("GBP", 1.3),
new FxRate("EUR", 1.11));
DataStreamSource<FxRate> pojoStream = streamEnv.fromCollection(pojoData);

// Registers the stream under a single field name, "fx"; the error quoted below
// reports the table's input field list as [fx].
Table pojoTable = tableEnv.fromDataStream(pojoStream, "fx");
pojoTable.printSchema();

// This fails with "ValidationException: Cannot resolve field [currency], input field list:[fx]"
pojoTable.select("currency, rate").printSchema();

// This fails with "ValidationException: Undefined function: currency"
pojoTable.select("fx.currency AS currency, fx.rate AS rate").printSchema();

// This fails with "ValidationException: Too many fields referenced from an atomic type"
tableEnv.fromDataStream(pojoStream, "currency, rate").printSchema();

// This fails with "ValidationException: Field reference expression expected"
tableEnv.fromDataStream(pojoStream, "fx.currency, fx.rate").printSchema();

streamEnv.execute();
}

/**
 * Holder for a currency code and its rate, exposed as public fields.
 * NOTE(review): the only constructor declared here takes two arguments;
 * there is no no-argument constructor.
 */
public static class FxRate {
public String currency;
public double rate;

/**
 * @param currency value stored in the {@code currency} field
 * @param rate value stored in the {@code rate} field
 */
public FxRate(String currency, double rate) {
this.currency = currency;
this.rate = rate;
}

/** Renders the object as {@code FxRate{currency='...', rate=...}}. */
@Override
public String toString() {
return "FxRate{currency='" + currency + '\'' + ", rate=" + rate + '}';
}
}
}


--
Best, Jingsong Lee