[flink sql] table in subquery join temporal table raise java.lang.NullPointerException

classic Classic list List threaded Threaded
2 messages Options
hzp
Reply | Threaded
Open this post in threaded view
|

[flink sql] table in subquery join temporal table raise java.lang.NullPointerException

hzp
Hi all,

I'm using Flink SQL to join a temporal table in a subquery, but it raises java.lang.NullPointerException when executed.

Orders is a table source, and Rates is a temporal table

Here are my sqls:
// works
SELECT o_amount * r_amount AS amount
FROM Orders, LATERAL TABLE (Rates(o_proctime))
WHERE r_currency = o_currency

// sql raise exception
SELECT o_amount * r_amount AS amount
FROM (SELECT * FROM Orders) as Orders, LATERAL TABLE (Rates(o_proctime))
WHERE r_currency = o_currency

The error stack:
Exception in thread "main" java.lang.NullPointerException
at org.apache.flink.table.planner.calcite.FlinkRelBuilder$.of(FlinkRelBuilder.scala:167)
at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:98)
at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256)
at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:166)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:88)
at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:78)
at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:212)
at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:77)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnvironmentImpl.java:281)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnvironmentImpl.java:276)
at cn.easyops.flink_sql.Test.main(Test.java:159)


Here is the complete test code, hope anyone can help, thanks!

package test.flinksql;

import java.util.Random;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.DefinedProctimeAttribute;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

// Self-contained reproducer: joining a temporal table function (LATERAL TABLE)
// against a subquery instead of a plain table scan triggers a
// NullPointerException in the Blink planner (see the stack trace above, thrown
// from LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch).
public class TemporalTableFunctionTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        sEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        // Blink planner in streaming mode — the failing rule lives in the Blink
        // (table-planner-blink) optimizer, so the planner choice matters here.
        EnvironmentSettings bsSettings =
            EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
            .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv, bsSettings);

        // Rates history stream; its third field is declared as the proctime
        // attribute by FooSource#getProctimeAttribute.
        tEnv.registerTableSource("RatesHistory", new FooSource(new String[] {"r_currency", "r_amount", "r_proctime"}));
        Table ratesHistory = tEnv.sqlQuery("SELECT * FROM RatesHistory");
        // Temporal table function keyed on r_currency, versioned by r_proctime.
        TemporalTableFunction rates = ratesHistory.createTemporalTableFunction("r_proctime", "r_currency");
        tEnv.registerFunction("Rates", rates);

        tEnv.registerTableSource("Orders", new FooSource(new String[] {"o_currency", "o_amount", "o_proctime"}));

        tEnv.registerTableSink("OutSink", new SysoSink());

        // works — Orders referenced directly as a table scan
        Table prices = tEnv.sqlQuery(
                "  SELECT                                     \r\n" +
                "    o_amount * r_amount AS amount            \r\n" +
                "  FROM Orders                                \r\n" +
                "     , LATERAL TABLE (Rates(o_proctime))     \r\n" +
                "  WHERE r_currency = o_currency              ");

        // Raise NullPointerException — identical query except Orders is wrapped
        // in a (SELECT * FROM Orders) subquery; swap the two statements to
        // reproduce the failure.
        //Table prices = tEnv.sqlQuery(
        //        "  SELECT                                     \r\n" +
        //        "    o_amount * r_amount AS amount            \r\n" +
        //        "  FROM (SELECT * FROM Orders) as O                                \r\n" +
        //        "     , LATERAL TABLE (Rates(o_proctime))     \r\n" +
        //        "  WHERE r_currency = o_currency              ");

        prices.insertInto("OutSink");

        sEnv.execute();
    }


    // Retract sink that just prints each (accumulate/retract flag, row) pair to
    // stdout; single BIGINT column "out" matching the query's amount result.
    public static class SysoSink implements RetractStreamTableSink<Row> {
        @Override
        public String[] getFieldNames() {
            return new String[] {"out"};
        }
        @Override
        public TypeInformation<?>[] getFieldTypes() {
            return new TypeInformation[] {Types.LONG()};
        }
        @Override
        public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
            // Ignores the requested schema and returns itself unchanged —
            // acceptable for a throwaway test sink.
            return this;
        }
        @Override
        public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
            // Legacy entry point; delegate to the newer consumeDataStream.
            consumeDataStream(dataStream);
        }
        @Override
        public TypeInformation<Row> getRecordType() {
            return Types.ROW(getFieldNames(), getFieldTypes());
        }
        @Override
        public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
            return dataStream.addSink(new SysoSinkFunction<Tuple2<Boolean, Row>>());
        }
    }

    // Minimal sink function: println every element.
    @SuppressWarnings("serial")
    public static class SysoSinkFunction<T> implements SinkFunction<T> {
        @Override
        public void invoke(T value) throws Exception {
            System.out.println(value);
        }
    }

    // Unbounded source emitting one random (STRING, LONG, TIMESTAMP) row every
    // 100 ms; field names are caller-supplied so the same source backs both
    // RatesHistory and Orders. The third field doubles as the proctime attribute.
    public static class FooSource implements StreamTableSource<Row>, DefinedProctimeAttribute {

        String[] fieldNames;


        public FooSource(String[] fieldNames) {
            this.fieldNames = fieldNames;
        }

        @Override
        public TableSchema getTableSchema() {
            return new TableSchema(fieldNames, new TypeInformation[] {Types.STRING(), Types.LONG(), Types.SQL_TIMESTAMP()});
        }

        @Override
        public TypeInformation<Row> getReturnType() {
            return Types.ROW(fieldNames, new TypeInformation[] {Types.STRING(), Types.LONG(), Types.SQL_TIMESTAMP()});
        }

        @Override
        public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
            return execEnv.addSource(new SourceFunction<Row>() {

                @Override
                public void run(SourceContext<Row> ctx) throws Exception {
                    Random random = new Random();

                    while (true) {

                        Row row = new Row(3);
                        // NOTE: nextLong() % 3 can be negative, so currencies
                        // like "Euro-1" occur — harmless for this repro.
                        row.setField(0, "Euro" + random.nextLong() % 3);
                        row.setField(1, random.nextLong() % 200 );
                        row.setField(2, new java.sql.Timestamp(System.currentTimeMillis()));
                        ctx.collect(row);
                        Thread.sleep(100);
                    }

                }

                @Override
                public void cancel() {
                    // No cancellation flag — run()'s while(true) loop is not
                    // actually stopped; fine for a short-lived manual test.
                    System.out.println("cancelling ----------------------------------------------");

                }
            }, getReturnType());
        }

        @Override
        public String getProctimeAttribute() {
            // Third field (r_proctime / o_proctime) is the processing-time attribute.
            return fieldNames[2];
        }
    }

}

