【flink sql】table in subquery join temporal table raise java.lang.NullPointerException

classic Classic list List threaded Threaded
3 messages Options
hzp
Reply | Threaded
Open this post in threaded view
|

【flink sql】table in subquery join temporal table raise java.lang.NullPointerException

hzp
Hi all,

I'm using Flink SQL to join a temporal table with a subquery, but it raises a java.lang.NullPointerException when the query is executed.

Orders is a table source, and Rates is a temporal table

Here are my SQL queries:
// works
SELECT o_amount * r_amount AS amount
FROM Orders, LATERAL TABLE (Rates(o_proctime))
WHERE r_currency = o_currency

// raises the NullPointerException
SELECT o_amount * r_amount AS amount
FROM (SELECT * FROM Orders) as Orders, LATERAL TABLE (Rates(o_proctime))
WHERE r_currency = o_currency

The error stack: 
Exception in thread "main" java.lang.NullPointerException
at org.apache.flink.table.planner.calcite.FlinkRelBuilder$.of(FlinkRelBuilder.scala:167)
at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:98)
at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256)
at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:166)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:88)
at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:78)
at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:212)
at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:77)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnvironmentImpl.java:281)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnvironmentImpl.java:276)
at cn.easyops.flink_sql.Test.main(Test.java:159)


Here is the complete test code. I hope someone can help, thanks!

package test.flinksql;

import java.util.Random;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.DefinedProctimeAttribute;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

public class TemporalTableFunctionTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        sEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        EnvironmentSettings bsSettings = 
            EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
            .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv, bsSettings);
        
        tEnv.registerTableSource("RatesHistory", new FooSource(new String[] {"r_currency", "r_amount", "r_proctime"}));
        Table ratesHistory = tEnv.sqlQuery("SELECT * FROM RatesHistory");
        TemporalTableFunction rates = ratesHistory.createTemporalTableFunction("r_proctime", "r_currency");
        tEnv.registerFunction("Rates", rates);
        
        tEnv.registerTableSource("Orders", new FooSource(new String[] {"o_currency", "o_amount", "o_proctime"}));
        
        tEnv.registerTableSink("OutSink", new SysoSink());

        // works
        Table prices = tEnv.sqlQuery(
                "  SELECT                                     \r\n" +
                "    o_amount * r_amount AS amount            \r\n" +
                "  FROM Orders                                \r\n" +
                "     , LATERAL TABLE (Rates(o_proctime))     \r\n" +
                "  WHERE r_currency = o_currency              ");

        // Raise NullPointerException
        //Table prices = tEnv.sqlQuery(
        //        "  SELECT                                     \r\n" +
        //        "    o_amount * r_amount AS amount            \r\n" +
        //        "  FROM (SELECT * FROM Orders) as O                                \r\n" +
        //        "     , LATERAL TABLE (Rates(o_proctime))     \r\n" +
        //        "  WHERE r_currency = o_currency              ");
        
        prices.insertInto("OutSink");
        
        sEnv.execute();
    }
    

    public static class SysoSink implements RetractStreamTableSink<Row> {
        @Override
        public String[] getFieldNames() {
            return new String[] {"out"};
        }
        @Override
        public TypeInformation<?>[] getFieldTypes() {
            return new TypeInformation[] {Types.LONG()};
        }
        @Override
        public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
            return this;
        }
        @Override
        public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
            consumeDataStream(dataStream);
        }
        @Override
        public TypeInformation<Row> getRecordType() {
            return Types.ROW(getFieldNames(), getFieldTypes());
        }
        @Override
        public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
            return dataStream.addSink(new SysoSinkFunction<Tuple2<Boolean, Row>>());
        }
    }
    
    @SuppressWarnings("serial")
    public static class SysoSinkFunction<T> implements SinkFunction<T> {
        @Override
        public void invoke(T value) throws Exception {
            System.out.println(value);
        }
    }
    
    public static class FooSource implements StreamTableSource<Row>, DefinedProctimeAttribute {
        
        String[] fieldNames;
        
        
        public FooSource(String[] fieldNames) {
            this.fieldNames = fieldNames;
        }

        @Override
        public TableSchema getTableSchema() {
            return new TableSchema(fieldNames, new TypeInformation[] {Types.STRING(), Types.LONG(), Types.SQL_TIMESTAMP()});
        }
        
        @Override
        public TypeInformation<Row> getReturnType() {
            return Types.ROW(fieldNames, new TypeInformation[] {Types.STRING(), Types.LONG(), Types.SQL_TIMESTAMP()});
        }

        @Override
        public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
            return execEnv.addSource(new SourceFunction<Row>() {

                @Override
                public void run(SourceContext<Row> ctx) throws Exception {
                    Random random = new Random();
                    
                    while (true) {

                        Row row = new Row(3);
                        row.setField(0, "Euro" + random.nextLong() % 3);
                        row.setField(1, random.nextLong() % 200 );
                        row.setField(2, new java.sql.Timestamp(System.currentTimeMillis()));
                        ctx.collect(row);
                        Thread.sleep(100);
                    }
                    
                }

                @Override
                public void cancel() {
                    System.out.println("cancelling ----------------------------------------------");
                    
                }
            }, getReturnType());
        }

        @Override
        public String getProctimeAttribute() {
            return fieldNames[2];
        }
    }

}

Reply | Threaded
Open this post in threaded view
|

Re: 【flink sql】table in subquery join temporal table raise java.lang.NullPointerException

Benoît Paris-2
This seems to be related to:

https://issues.apache.org/jira/browse/FLINK-14200  (Temporal Table Function Joins do not work on Tables (only TableSources) on the query side)



