TableSource being duplicated

Benoît Paris
Hello all!

I'm having a problem with a TableSource's DataStream being duplicated when two sinks pull from it.

I understand that sometimes the best plan is simply to duplicate a TableSource/SourceFunction and read it twice. In my case, though, I can't replay the data the way Kafka could. I just need the SourceFunction and the DataStream provided by the TableSource not to be duplicated.
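To make the concern concrete, here is a toy model in plain Java. It does not use Flink's API. A one-shot upstream, such as a queue that discards each element once it is read, is consumed in one of two ways: by two duplicated source instances, or by one shared source that fans each record out to both sinks. The class and method names are hypothetical. The strict alternation between the two readers is an assumption that stands in for whatever interleaving real parallel readers would produce.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model, not Flink code: the upstream is non-replayable, so every
// record can be taken out of it exactly once.
final class DuplicatedSourceDemo {

    // Two duplicated "source instances" poll the same one-shot upstream.
    // Hypothetical simplification: they strictly alternate. Each sink only
    // sees the records its own source instance happened to take.
    static List<List<Integer>> duplicatedSources(List<Integer> records) {
        Queue<Integer> upstream = new ArrayDeque<>(records);
        List<Integer> sinkA = new ArrayList<>();
        List<Integer> sinkB = new ArrayList<>();
        boolean turnA = true;
        while (!upstream.isEmpty()) {
            (turnA ? sinkA : sinkB).add(upstream.poll());
            turnA = !turnA;
        }
        return List.of(sinkA, sinkB);
    }

    // A single source instance reads each record once and fans it out to
    // both sinks, so both sinks see the complete data.
    static List<List<Integer>> sharedSource(List<Integer> records) {
        Queue<Integer> upstream = new ArrayDeque<>(records);
        List<Integer> sinkA = new ArrayList<>();
        List<Integer> sinkB = new ArrayList<>();
        while (!upstream.isEmpty()) {
            Integer record = upstream.poll();
            sinkA.add(record);
            sinkB.add(record);
        }
        return List.of(sinkA, sinkB);
    }
}
```

Take the input 1, 2, 3, 4. In the duplicated variant, each sink ends up with only half of the records. In the shared variant, both sinks receive all four.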

As a workaround, I introduce a kind of materialization barrier that makes the planner pull from only one instance of the TableSource/SourceFunction:
Instead of:
tEnv.registerTableSource("foo_table", new FooTableSource());
I convert it to an append stream and back again to a Table:
tEnv.registerTableSource("foo_table_source", new FooTableSource());
Table sourceTable = tEnv.sqlQuery("SELECT * FROM foo_table_source");
// Round-trip through a DataStream, acting as a "materialization barrier"
Table appendingSourceTable = tEnv.fromDataStream(
    tEnv.toAppendStream(sourceTable, Types.ROW(new String[]{"field_1"}, new TypeInformation[]{Types.LONG()}))
);
tEnv.registerTable("foo_table", appendingSourceTable);
The conversion to an append stream somehow makes the planner behave, and the execution plan then contains only one DataSource.
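Below is one plausible reading of why this workaround helps, sketched as a toy model in plain Java rather than with Flink's API. A registered TableSource behaves like a definition that is expanded afresh for every query that reads it. A Table built from an existing DataStream instead points at a node that already exists, so every reader gets the same instance. The catalog maps and class names are hypothetical. This illustrates the idea only and does not describe Flink's planner internals.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Toy model, not Flink code: compares a catalog entry that is re-expanded on
// every lookup with one that holds a single, already-built node.
final class MaterializationBarrierDemo {

    // Stand-in for one running source instance (equality is object identity).
    static final class SourceNode {}

    // Like registering a TableSource: each sink's lookup builds a fresh source.
    static List<SourceNode> resolveViaDefinition(int sinkCount) {
        Map<String, Supplier<SourceNode>> catalog = new HashMap<>();
        catalog.put("foo_table", SourceNode::new);
        List<SourceNode> perSink = new ArrayList<>();
        for (int i = 0; i < sinkCount; i++) {
            perSink.add(catalog.get("foo_table").get());
        }
        return perSink;
    }

    // Like registering a Table built from a DataStream: the node is created
    // once, and every sink's lookup returns that same instance.
    static List<SourceNode> resolveViaMaterializedNode(int sinkCount) {
        Map<String, SourceNode> catalog = new HashMap<>();
        catalog.put("foo_table", new SourceNode());
        List<SourceNode> perSink = new ArrayList<>();
        for (int i = 0; i < sinkCount; i++) {
            perSink.add(catalog.get("foo_table"));
        }
        return perSink;
    }

    // Counts distinct source instances (Object.equals is identity here).
    static long distinctSources(List<SourceNode> nodes) {
        return nodes.stream().distinct().count();
    }
}
```

With two sinks, the definition-based catalog yields two distinct sources. The materialized-node catalog yields one.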

But I feel like I'm just missing a simple option (on the SourceFunction, or on the TableSource?) to declare the source as non-duplicable.

I have tried a lot of options (uid(), operator chaining restrictions, tweaking the transformation, forceNonParallel(), etc.), but I can't quite figure out how to do it! My SourceFunction is a RichSourceFunction.

At this point I'm wondering whether this is a bug, or a feature that would have to be implemented.

Cheers,
Ben

Re: TableSource being duplicated

Benchao Li
Hi Benoît,

Do you mean that if you register one TableSource and add two sinks reading from it, the source gets duplicated?
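If so, take a look at TableEnvironmentImpl.isEagerOperationTranslation. It is false by default, but in StreamTableEnvironmentImpl it is true, because eager translation is needed to stay aligned with the DataStream API. With eager translation, each sink is translated into its own plan as soon as it is added, so the planner cannot share one source between the two sinks.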
If you don't need Table <-> DataStream conversion, you can use TableEnvironmentImpl (created via TableEnvironment.create(EnvironmentSettings)) instead of StreamTableEnvironmentImpl to achieve your goal.
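The effect of eager versus lazy translation can be pictured with another toy planner in plain Java. The names are hypothetical, and these are not Flink's classes. Translating a batch of sink queries uses a cache that lives only for that batch, so identical scans share one physical source only when they are translated together. Scoping source sharing to a single translation call is a simplifying assumption based on the explanation above, not a description of Flink's optimizer.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy planner, not Flink code: each sink query is represented only by the
// name of the table it reads.
final class EagerVsLazyTranslationDemo {

    // Physical source operator produced by translation.
    static final class PhysicalSource {
        final String table;
        PhysicalSource(String table) { this.table = table; }
    }

    // Translates one batch of sink queries. The cache lives only for this
    // batch, so identical scans inside the same batch share one source.
    static List<PhysicalSource> translate(List<String> sinkInputs) {
        Map<String, PhysicalSource> cache = new HashMap<>();
        List<PhysicalSource> out = new ArrayList<>();
        for (String table : sinkInputs) {
            out.add(cache.computeIfAbsent(table, PhysicalSource::new));
        }
        return out;
    }

    // Eager: each sink is translated as soon as it is added, in its own batch.
    static List<PhysicalSource> eager(List<String> sinkInputs) {
        List<PhysicalSource> out = new ArrayList<>();
        for (String table : sinkInputs) {
            out.addAll(translate(List.of(table)));
        }
        return out;
    }

    // Lazy: sinks are buffered and translated together at execution time.
    static List<PhysicalSource> lazy(List<String> sinkInputs) {
        return translate(sinkInputs);
    }

    // Counts distinct physical sources by object identity.
    static int distinctSources(List<PhysicalSource> sources) {
        Set<PhysicalSource> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        seen.addAll(sources);
        return seen.size();
    }
}
```

Consider two sinks that both read foo_table. Under these assumptions, eager translation yields two physical sources and lazy translation yields one. This matches the difference described above between StreamTableEnvironmentImpl and TableEnvironmentImpl.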

Hope it helps.

In reply to Benoît Paris <[hidden email]>, Thursday, January 23, 2020, 6:50 AM.

--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]