Flink Table Duplicate Evaluation

Niklas Teichmann
Hi everybody,

I have a question concerning the Flink Table API, more precisely the
way the results of table statements are evaluated. In the following
code example, the statement defining the table t1 is evaluated twice,
which causes both performance and correctness problems in the
program I am trying to write.

List<Long> longList = Arrays.asList(1L, 2L, 3L, 4L, 5L);
DataSet<Long> longDataSet = getExecutionEnvironment().fromCollection(longList);

tenv.registerDataSet("longs", longDataSet, "l");
// an example UDF that evaluates the current time
tenv.registerFunction("time", new Time());

Table t1 = tenv.scan("longs");
t1 = t1.select("l, time() as t");

Table t2 = t1.as("l1, id1");
Table t3 = t1.as("l2, id2");

Table t4 = t2.join(t3).where("l1 == l2");

// a sink that prints the content of the table
t4.writeToSink(new PrintTableSink());

I realize that this behaviour is defined in the documentation ("A  
registered Table is treated similarly to a VIEW ...") and probably  
stems from the DataStream API. But is there a preferred way to avoid  
this?

Currently I'm using a workaround that defines a TableSink which in  
turn registers its output as a new table. That seems extremely hacky  
though.

Sorry if I missed something obvious!

All the best,
Niklas


Re: Flink Table Duplicate Evaluation

Fabian Hueske-2
Hi Niklas,

The workaround that you described should work fine.
However, you don't need a custom sink.
Converting the Table into a DataSet and registering the DataSet again as a Table is currently the way to solve this issue.

Best, Fabian
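
To illustrate why the conversion helps, here is a minimal plain-Java model of the two evaluation modes. It is a sketch, not Flink code: the class ViewVsMaterialized, the counter standing in for the time() UDF, and both helper methods are hypothetical names introduced only for this example. With view-like semantics each side of the self-join re-runs the select and sees different "timestamps"; once the result is computed a single time and shared, both sides agree.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class ViewVsMaterialized {
    // Hypothetical stand-in for the time() UDF: a new value on every call.
    static final AtomicLong CLOCK = new AtomicLong();

    // Models "select l, time() as t": one {l, t} row per input value.
    public static List<long[]> select(List<Long> longs) {
        return longs.stream()
                .map(l -> new long[] {l, CLOCK.incrementAndGet()})
                .collect(Collectors.toList());
    }

    // Models the self-join on l and counts joined pairs whose t values agree.
    // Each side is obtained through its own supplier call, like a table reference.
    public static long matchingTimestamps(Supplier<List<long[]>> left,
                                          Supplier<List<long[]>> right) {
        List<long[]> leftRows = left.get();
        List<long[]> rightRows = right.get();
        long count = 0;
        for (long[] a : leftRows) {
            for (long[] b : rightRows) {
                if (a[0] == b[0] && a[1] == b[1]) {
                    count++;
                }
            }
        }
        return count;
    }

    public static void main(String[] args) {
        List<Long> longs = Arrays.asList(1L, 2L, 3L, 4L, 5L);

        // View-like semantics: every reference re-evaluates the select.
        Supplier<List<long[]>> view = () -> select(longs);
        System.out.println(matchingTimestamps(view, view)); // prints 0

        // Materialized once, then shared by both sides of the join.
        List<long[]> materialized = select(longs);
        Supplier<List<long[]>> shared = () -> materialized;
        System.out.println(matchingTimestamps(shared, shared)); // prints 5
    }
}
```

In the Flink versions of that period, the conversion itself would presumably go through the batch table environment's toDataSet and fromDataSet methods; treat those names as an assumption to verify against the version in use.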

(In reply to the message from Niklas Teichmann <[hidden email]> of Tue, 20 Nov 2018, 17:13.)