Reply | Threaded
Open this post in threaded view
|

Re: [flink sql] table in subquery join temporal table raise java.lang.NullPointerException

Till Rohrmann
Thanks for reporting this issue. I've pulled in Jark and Kurt who might help you with this problem.

Cheers,
Till

On Sat, Oct 12, 2019 at 5:42 PM hzp <[hidden email]> wrote:
Hi all,

I'm using Flink SQL to join a temporal table in a subquery, but it raises java.lang.NullPointerException when executed.

Orders is a table source, and Rates is a temporal table

Here are my sqls:
// works
SELECT o_amount * r_amount AS amount
FROM Orders, LATERAL TABLE (Rates(o_proctime))
WHERE r_currency = o_currency

// sql raise exception
SELECT o_amount * r_amount AS amount
FROM (SELECT * FROM Orders) as Orders, LATERAL TABLE (Rates(o_proctime))
WHERE r_currency = o_currency

The error stack:
Exception in thread "main" java.lang.NullPointerException
at org.apache.flink.table.planner.calcite.FlinkRelBuilder$.of(FlinkRelBuilder.scala:167)
at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:98)
at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256)
at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:166)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:88)
at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:78)
at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:212)
at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:77)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnvironmentImpl.java:281)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnvironmentImpl.java:276)
at cn.easyops.flink_sql.Test.main(Test.java:159)


Here is the complete test code, hope anyone can help, thanks!

package test.flinksql;

import java.util.Random;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.DefinedProctimeAttribute;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

