package benchmark.flinkspark.flink;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Calendar;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Map;

import com.lambdaworks.redis.RedisClient;
import com.lambdaworks.redis.RedisURI;
import com.lambdaworks.redis.RedisConnection;

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

import java.net.InetAddress;
import java.net.UnknownHostException;

/**
 * Linear Road benchmark driver for Apache Beam on the Flink runner.
 *
 * <p>Records are read line by line from the text file {@code //tmp//OutFileGen.dat} and processed
 * by the nested {@link LinearRoadFn}, which keeps all of its state in Redis. An earlier variant
 * of this class read the same records from Kafka; that source is currently disabled.
 *
 * <p>The original notes asked for the following arguments on the Beam runner's run command:
 * {@code --topic lroad --bootstrap.servers "kirk":9092 --zookeeper.connect "kafkahost":2181
 * --group.id eventsGroup}. NOTE(review): {@link #main(String[])} does not parse {@code args},
 * so these arguments currently have no effect.
 *
 * @author Amir Bahmanyari
 */
public class BenchBeamRunners {

    /** Address of the Redis server shared by all three connections below. */
    private static final String REDIS_URI = "redis://beam1:30001";

    // The connections are opened when this class is initialised.
    // An earlier (now disabled) initializer also reset the counters type0Processed,
    // type1Processed, type2Processed, type3Processed, type0Seen, type2Seen and type3Seen
    // to "0"; nothing in this class resets them any more.
    static final RedisConnection<String, String> connection =
            new RedisClient(RedisURI.create(REDIS_URI)).connect();
    static final RedisConnection<String, String> tolls =
            new RedisClient(RedisURI.create(REDIS_URI)).connect();
    static final RedisConnection<String, String> histClient =
            new RedisClient(RedisURI.create(REDIS_URI)).connect();

    // Not assigned anywhere in this class; only the disabled lazy-connect code used them.
    RedisClient redisClient;
    RedisClient histsredis;
    RedisClient tollsredis;