On Sat, Oct 12, 2019 at 5:56 PM hzp <[hidden email]> wrote:
Hi all,

I'm using flink sql to join a temporal table in a subquery, but it raises java.lang.NullPointerException when execute.

Orders is a table source, and Rates is a temporal table

Here are my sqls:
// works
SELECT o_amount * r_amount AS amount
FROM Orders, LATERAL TABLE (Rates(o_proctime))
WHERE r_currency = o_currency

// sql raise exception
SELECT o_amount * r_amount AS amount
FROM (SELECT * FROM Orders) as Orders, LATERAL TABLE (Rates(o_proctime))
WHERE r_currency = o_currency

The error stack: 
Exception in thread "main" java.lang.NullPointerException
at org.apache.flink.table.planner.calcite.FlinkRelBuilder$.of(FlinkRelBuilder.scala:167)
at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:98)
at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256)
at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:166)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:88)
at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:78)
at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:212)
at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:77)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnvironmentImpl.java:281)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnvironmentImpl.java:276)
at cn.easyops.flink_sql.Test.main(Test.java:159)


Here is the complete test code, hope anyone can help, thanks! 

package test.flinksql;

import java.util.Random;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.DefinedProctimeAttribute;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

public class TemporalTableFunctionTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        sEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        EnvironmentSettings bsSettings = 
            EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
            .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv, bsSettings);
        
        tEnv.registerTableSource("RatesHistory", new FooSource(new String[] {"r_currency", "r_amount", "r_proctime"}));
        Table ratesHistory = tEnv.sqlQuery("SELECT * FROM RatesHistory");
        TemporalTableFunction rates = ratesHistory.createTemporalTableFunction("r_proctime", "r_currency");
        tEnv.registerFunction("Rates", rates);
        
        tEnv.registerTableSource("Orders", new FooSource(new String[] {"o_currency", "o_amount", "o_proctime"}));
        
        tEnv.registerTableSink("OutSink", new SysoSink());

        // works
        Table prices = tEnv.sqlQuery(
                "  SELECT                                     \r\n" +
                "    o_amount * r_amount AS amount            \r\n" +
                "  FROM Orders                                \r\n" +
                "     , LATERAL TABLE (Rates(o_proctime))     \r\n" +
                "  WHERE r_currency = o_currency              ");

        // Raise NullPointerException
        //Table prices = tEnv.sqlQuery(
        //        "  SELECT                                     \r\n" +
        //        "    o_amount * r_amount AS amount            \r\n" +
        //        "  FROM (SELECT * FROM Orders) as O                                \r\n" +
        //        "     , LATERAL TABLE (Rates(o_proctime))     \r\n" +
        //        "  WHERE r_currency = o_currency              ");
        
        prices.insertInto("OutSink");
        
        sEnv.execute();
    }
    

    public static class SysoSink implements RetractStreamTableSink<Row> {
        @Override
        public String[] getFieldNames() {
            return new String[] {"out"};
        }
        @Override
        public TypeInformation<?>[] getFieldTypes() {
            return new TypeInformation[] {Types.LONG()};
        }
        @Override
        public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
            return this;
        }
        @Override
        public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
            consumeDataStream(dataStream);
        }
        @Override
        public TypeInformation<Row> getRecordType() {
            return Types.ROW(getFieldNames(), getFieldTypes());
        }
        @Override
        public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
            return dataStream.addSink(new SysoSinkFunction<Tuple2<Boolean, Row>>());
        }
    }
    
    @SuppressWarnings("serial")
    public static class SysoSinkFunction<T> implements SinkFunction<T> {
        @Override
        public void invoke(T value) throws Exception {
            System.out.println(value);
        }
    }
    
    public static class FooSource implements StreamTableSource<Row>, DefinedProctimeAttribute {
        
        String[] fieldNames;
        
        
        public FooSource(String[] fieldNames) {
            this.fieldNames = fieldNames;
        }

        @Override
        public TableSchema getTableSchema() {
            return new TableSchema(fieldNames, new TypeInformation[] {Types.STRING(), Types.LONG(), Types.SQL_TIMESTAMP()});
        }
        
        @Override
        public TypeInformation<Row> getReturnType() {
            return Types.ROW(fieldNames, new TypeInformation[] {Types.STRING(), Types.LONG(), Types.SQL_TIMESTAMP()});
        }

        @Override
        public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
            return execEnv.addSource(new SourceFunction<Row>() {

                @Override
                public void run(SourceContext<Row> ctx) throws Exception {
                    Random random = new Random();
                    
                    while (true) {

                        Row row = new Row(3);
                        row.setField(0, "Euro" + random.nextLong() % 3);
                        row.setField(1, random.nextLong() % 200 );
                        row.setField(2, new java.sql.Timestamp(System.currentTimeMillis()));
                        ctx.collect(row);
                        Thread.sleep(100);
                    }
                    
                }

                @Override
                public void cancel() {
                    System.out.println("cancelling ----------------------------------------------");
                    
                }
            }, getReturnType());
        }

        @Override
        public String getProctimeAttribute() {
            return fieldNames[2];
        }
    }

}



--
Benoît Paris
Ingénieur Machine Learning Explicable
Tél : +33 6 60 74 23 00   
http://benoit.paris
http://explicable.ml
Reply | Threaded
Open this post in threaded view
|

Re: 【flink sql】table in subquery join temporal table raise java.lang.NullPointerException

黄兆鹏
Hi, 
Thanks for the reply. They are the same issue.



在 2019年10月14日,下午5:17,Benoît Paris <[hidden email]> 写道: