package com.dlvr.pipeline;

import java.sql.Timestamp;
import java.util.Properties;
import java.util.logging.Logger;

import com.dlvr.pipeline.falcon.FalconConfig;
import com.dlvr.pipeline.falcon.FileSystemConfig;
import com.dlvr.pipeline.falcon.sink.Sinks;
import com.dlvr.pipeline.model.Fragment;
import com.dlvr.pipeline.model.Stat;
import com.dlvr.pipeline.sources.KyloKafkaConsumer;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class StatsPipeline {

    private static final Logger log = Logger.getLogger(StatsPipeline.class.getSimpleName());

    private final Config config;

    /** Field expression used when registering the fragment stream as a table; rowtime is the event-time attribute. */
    private static final String FIELDS =
            "cfg, prov, dev_id, net_id, geo, " +
            "br, ttff, ttfs, prov_br_dn, prov_br_up, prov_be, prov_buff_dur, " +
            "dlvr_br_dn, dlvr_br_up, dlvr_be, dlvr_buff_dur, " +
            "isp_be, isp_buff_dur, bytes, segp, pidx, " +
            "created, init_seg, epoch, rowtime.rowtime";

    /** Aggregates fragments per (cfg, prov, dev_id, net_id, geo) over 2-minute tumbling event-time windows. */
    private static final String STAT_SQL =
            "select cfg, prov, dev_id, net_id, geo, " +
            "sum(br) br_s, count(br) br_c, min(br) br_min, max(br) br_max, " +
            "sum(ttff) ttff_s, count(ttff) ttff_c, min(ttff) ttff_min, max(ttff) ttff_max, " +
            "sum(ttfs) ttfs_s, count(ttfs) ttfs_c, min(ttfs) ttfs_min, max(ttfs) ttfs_max, " +
            "sum(prov_br_dn) prov_br_dn, sum(prov_br_up) prov_br_up, sum(prov_be) prov_be, sum(prov_buff_dur) prov_buff_dur, " +
            "sum(dlvr_br_dn) dlvr_br_dn, sum(dlvr_br_up) dlvr_br_up, sum(dlvr_be) dlvr_be, sum(dlvr_buff_dur) dlvr_buff_dur, " +
            "sum(isp_be) isp_be, sum(isp_buff_dur) isp_buff_dur, " +
            "sum(segp) segp_s, count(segp) segp_c, min(segp) segp_min, max(segp) segp_max, " +
            "sum(pidx) pidx_s, count(pidx) pidx_c, min(pidx) pidx_min, max(pidx) pidx_max, " +
            "sum(created) total_session_c, sum(init_seg) session_c, sum(`bytes`) `bytes`, " +
            "TUMBLE_START(rowtime, INTERVAL '2' MINUTE) min_ts " +
            "from fragments " +
            "group by cfg, prov, dev_id, net_id, geo, TUMBLE(rowtime, INTERVAL '2' MINUTE)";

    public static Properties getProperties() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers",
                "pdx1-5.pdx1.dlvr1.net:9092, pdx1-6.pdx1.dlvr1.net:9092, pdx1-7.pdx1.dlvr1.net:9092, " +
                "pdx1-8.pdx1.dlvr1.net:9092, pdx1-9.pdx1.dlvr1.net:9092");
        properties.setProperty("group.id", "Dlvr-stats-pipeline");
        return properties;
    }

    public StatsPipeline(Config config) {
        this.config = config;
    }
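    /**
     * Maps one aggregated result row produced by {@link #STAT_SQL} into a {@link Stat}.
     * Field indexes follow the select-list order of the query; null grouping keys default
     * to empty/null and null aggregates default to 0.
     */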
    public static Stat transformRowToStat(Row row) {
        Stat stat = new Stat();
        stat.cfg = row.getField(0) == null ? "" : row.getField(0).toString();
        stat.prov = row.getField(1) == null ? "" : row.getField(1).toString();
        stat.dev_id = row.getField(2) == null ? "" : row.getField(2).toString();
        stat.net_id = row.getField(3) == null ? null : (long) row.getField(3);
        stat.geo = row.getField(4) == null ? null : (int) row.getField(4);
        stat.min_ts = row.getField(38) == null ? null : (Timestamp) row.getField(38);

        // map the aggregated rollups
        stat.br_s = row.getField(5) == null ? 0 : (long) row.getField(5);
        stat.br_c = row.getField(6) == null ? 0 : (long) row.getField(6);
        stat.br_min = row.getField(7) == null ? 0 : (long) row.getField(7);
        stat.br_max = row.getField(8) == null ? 0 : (long) row.getField(8);
        stat.ttff_s = row.getField(9) == null ? 0 : (long) row.getField(9);
        stat.ttff_c = row.getField(10) == null ? 0 : (long) row.getField(10);
        stat.ttff_min = row.getField(11) == null ? 0 : (long) row.getField(11);
        stat.ttff_max = row.getField(12) == null ? 0 : (long) row.getField(12);
        stat.ttfs_s = row.getField(13) == null ? 0 : (long) row.getField(13);
        stat.ttfs_c = row.getField(14) == null ? 0 : (long) row.getField(14);
        stat.ttfs_min = row.getField(15) == null ? 0 : (long) row.getField(15);
        stat.ttfs_max = row.getField(16) == null ? 0 : (long) row.getField(16);
        stat.prov_br_dn = row.getField(17) == null ? 0 : (int) row.getField(17);
        stat.prov_br_up = row.getField(18) == null ? 0 : (int) row.getField(18);
        stat.prov_be = row.getField(19) == null ? 0 : (int) row.getField(19);
        stat.prov_buff_dur = row.getField(20) == null ? 0 : (double) row.getField(20);
        stat.dlvr_br_dn = row.getField(21) == null ? 0 : (int) row.getField(21);
        stat.dlvr_br_up = row.getField(22) == null ? 0 : (int) row.getField(22);
        stat.dlvr_be = row.getField(23) == null ? 0 : (int) row.getField(23);
        stat.dlvr_buff_dur = row.getField(24) == null ? 0 : (double) row.getField(24);
        stat.isp_be = row.getField(25) == null ? 0 : (int) row.getField(25);
        stat.isp_buff_dur = row.getField(26) == null ? 0 : (double) row.getField(26);
        stat.segp_s = row.getField(27) == null ? 0 : (double) row.getField(27);
        stat.segp_c = row.getField(28) == null ? 0 : (long) row.getField(28);
        stat.segp_min = row.getField(29) == null ? 0 : (double) row.getField(29);
        stat.segp_max = row.getField(30) == null ? 0 : (double) row.getField(30);
        stat.pidx_s = row.getField(31) == null ? 0 : (double) row.getField(31);
        stat.pidx_c = row.getField(32) == null ? 0 : (long) row.getField(32);
        stat.pidx_min = row.getField(33) == null ? 0 : (double) row.getField(33);
        stat.pidx_max = row.getField(34) == null ? 0 : (double) row.getField(34);
        stat.total_session_c = row.getField(35) == null ? 0 : (int) row.getField(35);
        stat.session_c = row.getField(36) == null ? 0 : (int) row.getField(36);
        stat.bytes = row.getField(37) == null ? 0 : (long) row.getField(37);

        return stat;
    }
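    /**
     * Builds and runs the job: consumes Fragment records from Kafka, assigns event-time
     * timestamps and watermarks, aggregates them with {@link #STAT_SQL} over 2-minute
     * tumbling windows, and writes the resulting Stat records to S3 as Parquet.
     */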
    public void start() throws Exception {
        // set up the streaming environment with event-time semantics
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useOldPlanner()
                .inStreamingMode()
                .build();
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
        log.info("Initialized Stats Pipeline");

        env.setParallelism(config.parallelism);
        env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);

        KyloKafkaConsumer source = new KyloKafkaConsumer(
                Fragment.class.getClassLoader().loadClass("com.dlvr.pipeline.model.Fragment"),
                "pdx1-5.pdx1.dlvr1.net:9092, pdx1-6.pdx1.dlvr1.net:9092, pdx1-7.pdx1.dlvr1.net:9092, " +
                "pdx1-8.pdx1.dlvr1.net:9092, pdx1-9.pdx1.dlvr1.net:9092",
                "fragments");

        DataStream input = env.addSource(source)
                .keyBy("cfg")
                .assignTimestampsAndWatermarks(new OrderedTimestampExtractor());

        tableEnv.registerTable("fragments", tableEnv.fromDataStream(input, FIELDS));
        Table stats = tableEnv.sqlQuery(STAT_SQL);

        // the windowed aggregation produces a retract stream; keep only the Row payload of each record
        DataStream<Tuple2<Boolean, Row>> results = tableEnv.toRetractStream(stats, Row.class);
        DataStream<Stat> statsStream = results.map(new MapFunction<Tuple2<Boolean, Row>, Stat>() {
            @Override
            public Stat map(Tuple2<Boolean, Row> statData) {
                return transformRowToStat(statData.f1);
            }
        });

        statsStream.addSink(createStatS3Sink()).name("stats-s3-sink").uid("stats-s3-id");

        // execute program
        env.execute("Stats-Pipeline");
    }

    /**
     * Create a sink function which encodes cache-stats in Parquet format and uses
     * Snappy compression. Writes files to the S3 storage path and the
     * /parquet/cache subdirectory.
     *
     * @return parquet+snappy sink function writing to S3
     */
    private SinkFunction<Stat> createStatS3Sink() {
        FalconConfig<Stat> fc = new FalconConfig<>();
        fc.dataType = Stat.class;
        //fc.sourceDataType = Fragment.class;

        FileSystemConfig fileSystemConfig = new FileSystemConfig();
        fileSystemConfig.path = config.s3StoragePath;
        fc.target = fileSystemConfig;
        fc.target.setFormat(FalconConfig.Format.PARQUET);

        return Sinks.createSink(fc);
    }

    /**
     * Extracts event time from a fragment's epoch field, tolerating up to 10 seconds
     * of out-of-orderness before the watermark advances.
     */
    public static class OrderedTimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<Fragment> {

        public OrderedTimestampExtractor() {
            super(Time.seconds(10));
        }

        @Override
        public long extractTimestamp(Fragment element) {
            return element.epoch;
        }
    }

    private static Config createConfigFromProgramArguments(String[] args) {
        ParameterTool params = ParameterTool.fromArgs(args);

        Config config = new Config();
        // how often to checkpoint
        config.flinkCheckpointIntervalSecs = params.getInt("checkpoint_interval", config.flinkCheckpointIntervalSecs);
        // where to get data from
        config.kafkaStatsTopic = params.get("kafka_message_topic");
        config.kafkaBootstrapServers = params.get("kafka_bootstrap_servers");
        // where to write results and how many parallel subtasks to run
        config.s3StoragePath = params.get("s3_storage_path");
        config.parallelism = params.getInt("parallelism");
        return config;
    }

    public static void main(String[] args) throws Exception {
        // build config from program args
        Config config = createConfigFromProgramArguments(args);
        StatsPipeline pipeline = new StatsPipeline(config);
        pipeline.start();
    }
}
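// Example submission (illustrative values only; the jar name, broker, S3 bucket, and
// parallelism below are placeholders, not part of this repository):
//
//   flink run -c com.dlvr.pipeline.StatsPipeline stats-pipeline.jar \
//       --checkpoint_interval 10 \
//       --kafka_message_topic fragments \
//       --kafka_bootstrap_servers pdx1-5.pdx1.dlvr1.net:9092 \
//       --s3_storage_path s3://<bucket>/stats \
//       --parallelism 4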