Multiple select single result

Posted by dhanuka.priyanath on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Multiple-select-single-result-tp25488.html


Hi All,

I am trying to select multiple results from Kafka and send results to Kafka different topic using Table API. But I am getting below error. Could you please help me on this.

Query: 

SELECT TiggerID,'Rule3' as RuleName,mytime(ts) as ts1 , mytime(CURRENT_TIMESTAMP) as ts2 FROM dataTable WHERE InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%' 
 UNION 
SELECT TiggerID,'Rule2' as RuleName,mytime(ts) as ts1 , mytime(CURRENT_TIMESTAMP) as ts2 FROM dataTable WHERE InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%' 
 UNION 
SELECT TiggerID,'Rule1' as RuleName,mytime(ts) as ts1 , mytime(CURRENT_TIMESTAMP) as ts2 FROM dataTable WHERE InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%' 


Error:

2019-01-13 21:36:36,228 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Exception occurred in REST handler.
org.apache.flink.runtime.rest.handler.RestHandlerException: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$7(JarRunHandler.java:151)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$10(JarRunHandler.java:228)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
... 3 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:78)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:120)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$10(JarRunHandler.java:226)
... 4 more
Caused by: org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes.
at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:382)
at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:784)
at org.apache.flink.table.api.Table.insertInto(table.scala:877)
at org.monitoring.stream.analytics.FlinkDynamicLocalSQL.main(FlinkDynamicLocalSQL.java:153)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
... 9 more


Source Code:

StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
tableEnv.registerFunction("mytime", new MyTime(10));

tableEnv.connect(new Kafka().version("0.10").topic("testin")
.properties(kConsumer)
.startFromLatest())

.withFormat(new Json().jsonSchema(schemaContent).failOnMissingField(false).deriveSchema())
.withSchema(new Schema()
.field("InterceptID", "DECIMAL")
.field("Provider_Info", "VARCHAR")
.field("LIID", "VARCHAR")
.field("TiggerID", "DECIMAL")
.field("ts", Types.SQL_TIMESTAMP())
.rowtime(new Rowtime().timestampsFromSource().watermarksPeriodicBounded(1000)))
.inAppendMode()

.registerTableSource(sourceTable);

// WindowedTable windowedTable =
// tableEnv.scan(sourceTable).window(Tumble.over("50.minutes"));
//tableEnv.sqlQuery(query)
StringBuilder multi = new StringBuilder();
for(String sql : rules) {
    if(multi.length() > 0) {
multi.append(" UNION ").append("\n");
    }
    multi.append( sql);
    
}
LOGGER.info("********************************* " + multi.toString());
Table result = tableEnv.sqlQuery(multi.toString());

tableEnv
// declare the external system to connect to
.connect(new Kafka().version("0.10")
.topic("testout").startFromEarliest()
.properties(kProducer)
)
         
.withFormat(new Json().failOnMissingField(false).deriveSchema())
.withSchema(new Schema()
.field("TiggerID", Types.DECIMAL())
.field("RuleName", Types.STRING())
.field("ts1", Types.STRING())
.field("ts2", Types.STRING())
)

// specify the update-mode for streaming tables
.inAppendMode()
// register as source, sink, or both and under a name
.registerTableSourceAndSink("ruleTable");
//tableEnv.sqlUpdate( "INSERT INTO ruleTable " + result);
result.insertInto("ruleTable");

Cheers,
Dhanuka



--
Nothing Impossible,Creativity is more important than knowledge.