Hi Team,

I am running a query for a time-windowed join as below:

INSERT INTO sourceKafkaMalicious
SELECT sourceKafka.*
FROM sourceKafka INNER JOIN badips ON sourceKafka.`source.ip`=badips.ip
WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;

For a time-windowed join, Flink SQL should automatically clear older records. Somehow the query does not clear the heap space and fails with an error after some time. Can you please let me know what could go wrong, or is it an issue?

Environment file:
--------------------------------------------------------------------------------------------------------------------------------------------------------------
tables:
  - name: sourceKafka
    type: source-table
    update-mode: append
    connector:
      type: kafka
      version: "universal"
      topic: test-data-flatten
      properties:
        - key: zookeeper.connect
          value: x.x.x.x:2181
        - key: bootstrap.servers
          value: x.x.x.x:9092
        - key: group.id
          value: testgroup
    format:
      type: json
      fail-on-missing-field: false
      json-schema: >
        {
          type: 'object',
          properties: {
            'source.ip': { type: 'string' },
            'source.port': { type: 'string' }
          }
        }
      derive-schema: false
    schema:
      - name: ' source.ip '
        type: VARCHAR
      - name: 'source.port'
        type: VARCHAR

  - name: sourceKafkaMalicious
    type: sink-table
    update-mode: append
    connector:
      type: kafka
      version: "universal"
      topic: test-data-mal
      properties:
        - key: zookeeper.connect
          value: x.x.x.x:2181
        - key: bootstrap.servers
          value: x.x.x.x:9092
        - key: group.id
          value: testgroupmal
    format:
      type: json
      fail-on-missing-field: false
      json-schema: >
        {
          type: 'object',
          properties: {
            'source.ip': { type: 'string' },
            'source.port': { type: 'string' }
          }
        }
      derive-schema: false
    schema:
      - name: ' source.ip '
        type: VARCHAR
      - name: 'source.port'
        type: VARCHAR

  - name: badips
    type: source-table
    #update-mode: append
    connector:
      type: filesystem
      path: "/home/cyanadmin/ipsum/levels/badips.csv"
    format:
      type: csv
      fields:
        - name: ip
          type: VARCHAR
      comment-prefix: "#"
    schema:
      - name: ip
        type: VARCHAR

execution:
  planner: blink
  type: streaming
  time-characteristic: event-time
  periodic-watermarks-interval: 200
  result-mode: table
  max-table-result-rows: 1000000
  parallelism: 3
  max-parallelism: 128
  min-idle-state-retention: 0
  max-idle-state-retention: 0
  restart-strategy:
    type: fallback

configuration:
  table.optimizer.join-reorder-enabled: true
  table.exec.spill-compression.enabled: true
  table.exec.spill-compression.block-size: 128kb

# Properties that describe the cluster to which table programs are submitted.
deployment:
  response-timeout: 5000
--------------------------------------------------------------------------------------------------------------------------------------------------------------
Hi,

The query that you wrote is not a time-windowed join:

INSERT INTO sourceKafkaMalicious
SELECT sourceKafka.*
FROM sourceKafka INNER JOIN badips ON sourceKafka.`source.ip`=badips.ip
WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;

The problem is the use of CURRENT_TIMESTAMP instead of a processing-time (or event-time) attribute of badips. What exactly are you trying to achieve with the query?

Best, Fabian

On Wed, Sep 18, 2019 at 14:02, Nishant Gupta <[hidden email]> wrote:
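For comparison, a genuine time-windowed (interval) join bounds the time attribute of one input by the time attribute of the other input, not by CURRENT_TIMESTAMP. Below is only a minimal sketch, written against the Table API for concreteness; it assumes both tables were registered with processing-time attributes, and the attribute names s_proctime and b_proctime are illustrative rather than taken from the configuration above:

----------------------------
// Sketch of an interval join: the 15-minute bound relates the two tables'
// time attributes to each other instead of to CURRENT_TIMESTAMP.
// Assumes sourceKafka exposes s_proctime and badips exposes b_proctime.
Table windowed = tableEnv.sqlQuery(
    "SELECT s.`source.ip`, s.`source.port` " +
    "FROM sourceKafka AS s, badips AS b " +
    "WHERE s.`source.ip` = b.ip " +
    "AND s.s_proctime BETWEEN b.b_proctime - INTERVAL '15' MINUTE AND b.b_proctime");
----------------------------

This form only applies when both inputs are streams carrying time attributes; for a static or slowly changing bad-IP table, the temporal-table approach discussed next is the better fit.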
Hi Fabian,

Thanks for your reply. I have a continuous Kafka stream and a static table of bad IPs (badips), and I want to segregate the records that have a bad IP, which is why I was joining them. But with that, 60 GB of memory gets run out, so I used the query below. Can you please guide me in this regard?

On Wed, 18 Sep 2019 at 5:53 PM, Fabian Hueske <[hidden email]> wrote:
Hi Nishant,

You should model the query as a join with a time-versioned (temporal) table [1]. The bad-IPs table would be the time-versioned table [2]. Since it is a time-versioned table, it could even be updated with new IPs. This type of join only keeps the time-versioned table (the bad IPs) in state, and not the other (high-volume) table.

Best, Fabian

On Wed, Sep 18, 2019 at 14:34, Nishant Gupta <[hidden email]> wrote:
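Roughly, that pattern registers the bad-IP table as a temporal table function keyed on the IP column and probes it from the high-volume stream with a LATERAL TABLE join. The snippet below is only a sketch with illustrative names (badIpTable, BadIPs, KafkaSource, k_proctime); the concrete implementation follows in the reply below:

----------------------------
// Sketch: badIpTable is assumed to have columns (bad_ip, r_proctime), where
// r_proctime is its processing-time attribute.
TemporalTableFunction badIps = badIpTable.createTemporalTableFunction("r_proctime", "bad_ip");
tableEnv.registerFunction("BadIPs", badIps);

// Probe the version of the bad-IP table that is valid at the event's processing time.
Table malicious = tableEnv.sqlQuery(
    "SELECT K.sourceIp FROM KafkaSource AS K, " +
    "LATERAL TABLE (BadIPs(K.k_proctime)) AS B " +
    "WHERE K.sourceIp = B.bad_ip");
----------------------------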
Hi Fabian,

Thanks for the information. I have been reading about it and implementing the same as part of a Flink job written in Java. I am using proctime for both tables. Can you please verify the implementation of the temporal tables? Here is the snippet:

----------------------------
public class StreamingJob {

    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        Properties kafkaConsumerProperties = new Properties();
        kafkaConsumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "X.X.X.X:9092");
        kafkaConsumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "cg54");
        kafkaConsumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        kafkaConsumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaConsumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // Bad-IP stream from Kafka; keep only the IP column.
        DataStream<String> badipStream = env.addSource(
                new FlinkKafkaConsumer<>("badips", new SimpleStringSchema(), kafkaConsumerProperties));
        DataStream<String> badipStreamM = badipStream
            .map(new MapFunction<String, String>() {
                private static final long serialVersionUID = -686775202L;
                @Override
                public String map(String value) throws Exception {
                    try {
                        String[] v = value.split("\\t");
                        if (v.length > 1) {
                            return v[0].toString();
                        } else {
                            return "0.0.0.0";
                        }
                    } catch (Exception e) {
                        System.err.println(e);
                        return "0.0.0.0";
                    }
                }
            });

        // Temporal table: bad-IP table keyed on bad_ip, versioned by its proctime attribute.
        Table badipTable = tableEnv.fromDataStream(badipStreamM, "bad_ip, r_proctime.proctime");
        tableEnv.registerTable("BadIP", badipTable);
        TemporalTableFunction badIPTT = badipTable.createTemporalTableFunction("r_proctime", "bad_ip");
        tableEnv.registerFunction("BadIPTT", badIPTT);

        // High-volume input stream from Kafka as (source.ip, destination.ip) tuples.
        DataStream<ObjectNode> inKafkaStream = env
            .addSource(new FlinkKafkaConsumer<>("tests", new JSONKeyValueDeserializationSchema(false), kafkaConsumerProperties));
        DataStream<Tuple2<String, String>> inKafkaStreamM = inKafkaStream
            .rebalance()
            .filter(value -> value != null)
            .map(new MapFunction<ObjectNode, Tuple2<String, String>>() {
                private static final long serialVersionUID = -6867120202L;
                @Override
                public Tuple2<String, String> map(ObjectNode node) throws Exception {
                    try {
                        ObjectNode nodeValue = (ObjectNode) node.get("value");
                        return new Tuple2<>(nodeValue.get("source.ip").asText(), nodeValue.get("destination.ip").asText());
                    } catch (Exception e) {
                        System.err.println(e);
                        System.out.println(node);
                        return null;
                    }
                }
            });

        Table kafkaSource = tableEnv.fromDataStream(inKafkaStreamM, "sourceIp, destinationIp, k_proctime.proctime");
        tableEnv.registerTable("KafkaSource", kafkaSource);

        // Temporal table join: probe the bad-IP table at the stream's processing time.
        Table resultKafkaMalicious = tableEnv.sqlQuery(
            "SELECT K.sourceIp, K.destinationIp FROM KafkaSource AS K, LATERAL TABLE (BadIPTT(K.k_proctime)) AS B WHERE K.sourceIp=B.bad_ip");

        TupleTypeInfo<Tuple2<String, String>> tupleType = new TupleTypeInfo<>(Types.STRING(), Types.STRING());
        DataStream<Tuple2<String, String>> outStreamMalicious = tableEnv.toAppendStream(resultKafkaMalicious, tupleType);

        Properties kafkaProducerProperties = new Properties();
        kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "X.X.X.X:9092");
        kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
        kafkaProducerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");

        ObjectMapper mapper = new ObjectMapper();
        DataStream<String> sinkStreamMaliciousData = outStreamMalicious
            .map(new MapFunction<Tuple2<String, String>, String>() {
                private static final long serialVersionUID = -6347120202L;
                @Override
                public String map(Tuple2<String, String> tuple) throws Exception {
                    try {
                        ObjectNode node = mapper.createObjectNode();
                        node.put("source.ip", tuple.f0);
                        node.put("destination.ip", tuple.f1);
                        return node.toString();
                    } catch (Exception e) {
                        System.err.println(e);
                        System.out.println(tuple);
                        return null;
                    }
                }
            });

        sinkStreamMaliciousData.addSink(new FlinkKafkaProducer<>("recon-data-malicious", new SimpleStringSchema(), kafkaProducerProperties));

        env.execute("Flink List Matching");
    }
}
-------------------------------------------------------

On Wed, Sep 18, 2019 at 6:09 PM Fabian Hueske <[hidden email]> wrote:
Hi,

This looks OK at first sight. Is it doing what you expect?

Fabian

On Fri, Sep 20, 2019 at 16:29, Nishant Gupta <[hidden email]> wrote:
The use case is similar, but I am not able to verify the heap-space issue yet because the data size is small. I thought I would check with you in the meantime. Thanks for looking into it, I really appreciate it. I have marked the usage of the temporal tables in bold red for ease of reference.

On Fri, Sep 20, 2019, 8:18 PM Fabian Hueske <[hidden email]> wrote:
Hi Fabian,

I am facing an issue if I run multiple queries like this:

Table resultKafkaMalicious = tableEnv.sqlQuery(
    "SELECT K.sourceIp, K.destinationIp FROM KafkaSource AS K, LATERAL TABLE (BadIPTT(K.k_proctime)) AS B WHERE K.sourceIp=B.bad_ip");

Table resultKafkaSafe = tableEnv.sqlQuery(
    "SELECT K.sourceIp, K.destinationIp FROM KafkaSource AS K, LATERAL TABLE (BadIPTT(K.k_proctime)) AS B WHERE K.sourceIp<>B.bad_ip");

(Also, the heap-space issue is no longer occurring with the temporal tables.)

Error:
-------
Exception in thread "main" org.apache.flink.table.api.ValidationException: Only single column join key is supported. Found [] in [InnerJoin(where: (AND(__TEMPORAL_JOIN_CONDITION(k_proctime, bad_ip), <>(sourceIp, bad_ip))), join: (sourceIp, destinationIp, k_proctime, bad_ip))]
    at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.validateRightPrimaryKey(DataStreamTemporalJoinToCoProcessTranslator.scala:215)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:198)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:152)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
    at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:149)
    at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:101)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:168)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:152)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
    at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:277)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$.create(DataStreamTemporalJoinToCoProcessTranslator.scala:117)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalTableJoin.createTranslator(DataStreamTemporalTableJoin.scala:77)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamJoin.translateToPlan(DataStreamJoin.scala:110)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:98)
    at org.apache.flink.table.planner.StreamPlanner.translateToCRow(StreamPlanner.scala:250)
    at org.apache.flink.table.planner.StreamPlanner.translateOptimized(StreamPlanner.scala:431)
    at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:421)
    at org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:182)
    at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127)
    at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127)
    at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:319)
    at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:227)
    at flink.StreamingJob.main(StreamingJob.java:140)

On Fri, Sep 20, 2019 at 8:26 PM Nishant Gupta <[hidden email]> wrote:
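As the exception indicates, the temporal table join can only be translated when the join condition is an equality on the single key column (bad_ip), so the `<>` variant cannot be planned as a temporal join. The thread does not resolve this point further, but one possible workaround, sketched below under the assumption that the streams badipStreamM and inKafkaStreamM from the job above are available, is to do the malicious/safe split in the DataStream API using the broadcast-state pattern with a side output (BroadcastProcessFunction, MapStateDescriptor, OutputTag):

----------------------------
// Sketch only: split the event stream into "malicious" (main output) and
// "safe" (side output) using broadcast state instead of two SQL joins.
final MapStateDescriptor<String, Boolean> badIpStateDesc =
        new MapStateDescriptor<>("badIps", String.class, Boolean.class);
final OutputTag<Tuple2<String, String>> safeTag =
        new OutputTag<Tuple2<String, String>>("safe") {};

// Broadcast the (low-volume) bad-IP stream to all parallel instances.
BroadcastStream<String> badIpBroadcast = badipStreamM.broadcast(badIpStateDesc);

SingleOutputStreamOperator<Tuple2<String, String>> maliciousStream = inKafkaStreamM
    .connect(badIpBroadcast)
    .process(new BroadcastProcessFunction<Tuple2<String, String>, String, Tuple2<String, String>>() {

        @Override
        public void processElement(Tuple2<String, String> record, ReadOnlyContext ctx,
                                   Collector<Tuple2<String, String>> out) throws Exception {
            // Records whose source IP is a known bad IP go to the main output ...
            if (ctx.getBroadcastState(badIpStateDesc).contains(record.f0)) {
                out.collect(record);
            } else {
                // ... everything else goes to the "safe" side output.
                ctx.output(safeTag, record);
            }
        }

        @Override
        public void processBroadcastElement(String badIp, Context ctx,
                                            Collector<Tuple2<String, String>> out) throws Exception {
            // Newly arriving bad IPs update the broadcast state.
            ctx.getBroadcastState(badIpStateDesc).put(badIp, true);
        }
    });

DataStream<Tuple2<String, String>> safeStream = maliciousStream.getSideOutput(safeTag);
----------------------------

Only the (small) bad-IP set is kept in state, so the memory footprint stays comparable to the temporal-table approach while both output streams are produced by a single operator.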