Scala 2.12.12
While attempting to fix a serialization bug I previously wrote about, I temporarily disabled projection pushdown for my custom source implementation. When I then ran the application, I hit a ClassCastException. After debugging, it turned out that the auto-generated WatermarkGenerator code was trying to access the rowtime field at the wrong index (0 instead of 1).
My program is as follows:
TableSource -> SQL query over table -> Sink
Since I did not use projection pushdown (which would probably have fixed the issue by re-ordering the field indexes), the wrong index was accessed.
You can see the reordering in the plan:
Initially:
+- LogicalProject(event_time=[$1])
   +- LogicalWatermarkAssigner(rowtime=[event_time], watermark=[$1])
      +- LogicalTableScan(table=[[foo-catalog, default-db, foo-table]])
After optimization:
+- FlinkLogicalWatermarkAssigner(rowtime=[event_time], watermark=[$0])
   +- FlinkLogicalTableSourceScan(table=[[foo-catalog, default-db, foo-table, project=[event_time])
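To illustrate the mismatch, here is a minimal sketch in plain Scala. It is not Flink's generated code: the `id` field, the row values and all helper names are hypothetical, and a row is modelled as a simple Vector[Any]. It only shows why reading index 0 from a row that was never actually projected fails with a ClassCastException.

```scala
object RowtimeIndexSketch {
  // Hypothetical full schema; only event_time is known from the plan above.
  val fullSchema: Vector[String] = Vector("id", "event_time")

  // Schema the optimizer assumes after pruning to project=[event_time].
  val projectedSchema: Vector[String] = Vector("event_time")

  // Position of the rowtime field under a given schema.
  def rowtimeIndex(schema: Vector[String]): Int = schema.indexOf("event_time")

  // Stand-in for the generated watermark code: reads a Long at a fixed index.
  def extractRowtime(row: Vector[Any], idx: Int): Long =
    row(idx).asInstanceOf[Long]

  // A row as emitted by a source that does NOT apply the projection.
  val unprojectedRow: Vector[Any] = Vector("user-1", 1000L)
}
```

With the original schema the rowtime sits at index 1, so extractRowtime(unprojectedRow, 1) works. With the index derived from the pruned schema (0), the same call hits the `id` string and the cast to Long throws.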
Is this the desired behavior? I was quite surprised by this, and by the fact that the pruning happens regardless of whether the underlying table is a TableSource and whether it actually filters these unused fields out of the result.
--
Best Regards,
Yuval Itzchakov.