Hi Team,
I am trying to join [kafka stream] and [badip stream grouped with badip].

Can someone please help me verify what is wrong in the highlighted query? Am I writing the time window join query wrong for this use case, or is it a bug that I should report? If it is a bug, what is the workaround?

Table1

    Table kafkaSource = tableEnv.fromDataStream(inKafkaStreamM, "sourceip,field1,field2, k_proctime.proctime");
    tableEnv.registerTable("KafkaSource", kafkaSource);

Table2

    Table badipTable = tableEnv.fromDataStream(badipStreamM, "bad_ip, b_proctime.proctime");
    tableEnv.registerTable("BadIP", badipTable);

    Table latestBadIps = tableEnv.sqlQuery("SELECT MAX(b_proctime) AS mb_proctime, bad_ip FROM BadIP GROUP BY bad_ip HAVING MIN(b_proctime) > CURRENT_TIMESTAMP - INTERVAL '2' DAY ");
    tableEnv.registerTable("LatestBadIP", latestBadIps);

Table3 - Join

Success for the query below:

    Table resultKafkaMalicious = tableEnv.sqlQuery("SELECT K.* FROM KafkaSource AS K, LatestBadIP AS LB WHERE K.sourceip = LB.bad_ip");

Failure for the query below:

    Table resultKafkaMalicious = tableEnv.sqlQuery("SELECT K.* FROM KafkaSource AS K, LatestBadIP AS LB WHERE K.sourceip = LB.bad_ip AND LB.mb_proctime BETWEEN K.k_proctime - INTERVAL '4' HOUR AND K.k_proctime + INTERVAL '10' MINUTE");

Error:

14:25:25,230 INFO org.apache.flink.runtime.taskmanager.Task - InnerJoin(where: (AND(=(sourceip, bad_ip), >=(mb_proctime, -(PROCTIME(k_proctime), 14400000:INTERVAL HOUR)), <=(mb_proctime, +(PROCTIME(k_proctime), 600000:INTERVAL MINUTE)))), join: (tlsversion, tlscipher, tlscurve, tlsserver_name, tlsresumed, tlslast_alert, tlsnext_protocol, tlsestablished, tlsclient_cert_chain_fuids, tlssubject, tlsissuer, tlsclient_subject, tlsclient_issuer, tlsja3, ecsversion, sourceip, sourceport, sourcegeolower, sourcegeoupper, sourcegeocountry_iso_code, sourcegeocountry_name, sourcegeoregion_name, sourcegeocity_name, sourcegeolocationlat, sourcegeolocationlon, sourcegeozipcode, sourcegeotimezone, sourcegeoisp, sourcegeodomain,
sourcegeonetspeed, sourcegeoiddcode, sourcegeoareacode, sourcegeoweatherstation_code, sourcegeoweatherstation_name, sourcegeomcc, sourcegeomnc, sourcegeomobilebrand, sourcegeoelevation, sourcegeousagetype, destinationip, destinationport, destinationgeolower, destinationgeoupper, destinationgeocountry_iso_code, destinationgeocountry_name, destinationgeoregion_name, destinationgeocity_name, destinationgeolocationlat, destinationgeolocationlon, destinationgeozipcode, destinationgeotimezone, destinationgeoisp, destinationgeodomain, destinationgeonetspeed, destinationgeoiddcode, destinationgeoareacode, destinationgeoweatherstation_code, destinationgeoweatherstation_name, destinationgeomcc, destinationgeomnc, destinationgeomobilebrand, destinationgeoelevation, destinationgeousagetype, eventid, eventprovider, eventdataset, eventtype, eventaction, organizationid, timestamp_received, clientmac, transactionid, timestamp, message, dhcpassigned_ip, dhcplease_time, dnsrtt, dnsquestionclass, dnsquestionname, dnsquestiontype, dnsquery, dnsqtype_name, dnsresponse_code, dnsrcode_name, dnsheader_flags, dnsanswersdata, dnsanswersttl, dnsrejected, networkprotocol, k_proctime, mb_proctime, bad_ip)) -> select: (tlsversion, tlscipher, tlscurve, tlsserver_name, tlsresumed, tlslast_alert, tlsnext_protocol, tlsestablished, tlsclient_cert_chain_fuids, tlssubject, tlsissuer, tlsclient_subject, tlsclient_issuer, tlsja3, ecsversion, sourceip, sourceport, sourcegeolower, sourcegeoupper, sourcegeocountry_iso_code, sourcegeocountry_name, sourcegeoregion_name, sourcegeocity_name, sourcegeolocationlat, sourcegeolocationlon, sourcegeozipcode, sourcegeotimezone, sourcegeoisp, sourcegeodomain, sourcegeonetspeed, sourcegeoiddcode, sourcegeoareacode, sourcegeoweatherstation_code, sourcegeoweatherstation_name, sourcegeomcc, sourcegeomnc, sourcegeomobilebrand, sourcegeoelevation, sourcegeousagetype, destinationip, destinationport, destinationgeolower, destinationgeoupper, destinationgeocountry_iso_code, 
destinationgeocountry_name, destinationgeoregion_name, destinationgeocity_name, destinationgeolocationlat, destinationgeolocationlon, destinationgeozipcode, destinationgeotimezone, destinationgeoisp, destinationgeodomain, destinationgeonetspeed, destinationgeoiddcode, destinationgeoareacode, destinationgeoweatherstation_code, destinationgeoweatherstation_name, destinationgeomcc, destinationgeomnc, destinationgeomobilebrand, destinationgeoelevation, destinationgeousagetype, eventid, eventprovider, eventdataset, eventtype, eventaction, organizationid, timestamp_received, clientmac, transactionid, timestamp, message, dhcpassigned_ip, dhcplease_time, dnsrtt, dnsquestionclass, dnsquestionname, dnsquestiontype, dnsquery, dnsqtype_name, dnsresponse_code, dnsrcode_name, dnsheader_flags, dnsanswersdata, dnsanswersttl, dnsrejected, networkprotocol, PROCTIME(k_proctime) AS k_proctime) -> to: Tuple2 -> Map -> Sink: Unnamed (4/8) (94e70bf5bb5b89ac5ae933c73c4b0353) switched from RUNNING to FAILED. org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue. 
    at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
    at org.apache.flink.table.runtime.join.NonWindowJoin.compile(NonWindowJoin.scala:46)
    at org.apache.flink.table.runtime.join.NonWindowJoin.open(NonWindowJoin.scala:75)
    at org.apache.flink.table.runtime.join.NonWindowInnerJoin.open(NonWindowInnerJoin.scala:53)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.co.LegacyKeyedCoProcessOperator.open(LegacyKeyedCoProcessOperator.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.base/java.lang.Thread.run(Thread.java:844)
Caused by: org.codehaus.commons.compiler.CompileException: Line 957, Column 26: Unknown variable or type "ctx"
    at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
    at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6773)
    at org.codehaus.janino.UnitCompiler.access$13500(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$21.visitPackage(UnitCompiler.java:6385)
    at org.codehaus.janino.UnitCompiler$21.visitPackage(UnitCompiler.java:6382)
    at org.codehaus.janino.Java$Package.accept(Java.java:4237)
    at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
    at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6768)
    at org.codehaus.janino.UnitCompiler.access$14100(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6410)
    at org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6407)
    at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4213)
    at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407)
    at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
    at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
    at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
    at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8939)
    at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:7019)
    at org.codehaus.janino.UnitCompiler.access$15700(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$21$2.visitMethodInvocation(UnitCompiler.java:6430)
    at org.codehaus.janino.UnitCompiler$21$2.visitMethodInvocation(UnitCompiler.java:6403)
    at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
    at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
    at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
    at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8939)
    at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060)
    at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
    at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
    at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
    at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
    at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2580)
    at org.codehaus.janino.UnitCompiler.access$2700(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1503)
    at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1487)
    at org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3511)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
    at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
    at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
    at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
    at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
    at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
    at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
    at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
    at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
    at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:33)
    ... 11 more
Hi Nishant,

On a brief look, I think this is a problem with your 2nd query (Table2).

That SQL statement is a plain GROUP BY with no window, so the aggregation never ends and its result keeps updating. As a result, your final query is planned as a NonWindowJoin (see NonWindowInnerJoin in your stack trace) rather than a time-windowed join.

If I understood you correctly, you are trying to emit bad IP addresses within a specific time window, for as long as they were last seen within the past 2 days?

I assume what you want is something similar to the OverWindowAggregate [1], for example:

    "SELECT MAX(b_proctime) OVER ( PARTITION BY bad_ip ORDER BY b_proctime RANGE BETWEEN INTERVAL '2' DAY PRECEDING AND CURRENT ROW ) FROM BadIP"

Thanks,
Rong

On Mon, Sep 30, 2019 at 2:17 AM Nishant Gupta <[hidden email]> wrote:
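
For anyone reading along, here is a rough plain-Java sketch of what the BETWEEN condition in the failing join is meant to express. It is not Flink code and says nothing about how Flink plans or executes the join; the class and method names (IntervalJoinSketch, withinJoinWindow, joins) and the sample IPs and timestamps are made up for illustration. Only the 4 hour and 10 minute bounds are taken from the query in this thread.

```java
import java.time.Duration;
import java.time.Instant;

/** Plain-Java illustration of the join predicate from the thread; hypothetical names, not Flink API. */
final class IntervalJoinSketch {
    // Bounds copied from the failing query: k_proctime - 4 HOUR .. k_proctime + 10 MINUTE.
    static final Duration LOWER = Duration.ofHours(4);
    static final Duration UPPER = Duration.ofMinutes(10);

    /** True when mbTime lies in [kTime - 4h, kTime + 10min]; both ends inclusive, like SQL BETWEEN. */
    static boolean withinJoinWindow(Instant mbTime, Instant kTime) {
        return !mbTime.isBefore(kTime.minus(LOWER)) && !mbTime.isAfter(kTime.plus(UPPER));
    }

    /** Full predicate: K.sourceip = LB.bad_ip AND LB.mb_proctime within the window around K.k_proctime. */
    static boolean joins(String sourceIp, Instant kTime, String badIp, Instant mbTime) {
        return sourceIp.equals(badIp) && withinJoinWindow(mbTime, kTime);
    }

    public static void main(String[] args) {
        Instant k = Instant.parse("2019-09-30T12:00:00Z"); // made-up event time
        // Bad IP seen 3 hours before the Kafka event: inside the window.
        System.out.println(joins("10.0.0.1", k, "10.0.0.1", k.minus(Duration.ofHours(3))));   // true
        // Bad IP seen 11 minutes after the Kafka event: outside the upper bound.
        System.out.println(joins("10.0.0.1", k, "10.0.0.1", k.plus(Duration.ofMinutes(11)))); // false
        // Different IPs never join, whatever the timestamps are.
        System.out.println(joins("10.0.0.1", k, "10.0.0.2", k));                              // false
    }
}
```

Both bounds are relative to the Kafka row's time, so each bad-IP row only has to be compared against Kafka rows from a limited time range.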