    /**
     * Loads the historical tolls file {@code /tmp/OutFileGen.tolls.dat} into the Redis hash
     * {@code historics}.
     *
     * <p>Each line is split on commas; the first three tokens joined with {@code -} form the
     * hash field and the fourth token is the value. Lines with fewer than four tokens are
     * reported and skipped. An {@link IOException} is reported on standard output and ends the
     * load early; it is not propagated.
     *
     * <p>NOTE(review): the only call to this method (from {@code main}) is disabled.
     */
    private void populateHistRedis() {
        System.out.println("XXXXX Started loading historicals in Redis");
        // try-with-resources closes the reader even when readLine() or hset() throws.
        try (BufferedReader reader = new BufferedReader(
                new FileReader(new File("/tmp/OutFileGen.tolls.dat")))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] tollsTokens = line.split(",");
                if (tollsTokens.length < 4) {
                    System.out.println("XXXXX Skipping malformed historical toll line: " + line);
                    continue;
                }
                String tollsKey = tollsTokens[0] + "-" + tollsTokens[1] + "-" + tollsTokens[2];
                histClient.hset("historics", tollsKey, tollsTokens[3]);
                System.out.println("XXXXX ...........loading historicals");
            }
        } catch (IOException ioexp) {
            System.out.println("XXXXX Exception while loading historicals in Redis: "
                    + ioexp.getMessage());
            ioexp.printStackTrace();
        }
        System.out.println("XXXXX Finished loading historicals in Redis");
    }

    /**
     * Builds the pipeline (text source, pass-through stage, Linear Road stage) and runs it in
     * streaming mode on the Flink runner.
     *
     * <p>Failures while building or running the pipeline are printed and not rethrown.
     *
     * @param args command-line arguments; currently ignored
     * @throws Exception declared for compatibility; failures are caught and printed
     */
    public static void main(String[] args) throws Exception {
        FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        options.setRunner(FlinkRunner.class);
        options.setStreaming(true);
        Pipeline p = Pipeline.create(options);

        try {
            // The Kafka source (server kafka01:9092, topic linroad3) that used to feed this
            // pipeline is disabled; records are read from a local text file instead.
            p.apply(TextIO.Read.from("//tmp//OutFileGen.dat"))
                    .apply("collect", ParDo.of(new CollectFn()))
                    .apply("PseduLRDoFn", ParDo.of(new LinearRoadFn()));
        } catch (Exception exp) {
            exp.printStackTrace();
        }
        System.out.printf("\n...Completed method");

        try {
            System.out.printf("\n...about to run pipeline");
            p.run();
        } catch (Throwable ex) {
            System.out.printf("\n...Running thread threw: ");
            ex.printStackTrace();
        }
        System.out.printf("\n...after running thread ");
    }

    /**
     * Pass-through stage: forwards every input line unchanged.
     *
     * <p>The previous implementation had an empty body and therefore emitted nothing, which
     * left the downstream Linear Road stage without any input.
     */
    private static final class CollectFn extends DoFn<String, String> {

        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(ProcessContext ctx) throws Exception {
            ctx.output(ctx.element());
        }
    }

    /**
     * Processes one Linear Road record per element and keeps the benchmark state in Redis.
     *
     * <p>Only type 0 records are fully processed. Type 2 records are counted, type 3 records
     * are ignored, and type 9999 marks the end of the input.
     */
    private static final class LinearRoadFn extends DoFn<String, String> {

        private static final long serialVersionUID = 1L;

        /** Prefix left in front of the payload when a record was rendered from a Kafka KV. */
        private static final String KV_PREFIX = "KV{[], ";

        /** Record type that marks the end of the input file. */
        private static final int END_OF_FILE_TYPE = 9999;

        // Layout of the comma-separated car line stored in the Redis hash "currentcars":
        //   0 carId, 1 lastTime, 2 lastSpeed, 3 lastXWay, 4 lastLane,
        //   5 lastDir, 6 lastSeg, 7 lastPos, 8 xPos (times seen at this position), 9 lastToll
        private static final int CAR_ID = 0;
        private static final int CAR_XWAY = 3;
        private static final int CAR_LANE = 4;
        private static final int CAR_DIR = 5;
        private static final int CAR_SEG = 6;
        private static final int CAR_POS = 7;
        private static final int CAR_TIMES_AT_POS = 8;
        private static final int CAR_LAST_TOLL = 9;
        private static final int CAR_FIELD_COUNT = 10;

        /** Host name used in log lines; resolved once, when this object is constructed. */
        private final String hostName;

        /** Never assigned in this class, so the flush in {@link #t3(Map)} cannot trigger. */
        PrintWriter _writerT3 = null;

        /**
         * @throws UnknownHostException if the local host name cannot be resolved
         */
        LinearRoadFn() throws UnknownHostException {
            hostName = InetAddress.getLocalHost().getHostName();
        }

        /**
         * Parses the incoming line and dispatches on its record type.
         *
         * <p>Records that cannot be parsed, or whose type is missing or not a number, are
         * logged and skipped.
         */
        @Override
        public void processElement(ProcessContext ctx) throws Exception {
            String line = stripKvWrapper(ctx.element());
            System.out.println("ABDEBUG: " + hostName + " Line to be used is: " + line);

            Map<String, String> mt;
            try {
                mt = BeamAppSupport.createMT(line.split(","));
            } catch (NullPointerException exp) {
                System.out.println("Did nothing in UtilitySL.createMT");
                exp.printStackTrace();
                return;
            }
            if (mt == null) {
                System.out.println("ABDEBUG: " + hostName + " Skipping unparsable record: " + line);
                return;
            }

            int type;
            try {
                // Integer.parseInt(null) also throws NumberFormatException, so this covers a
                // missing "type" entry as well as a malformed one.
                type = Integer.parseInt(mt.get("type"));
            } catch (NumberFormatException exp) {
                System.out.println("ABDEBUG: " + hostName
                        + " Skipping record with missing or non-numeric type: " + line);
                return;
            }

            switch (type) {
                case 0:
                    connection.incr("type0Seen");
                    t0(mt);
                    break;
                case 2:
                    connection.incr("type2Seen");
                    // Dispatch to t2(mt) is disabled.
                    break;
                case 3:
                    // Both the type3Seen counter and the dispatch to t3(mt) are disabled.
                    break;
                case END_OF_FILE_TYPE:
                    DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss z");
                    Calendar cal = Calendar.getInstance();
                    System.out.println("ABDEBUG:9999 " + hostName
                            + " Last line to be skipped at time: "
                            + dateFormat.format(cal.getTime()));
                    break;
                default:
                    break;
            }
        }

        /**
         * Returns the payload of a record: everything after the last {@code "KV{[], "} prefix
         * (or the whole string when there is none), with every {@code '}'} removed.
         */
        private static String stripKvWrapper(String raw) {
            int prefixAt = raw.lastIndexOf(KV_PREFIX);
            String payload = (prefixAt == -1) ? raw : raw.substring(prefixAt + KV_PREFIX.length());
            // Literal replacement; no regular expression is needed here.
            return payload.replace("}", "");
        }

        /**
         * Handles a type 0 (position report) record: updates stopped-car and accident state,
         * computes the toll when the car entered a new segment, and stores the refreshed car
         * line and per-segment statistics in Redis.
         *
         * @param mt the parsed record; the keys read here are {@code time}, {@code carId},
         *           {@code speed}, {@code xWay}, {@code lane}, {@code dir}, {@code seg} and
         *           {@code pos}
         */
        private void t0(Map<String, String> mt) {
            int min = Integer.parseInt(mt.get("time")) / 180 + 1;
            String stoppedKey = String.format("%s-%s-%s-%s-%s", mt.get("xWay"), mt.get("dir"),
                    mt.get("lane"), mt.get("seg"), mt.get("pos"));

            // Create a new Seg-Min combo. Make sure it doesn't already exist.
            String segKey = BeamAppSupport.LRGetOrCreateSeg(mt, connection);

            String carState = BeamAppSupport.LRCreateCarIfNotExists(mt, connection);
            if (carState == null) {
                return;
            }
            String[] tokens = carState.split(",");
            if (BeamAppSupport.isAnomalousCar(mt, tokens)) {
                return;
            }
            if (tokens.length < CAR_FIELD_COUNT) {
                System.out.println("ABDEBUG: " + hostName
                        + " Skipping car with incomplete state: " + carState);
                return;
            }

            // SAME POSITION?
            // A stricter variant that also compared xWay and dir is disabled (annotated
            // "42000" in the original notes).
            boolean samePosition = tokens[CAR_POS].equals(mt.get("pos"))
                    && tokens[CAR_LANE].equals(mt.get("lane"));
            if (samePosition) {
                recordRepeatedPosition(mt, tokens, stoppedKey);
            } else {
                recordNewPosition(mt, tokens, min);
            }

            // Update car and segment info.
            String carLine = String.format("%s,%s,%s,%s,%s,%s,%s,%s,%s,%s",
                    tokens[CAR_ID], mt.get("time"), mt.get("speed"), mt.get("xWay"),
                    mt.get("lane"), mt.get("dir"), mt.get("seg"), mt.get("pos"),
                    tokens[CAR_TIMES_AT_POS], tokens[CAR_LAST_TOLL]);
            connection.hset("currentcars", tokens[CAR_ID], carLine);

            updateSegmentStatistics(mt, segKey);
        }

        /** Car reported the same position again: flag a stopped car / accident on the 4th report. */
        private void recordRepeatedPosition(Map<String, String> mt, String[] tokens,
                String stoppedKey) {
            if (tokens[CAR_TIMES_AT_POS].equals("3")) {
                // Already seen 3 times, create a stopped car.
                if (BeamAppSupport.LRCreateStoppedCar(stoppedKey, mt.get("carId"), connection)) {
                    BeamAppSupport.LRCreateAccident(
                            stoppedKey,
                            String.format("%s-%s-%s", mt.get("xWay"), mt.get("dir"), mt.get("seg")),
                            mt.get("time"),
                            connection);
                }
            }
            tokens[CAR_TIMES_AT_POS] =
                    Integer.toString(Integer.parseInt(tokens[CAR_TIMES_AT_POS]) + 1);
        }

        /** Car moved: clear its previous stopped/accident state and, on a new segment, toll it. */
        private void recordNewPosition(Map<String, String> mt, String[] tokens, int min) {
            String prevStoppedKey = String.format("%s-%s-%s-%s-%s", tokens[CAR_XWAY],
                    tokens[CAR_DIR], tokens[CAR_LANE], tokens[CAR_SEG], tokens[CAR_POS]);
            BeamAppSupport.LRRemoveStoppedIfAny(prevStoppedKey, mt, connection);

            String prevAccidentKey = String.format("%s-%s-%s", tokens[CAR_XWAY],
                    tokens[CAR_DIR], tokens[CAR_SEG]);
            BeamAppSupport.LRClearAccidentIfAny(prevAccidentKey, mt, connection);

            // Reset current car's number of times at this position.
            tokens[CAR_TIMES_AT_POS] = "1";

            if (mt.get("seg").equals(tokens[CAR_SEG])) {
                // NEW POSITION BUT SAME SEGMENT
                if (mt.get("lane").equals("4")) {
                    // NOTE(review): this value is not read again; the stored car line takes
                    // its lane from the record itself.
                    tokens[CAR_LANE] = "4";
                }
                return;
            }

            // NEW POSITION, NEW SEGMENT: this is where toll notifications are due.
            int currToll = 0;
            // A stricter variant that also required an unchanged xWay and dir is disabled
            // (annotated "50k" in the original notes).
            if (!mt.get("lane").equals("4")) {
                currToll = computeToll(mt, min);
            }

            // PREVIOUS TOLL
            if (Integer.parseInt(tokens[CAR_LAST_TOLL]) > 0) {
                tolls.rpush(tokens[CAR_ID] + "-tolls", mt.get("time"), tokens[CAR_LAST_TOLL]);
            }
            tokens[CAR_LAST_TOLL] = Integer.toString(currToll);
        }

        /**
         * Computes the toll for the segment the car just entered and bumps the
         * {@code type1Processed} / {@code type0Processed} counters.
         *
         * <p>No notification record is emitted. The original code formatted, but never used,
         * these two records:
         * accident {@code 1,time,emitTime,xWay,accSeg,dir,carId} and
         * toll {@code 0,carId,time,latencyMillis,lav,toll}.
         */
        private int computeToll(Map<String, String> mt, int min) {
            int currToll = 0;

            String lastMinKey = String.format("%s-%s-%s-%d", mt.get("xWay"), mt.get("dir"),
                    mt.get("seg"), (min - 1));
            int numv = BeamAppSupport.LRGetNumV(lastMinKey, connection);
            if (numv > 50) {
                currToll = BeamAppSupport.calcToll(numv, connection);
            }
            int lav = BeamAppSupport.LRGetLav(mt, min, connection);
            if (lav >= 40) {
                currToll = 0;
            }

            // ACCIDENTS
            int accSeg = BeamAppSupport.LRInAccidentZone(mt, min, connection);
            if (accSeg >= 0) {
                currToll = 0;
                connection.incr("type1Processed");
            }
            connection.incr("type0Processed");
            return currToll;
        }

        /**
         * Adds this report to the per-segment speed sum, reading count and car-id set, but only
         * for entries that already exist in Redis.
         *
         * <p>Pruning of entries older than six minutes is disabled (the original notes say it
         * did not work in a cluster).
         */
        private void updateSegmentStatistics(Map<String, String> mt, String segKey) {
            // Each counter is read once and reused for both the existence check and the update.
            String sumSpeeds = connection.hget("segSumSpeeds", segKey);
            if (sumSpeeds != null) {
                int updated = Integer.parseInt(sumSpeeds) + Integer.parseInt(mt.get("speed"));
                connection.hset("segSumSpeeds", segKey, String.valueOf(updated));
            }

            String numReadings = connection.hget("segSumNumReadings", segKey);
            if (numReadings != null) {
                connection.hset("segSumNumReadings", segKey,
                        String.valueOf(Integer.parseInt(numReadings) + 1));
            }

            if (Boolean.TRUE.equals(connection.hexists("segCarIdSet", segKey))) {
                // The previous code added the id to the Set returned by smembers(), which is
                // only a client-side copy; sadd stores it in Redis.
                // NOTE(review): the original carried a "double check logic" remark here.
                connection.sadd(segKey, mt.get("carId"));
            }
        }

        /**
         * Handles a type 3 record: when the {@code historics} hash has an entry for
         * {@code carId-day-xWay} and {@code day} is not 0, bumps {@code type3Processed}.
         *
         * <p>Any exception is reported and the record is skipped. Currently not dispatched
         * from {@link #processElement}.
         *
         * @param mt the parsed record; reads {@code carId}, {@code day} and {@code xWay}
         */
        private void t3(Map<String, String> mt) {
            try {
                String k = mt.get("carId") + "-" + mt.get("day") + "-" + mt.get("xWay");
                boolean known = Boolean.TRUE.equals(histClient.hexists("historics", k));
                if (known && Integer.parseInt(mt.get("day")) != 0) {
                    // incr returns the new value, so the counter does not need to be re-read.
                    Long processed = connection.incr("type3Processed");
                    System.out.println("Number of Type3 Processed so far: " + processed);
                    if (null != _writerT3 && processed != null && processed > 110) {
                        _writerT3.flush();
                    }
                }
            } catch (Exception exp) {
                // Deliberately best-effort: report and skip this strange record.
                System.out.println("Strange record: Did nothing in t3 " + exp.getMessage());
            }
        }

        /**
         * Handles a type 2 record by bumping {@code type2Processed}. Marked "will test later"
         * in the original notes; currently not dispatched from {@link #processElement}.
         *
         * @param mt the parsed record (unused)
         */
        private void t2(Map<String, String> mt) {
            connection.incr("type2Processed");
        }
    }
}