// Quoted copy of the reproducer from the original post: joining a temporal
// table function (LATERAL TABLE) against a subquery instead of a plain table
// scan triggers a NullPointerException in the Blink planner
// (LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch).
public class TemporalTableFunctionTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        sEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        // Blink planner in streaming mode — the failing rule is Blink-specific.
        EnvironmentSettings bsSettings =
            EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
            .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv, bsSettings);

        // Rates history stream; third field is the proctime attribute.
        tEnv.registerTableSource("RatesHistory", new FooSource(new String[] {"r_currency", "r_amount", "r_proctime"}));
        Table ratesHistory = tEnv.sqlQuery("SELECT * FROM RatesHistory");
        // Temporal table function keyed on r_currency, versioned by r_proctime.
        TemporalTableFunction rates = ratesHistory.createTemporalTableFunction("r_proctime", "r_currency");
        tEnv.registerFunction("Rates", rates);

        tEnv.registerTableSource("Orders", new FooSource(new String[] {"o_currency", "o_amount", "o_proctime"}));

        tEnv.registerTableSink("OutSink", new SysoSink());

        // works — Orders referenced directly as a table scan
        Table prices = tEnv.sqlQuery(
                "  SELECT                                     \r\n" +
                "    o_amount * r_amount AS amount            \r\n" +
                "  FROM Orders                                \r\n" +
                "     , LATERAL TABLE (Rates(o_proctime))     \r\n" +
                "  WHERE r_currency = o_currency              ");

        // Raise NullPointerException — same query with Orders wrapped in a
        // subquery; swap the two statements to reproduce.
        //Table prices = tEnv.sqlQuery(
        //        "  SELECT                                     \r\n" +
        //        "    o_amount * r_amount AS amount            \r\n" +
        //        "  FROM (SELECT * FROM Orders) as O                                \r\n" +
        //        "     , LATERAL TABLE (Rates(o_proctime))     \r\n" +
        //        "  WHERE r_currency = o_currency              ");

        prices.insertInto("OutSink");

        sEnv.execute();
    }


    // Retract sink printing each (flag, row) pair; single BIGINT column "out".
    public static class SysoSink implements RetractStreamTableSink<Row> {
        @Override
        public String[] getFieldNames() {
            return new String[] {"out"};
        }
        @Override
        public TypeInformation<?>[] getFieldTypes() {
            return new TypeInformation[] {Types.LONG()};
        }
        @Override
        public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
            // Ignores the requested schema — acceptable for a throwaway sink.
            return this;
        }
        @Override
        public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
            // Legacy entry point; delegate to consumeDataStream.
            consumeDataStream(dataStream);
        }
        @Override
        public TypeInformation<Row> getRecordType() {
            return Types.ROW(getFieldNames(), getFieldTypes());
        }
        @Override
        public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
            return dataStream.addSink(new SysoSinkFunction<Tuple2<Boolean, Row>>());
        }
    }

    // Minimal sink function: println every element.
    @SuppressWarnings("serial")
    public static class SysoSinkFunction<T> implements SinkFunction<T> {
        @Override
        public void invoke(T value) throws Exception {
            System.out.println(value);
        }
    }

    // Unbounded source emitting one random (STRING, LONG, TIMESTAMP) row every
    // 100 ms; field names are caller-supplied so it backs both tables.
    public static class FooSource implements StreamTableSource<Row>, DefinedProctimeAttribute {

        String[] fieldNames;


        public FooSource(String[] fieldNames) {
            this.fieldNames = fieldNames;
        }

        @Override
        public TableSchema getTableSchema() {
            return new TableSchema(fieldNames, new TypeInformation[] {Types.STRING(), Types.LONG(), Types.SQL_TIMESTAMP()});
        }

        @Override
        public TypeInformation<Row> getReturnType() {
            return Types.ROW(fieldNames, new TypeInformation[] {Types.STRING(), Types.LONG(), Types.SQL_TIMESTAMP()});
        }

        @Override
        public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
            return execEnv.addSource(new SourceFunction<Row>() {

                @Override
                public void run(SourceContext<Row> ctx) throws Exception {
                    Random random = new Random();

                    while (true) {

                        Row row = new Row(3);
                        // nextLong() % 3 can be negative ("Euro-1") — harmless here.
                        row.setField(0, "Euro" + random.nextLong() % 3);
                        row.setField(1, random.nextLong() % 200 );
                        row.setField(2, new java.sql.Timestamp(System.currentTimeMillis()));
                        ctx.collect(row);
                        Thread.sleep(100);
                    }

                }

                @Override
                public void cancel() {
                    // No cancellation flag — run()'s loop is never stopped;
                    // fine for a short-lived manual test.
                    System.out.println("cancelling ----------------------------------------------");

                }
            }, getReturnType());
        }

        @Override
        public String getProctimeAttribute() {
            // Third field is the processing-time attribute.
            return fieldNames[2];
        }
    }

}