Re: Multiple select single result
Posted by
dhanuka.priyanath on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Multiple-select-single-result-tp25488p25494.html
Hi Fabian,
Thanks for the prompt reply and its working 🤗.
I am trying to deploy 200 SQL unions and it seems all the tasks getting failing after some time.
How do i allocate memory for task manager and job manager. What are the factors need to be considered .
Cheers
Dhanuka
Hi
Dhanuka,
The important error message here is "AppendStreamTableSink requires that Table has only insert changes".
This is because you use UNION instead of UNION ALL, which implies duplicate elimination.
Unfortunately, UNION is currently internally implemented as a regular aggregration which produces a retraction stream (although, this would not be necessary).
If you don't require duplicate elimination, you can replace UNION by UNION ALL and the query should work.
If you require duplicate elimination, it is currently not possible to use SQL for your use case.
There is thea Jira issue FLINK-9422 to improve this case [1].
Best, Fabian
Am So., 13. Jan. 2019 um 14:43 Uhr schrieb dhanuka ranasinghe <
[hidden email]>:
Hi All,
I am trying to select multiple results from Kafka and send results to Kafka
different topic using Table API. But I am getting below error. Could you
please help me on this.
Query:
SELECT TiggerID,'Rule3' as RuleName,mytime(ts) as ts1 ,
mytime(CURRENT_TIMESTAMP) as ts2 FROM dataTable WHERE InterceptID = 4508724
AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
UNION
SELECT TiggerID,'Rule2' as RuleName,mytime(ts) as ts1 ,
mytime(CURRENT_TIMESTAMP) as ts2 FROM dataTable WHERE InterceptID = 4508724
AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
UNION
SELECT TiggerID,'Rule1' as RuleName,mytime(ts) as ts1 ,
mytime(CURRENT_TIMESTAMP) as ts2 FROM dataTable WHERE InterceptID = 4508724
AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
*Error:*
2019-01-13 21:36:36,228 ERROR
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Exception
occurred in REST handler.
org.apache.flink.runtime.rest.handler.RestHandlerException:
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error.
at
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$7(JarRunHandler.java:151)
at
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException:
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error.
at
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$10(JarRunHandler.java:228)
at
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
... 3 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The
main method caused an error.
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
at
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:78)
at
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:120)
at
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$10(JarRunHandler.java:226)
... 4 more
Caused by: org.apache.flink.table.api.TableException: AppendStreamTableSink
requires that Table has only insert changes.
at
org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:382)
at
org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:784)
at org.apache.flink.table.api.Table.insertInto(table.scala:877)
at
org.monitoring.stream.analytics.FlinkDynamicLocalSQL.main(FlinkDynamicLocalSQL.java:153)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
... 9 more
*Source Code:*
*StreamTableEnvironment tableEnv =
TableEnvironment.getTableEnvironment(env);
tableEnv.registerFunction("mytime", new MyTime(10)); tableEnv.connect(new
Kafka().version("0.10").topic("testin") .properties(kConsumer)
.startFromLatest()) .withFormat(new
Json().jsonSchema(schemaContent).failOnMissingField(false).deriveSchema())
.withSchema(new Schema() .field("InterceptID", "DECIMAL")
.field("Provider_Info", "VARCHAR") .field("LIID", "VARCHAR")
.field("TiggerID", "DECIMAL") .field("ts", Types.SQL_TIMESTAMP())
.rowtime(new
Rowtime().timestampsFromSource().watermarksPeriodicBounded(1000)))
.inAppendMode() .registerTableSource(sourceTable); // WindowedTable
windowedTable = //
tableEnv.scan(sourceTable).window(Tumble.over("50.minutes"));
//tableEnv.sqlQuery(query) StringBuilder multi = new StringBuilder();
for(String sql : rules) { if(multi.length() > 0) { multi.append(" UNION
").append("\n"); } multi.append( sql); }
LOGGER.info("********************************* " + multi.toString()); Table
result = tableEnv.sqlQuery(multi.toString()); tableEnv // declare the
external system to connect to .connect(new Kafka().version("0.10")
.topic("testout").startFromEarliest() .properties(kProducer) )
.withFormat(new Json().failOnMissingField(false).deriveSchema())
.withSchema(new Schema() .field("TiggerID", Types.DECIMAL())
.field("RuleName", Types.STRING()) .field("ts1", Types.STRING())
.field("ts2", Types.STRING()) ) // specify the update-mode for streaming
tables .inAppendMode() // register as source, sink, or both and under a
name .registerTableSourceAndSink("ruleTable"); //tableEnv.sqlUpdate(
"INSERT INTO ruleTable " + result); result.insertInto("ruleTable");
Cheers,Dhanuka*
--
Nothing Impossible,Creativity is more important than knowledge.