I need to compute averages of time series data over a 15-minute tumbling event-time window, with backfill.
The time series data is a Tuple3 of name: String, value: double, event_time: Timestamp (Instant). I need to compute the average value of each name's time series over a tumbling window of 15 minutes, with backfills, such that given the input:

a,10,2020-06-23T00:01:30.0000000Z
a,15,2020-06-23T00:02:30.0000000Z
a,20,2020-06-23T00:03:30.0000000Z
b,25,2020-06-23T00:03:30.0000000Z
b,30,2020-06-23T00:02:30.0000000Z
b,35,2020-06-23T00:01:30.0000000Z
b,35,2020-06-23T00:16:30.0000000Z

it yields the following averages with backfill:

a,15,2020-06-23 00:00:00.0
b,30,2020-06-23 00:00:00.0
a,15,2020-06-23 00:15:00.0
b,35,2020-06-23 00:15:00.0

Notice that although no value arrived for "a" in the second quarter hour, its previous average was upserted for that window.

I only got as far as computing the average; I have not figured out a recommended strategy for upserting the backfill.

I made a GitHub project to share my approach: https://github.com/minmay/flink-patterns

Can somebody please give me guidance on the recommended "Flink" way of assigning a backfill to an average on a keyed windowed stream? The following code demonstrates my approach thus far.
package mvillalobos.flink.patterns.timeseries.average; import com.google.common.collect.ImmutableList; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.io.jdbc.JDBCOptions; import org.apache.flink.api.java.io.jdbc.JDBCUpsertTableSink; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple7; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; import org.apache.flink.types.Row; import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement; import java.sql.Timestamp; import java.time.Instant; import java.util.Arrays; public class TimeSeriesAverageApp { private final static Logger logger = LoggerFactory.getLogger(TimeSeriesAverageApp.class); public void stream(String inputFilePath) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // GIVEN a SOURCE with a CSV input file // in which each line 
has a: String, double, Instant // THEN the MAP operator // transforms the line into a Tuple7 // f0: name: String // f1: window_size: int // f2: value: double // f3: event_timestamp: Instant // f4: aggregate_sum: double // f5: aggregate_count double // f6: is_backfile: boolean // WHEN the map operation finishes // THEN the event time assigned using field f3 final DataStream<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> timeSeriesStream = env.readTextFile(inputFilePath) .map(line -> { final String[] split = line.split(","); final String name = split[0]; final double value = Double.parseDouble(split[1]); final Instant timestamp = Instant.parse(split[2]); return Tuple7.of(name, 1, value, timestamp, value, 1, false); }).returns(Types.TUPLE(Types.STRING, Types.INT, Types.DOUBLE, TypeInformation.of(Instant.class), Types.DOUBLE, Types.INT, Types.BOOLEAN)) .assignTimestampsAndWatermarks( new AscendingTimestampExtractor<>() { @Override public long extractAscendingTimestamp(Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> element) { return element.f3.toEpochMilli(); } } ); final JDBCUpsertTableSink jdbcUpsertTableSink = buildJdbcUpsertTableSink(); upsertToJDBC(jdbcUpsertTableSink, timeSeriesStream); // GIVEN a data stream with Tuple7 // f0: name: String // f1: window_size: int // f2: value: double // f3: event_timestamp: Instant // f4: aggregate_sum: double // f5: aggregate_count double // f6: is_backfile: boolean // THEN the stream is KEYED BY: f0: name:String, f1: window_size: int // THEN the stream is WINDOWED into a tumbling event time window of 15 minutes // THEN the window is configured to allow elements late by 1 hour // THEN a low-level process function is applied to the window that // aggregates the time series by assigning the following tuple fields: // f1: window_size = 15 minutes in miliseconds // f2: value = average value in this 15 minute window // f3: event_timestamp = the first epoch millisecond in this 15 minute window 
// f4: aggregate_sum = sum of f2 values in this 15 minute window // f5: aggregate_count = number of values in this 15 minute window final SingleOutputStreamOperator<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> aggregateTimeSeriesStream = timeSeriesStream.keyBy(0, 1) .window(TumblingEventTimeWindows.of(Time.minutes(15))) .allowedLateness(Time.hours(1)) .process(new ProcessWindowFunction<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>, Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>, Tuple, TimeWindow>() { @Override public void process( Tuple tuple, Context context, Iterable<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> elements, Collector<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> out ) throws Exception { final Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> aggregation = new Tuple7<>(); boolean is_window_initialized = false; for (Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> element : ImmutableList.copyOf(elements).reverse()) { if (!is_window_initialized) { final Instant timestamp = Instant.ofEpochMilli(context.window().getStart()); aggregation.f0 = element.f0; aggregation.f1 = (int) Time.minutes(15).toMilliseconds(); aggregation.f2 = element.f2; aggregation.f3 = timestamp; aggregation.f4 = 0D; aggregation.f5 = 0; aggregation.f6 = false; is_window_initialized = true; } aggregation.f4 += element.f2; aggregation.f5++; aggregation.f2 = aggregation.f4 / aggregation.f5; } out.collect(aggregation); } }); upsertToJDBC(jdbcUpsertTableSink, aggregateTimeSeriesStream); env.execute("time series"); } private JDBCUpsertTableSink buildJdbcUpsertTableSink() { final JDBCUpsertTableSink jdbcUpsertTableSink = JDBCUpsertTableSink.builder() .setOptions(JDBCOptions.builder() .setDBUrl("jdbc:derby:memory:flink") .setTableName("time_series") .build()) .setTableSchema(TableSchema.builder() .field("name", DataTypes.VARCHAR(50).notNull()) 
.field("window_size", DataTypes.INT().notNull()) .field("value", DataTypes.DOUBLE().notNull()) .field("event_timestamp", DataTypes.TIMESTAMP().notNull()) .field("aggregate_sum", DataTypes.DOUBLE().notNull()) .field("aggregate_count", DataTypes.INT().notNull()) .field("is_backfill", DataTypes.BOOLEAN().notNull()) .primaryKey("name", "window_size", "event_timestamp") .build()) .build(); jdbcUpsertTableSink.setKeyFields(new String[]{"name", "window_size", "event_timestamp"}); return jdbcUpsertTableSink; } private void upsertToJDBC(JDBCUpsertTableSink jdbcUpsertTableSink, DataStream<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> timeSeriesStream) { jdbcUpsertTableSink.consumeDataStream(timeSeriesStream.map(t -> { final Row row = new Row(7); row.setField(0, t.f0); row.setField(1, t.f1); row.setField(2, t.f2); row.setField(3, Timestamp.from(t.f3)); row.setField(4, t.f4); row.setField(5, t.f5); row.setField(6, t.f6); return new Tuple2<>(true, row); }).returns(new TypeHint<Tuple2<Boolean, Row>>() { })); } public static void main(String[] args) throws Exception { logger.info("Command line arguments: {}", Arrays.toString(args)); final String inputFilePath = args[0]; logger.info("Reading input file: {}", inputFilePath); final String databaseURL = "jdbc:derby:memory:flink;create=true"; try (final Connection con = DriverManager.getConnection(databaseURL)) { try (final Statement stmt = con.createStatement();) { stmt.execute("CREATE TABLE time_series (\n" + " id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),\n" + " name VARCHAR(50) NOT NULL,\n" + " window_size INTEGER NOT NULL DEFAULT 1,\n" + " event_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n" + " value DOUBLE PRECISION NOT NULL DEFAULT 0,\n" + " aggregate_sum DOUBLE PRECISION NOT NULL DEFAULT 0,\n" + " aggregate_count INTEGER NOT NULL DEFAULT 1,\n" + " is_backfill BOOLEAN NOT NULL DEFAULT false,\n" + " version INTEGER NOT NULL DEFAULT 1,\n" + " create_time 
TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n" + " modify_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n" + " UNIQUE (name, window_size, event_timestamp)\n" + ")"); } TimeSeriesAverageApp app = new TimeSeriesAverageApp(); app.stream(inputFilePath); try (final Statement stmt = con.createStatement()) { final ResultSet rs = stmt.executeQuery("SELECT id, name, window_size, event_timestamp, value, aggregate_sum, aggregate_count, is_backfill, version, create_time, modify_time FROM time_series"); while (rs.next()) { final long id = rs.getLong(1); final String name = rs.getString(2); final int window_size = rs.getInt(3); final Timestamp event_timestamp = rs.getTimestamp(4); final double value = rs.getDouble(5); final double aggregate_sum = rs.getDouble(6); final int aggregate_count = rs.getInt(7); final boolean is_backfill = rs.getBoolean(8); final int version = rs.getInt(9); final Timestamp create_time = rs.getTimestamp(10); final Timestamp modify_time = rs.getTimestamp(11); logger.info( "id: {}, name: \"{}\", window_size: {}, event_timestamp: \"{}\", value: {}, aggregate_sum: {}, aggregate_count: {}, is_backfill: {} version: {} create_time: \"{}\" modify_time: \"{}\"", id, name, window_size, event_timestamp, value, aggregate_sum, aggregate_count, is_backfill, version, create_time, modify_time ); } } } } } |
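To pin down the expected result described in the question, here is a minimal plain-Java sketch that does not use Flink at all. It only illustrates the intended semantics: assign each record to a 15 minute tumbling window, average per name, and repeat a name's previous average for any window in which it has no records. The class and method names (`BackfillExample`, `windowStart`, `averagesWithBackfill`) and the trailing `true`/`false` backfill flag in the output are assumptions made for illustration; they are not part of the original project.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

final class BackfillExample {
    static final long WINDOW_MILLIS = Duration.ofMinutes(15).toMillis();

    /** Start of the 15 minute tumbling window that contains the given instant. */
    static Instant windowStart(Instant t) {
        long ms = t.toEpochMilli();
        return Instant.ofEpochMilli(ms - Math.floorMod(ms, WINDOW_MILLIS));
    }

    /** Averages each name per window, then repeats the previous average for empty windows. */
    static List<String> averagesWithBackfill(List<String> csvLines) {
        // window start -> name -> {sum, count}
        TreeMap<Instant, Map<String, double[]>> windows = new TreeMap<>();
        TreeSet<String> names = new TreeSet<>();
        for (String line : csvLines) {
            String[] f = line.split(",");
            names.add(f[0]);
            double[] acc = windows
                    .computeIfAbsent(windowStart(Instant.parse(f[2])), k -> new TreeMap<>())
                    .computeIfAbsent(f[0], k -> new double[2]);
            acc[0] += Double.parseDouble(f[1]);
            acc[1] += 1;
        }
        List<String> out = new ArrayList<>();
        if (windows.isEmpty()) {
            return out;
        }
        Map<String, Double> lastAverage = new TreeMap<>();
        long first = windows.firstKey().toEpochMilli();
        long last = windows.lastKey().toEpochMilli();
        // Walk every window between the first and last one, including empty ones.
        for (long w = first; w <= last; w += WINDOW_MILLIS) {
            Instant start = Instant.ofEpochMilli(w);
            Map<String, double[]> seen = windows.getOrDefault(start, Collections.emptyMap());
            for (String name : names) {
                double[] acc = seen.get(name);
                if (acc != null) {
                    lastAverage.put(name, acc[0] / acc[1]);
                }
                Double avg = lastAverage.get(name);
                if (avg != null) {
                    // Format: name,average,windowStart,isBackfill
                    out.add(name + "," + avg + "," + start + "," + (acc == null));
                }
            }
        }
        return out;
    }
}
```

Calling `BackfillExample.averagesWithBackfill(lines)` with the seven input lines from the question yields one row per name per window, with the row for "a" in the 00:15 window marked as a backfill. A streaming job cannot scan all windows after the fact like this, which is why the question asks for a Flink-specific strategy.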
I came up with a solution for backfills. However, at this moment, I am not happy with it. I think there may be other facilities within Flink that would allow a better, more efficient, or more scalable solution.

In another post, [hidden email] suggested that I use a process function and a timer. He was right that I should use that approach, and I want to thank him.

The averages are computed by a ProcessWindowFunction on a stream that is keyed by name and window size and uses a tumbling event time window. After that average is complete, I apply a KeyedProcessFunction that is keyed by window size only. It takes a somewhat brute-force approach: a ValueState<Set<String>> tracks the names that need a value, and a MapState determines which values exist and which ones must be backfilled. It also cleans up stale values.

I committed my code to a branch, https://github.com/minmay/flink-patterns/tree/feature/backfill , and I also created a pull request, https://github.com/minmay/flink-patterns/pull/1/files , to share my experience.

I am open to critical comments on my approach, my lack of understanding of Flink, and the algorithms and data structures used. Please refrain from comments on my code style though.

I'll also copy and paste my solution below.
package mvillalobos.flink.patterns.timeseries.average; import com.google.common.collect.ImmutableList; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.io.jdbc.JDBCOptions; import org.apache.flink.api.java.io.jdbc.JDBCUpsertTableSink; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple7; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; import org.apache.flink.types.Row; import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import picocli.CommandLine; import java.io.File; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement; import java.sql.Timestamp; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.Comparator; import java.util.List; import 
java.util.Map; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @CommandLine.Command(name = "Time Series Average", mixinStandardHelpOptions = true, description = "Compute the average of the time series with a 15 minute tumbling event time window and upsert the results into an Apache Derby database.") public class TimeSeriesAverageApp implements Callable<Integer> { private final static Logger logger = LoggerFactory.getLogger(TimeSeriesAverageApp.class); @CommandLine.Option(names = {"-f", "--input-file"}, description = "The CSV input file of time series data. Each line must be in the format: String, double, Instant.") private File inputFile; @Override public Integer call() throws Exception { stream(inputFile.toString()); return 0; } public void stream(String inputFilePath) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // GIVEN a SOURCE with a CSV input file // in which each line has a: String, double, Instant // THEN the MAP operator // transforms the line into a Tuple7 // f0: name: String // f1: window_size: int // f2: value: double // f3: event_timestamp: Instant // f4: aggregate_sum: double // f5: aggregate_count double // f6: is_backfile: boolean // WHEN the map operation finishes // THEN the event time assigned using field f3 final DataStream<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> timeSeriesStream = env.readTextFile(inputFilePath) .map(line -> { final String[] split = line.split(","); final String name = split[0]; final double value = Double.parseDouble(split[1]); final Instant timestamp = Instant.parse(split[2]); return Tuple7.of(name, 1, value, timestamp, value, 1, false); }).returns(Types.TUPLE(Types.STRING, Types.INT, Types.DOUBLE, TypeInformation.of(Instant.class), Types.DOUBLE, Types.INT, 
Types.BOOLEAN)) .name("time series stream") .assignTimestampsAndWatermarks( new AscendingTimestampExtractor<>() { @Override public long extractAscendingTimestamp(Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> element) { return element.f3.toEpochMilli(); } } ); final JDBCUpsertTableSink jdbcUpsertTableSink = buildJdbcUpsertTableSink(); upsertToJDBC(jdbcUpsertTableSink, timeSeriesStream); // GIVEN a data stream with Tuple7 // f0: name: String // f1: window_size: int // f2: value: double // f3: event_timestamp: Instant // f4: aggregate_sum: double // f5: aggregate_count double // f6: is_backfill: boolean // THEN the stream is KEYED BY: f0: name:String, f1: window_size: int // THEN the stream is WINDOWED into a tumbling event time window of 15 minutes // THEN the window is configured to allow elements late by 1 hour // THEN a low-level process window function is applied to the window that // aggregates the time series by assigning the following tuple fields: // f1: window_size = 15 minutes in miliseconds // f2: value = average value in this 15 minute window // f3: event_timestamp = the first epoch millisecond in this 15 minute window // f4: aggregate_sum = sum of f2 values in this 15 minute window // f5: aggregate_count = number of values in this 15 minute window final DataStream<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> aggregateTimeSeriesStream = timeSeriesStream.keyBy(0, 1) .window(TumblingEventTimeWindows.of(Time.minutes(15))) .allowedLateness(Time.hours(1)) .process(new ProcessWindowFunction<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>, Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>, Tuple, TimeWindow>() { @Override public void process( Tuple tuple, Context context, Iterable<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> elements, Collector<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> out ) throws Exception { final Tuple7<String, 
Integer, Double, Instant, Double, Integer, Boolean> aggregation = new Tuple7<>(); boolean is_window_initialized = false; for (Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> element : ImmutableList.copyOf(elements).reverse()) { if (!is_window_initialized) { final Instant timestamp = Instant.ofEpochMilli(context.window().getStart()); aggregation.f0 = element.f0; aggregation.f1 = (int) Time.minutes(15).toMilliseconds(); aggregation.f2 = element.f2; aggregation.f3 = timestamp; aggregation.f4 = 0D; aggregation.f5 = 0; aggregation.f6 = false; is_window_initialized = true; } aggregation.f4 += element.f2; aggregation.f5++; aggregation.f2 = aggregation.f4 / aggregation.f5; } logger.info("Added aggregation: {}", aggregation); out.collect(aggregation); } }).name("averaged keyed tumbling window event time stream"); // GIVEN a data-stream of tuple7 // f0: name: String // f1: window_size: int // f2: value: double // f3: event_timestamp: Instant // f4: aggregate_sum: double // f5: aggregate_count double // f6: is_backfill: boolean // THAT was aggregated to compute the average on f2: value: double // WITH a grouping of: f0: name:String, f1: window_size: int // WITH a tumbling event time window of 15 minutes // THEN the stream is KEYED BY: f1: window_size: int // THEN a low-level keyed process function is applied to the window that // WHEN the keyed process function opens it // initializes a VALUE STATE of TreeSet<String> called "nameSet" // initializes a MAP STATE of // KEY of Tuple2: f0: String, f1: Instant // VALUE of Tuple7: // f0: String // f1: int // f2: double // f3: Instant // f4: double // f5: double // f6: boolean // called "backfillState" // WHEN the keyed process function processes an element it // adds each f0: name: String into the VALUE STATE "nameSet" // adds each // KEY of Tuple2: f0: name: String, f3: event_timestamp: Instant // VALUE of Tuple7: // f0: name: String // f1: window_size: int // f2: value: double // f3: event_timestamp: Instant // 
f4: aggregate_sum: double // f5: aggregate_count double // f6: is_backfill: boolean // to the MAP STATE "backfillState" // fires an timer to occur at f3: event_timestamp: Instant + 15 minutes (at the end of a window) // WHEN the keyed process functions coalesced timers are handled it // calculates the current "event_time" to handle which is the timestamp - 15 minutes // iterates over each time series name in the "nameSet" for each "name": // IF MAP STATE "backfillState" contains a KEY of Tuple2: "name", "event_time" THEN // collect the VALUE as an OUT result because it is not a back fill // ELSE // the KEY of Tuple2: "name", "event_time" requires a back fill // iterate over the MAP STATE "backfillState" // filter the by "name" = Tuple2.f0 // filter by timestamp "event_time" > Tuple2.f1 // sort by key timestamp Tuple.f1 in ascending order // collect into a List named "backfills" // IF "backfills" is empty THEN there is no backfill // ELSE // the back fill is the last value in the list // remove the other values in the list from MAP STATE "backfillState" as they are no longer needed final DataStream<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> backfilledAggregateTimeSeriesStream = aggregateTimeSeriesStream.keyBy(1) .process( new KeyedProcessFunction<>() { private ValueState<Set<String>> namesState; private MapState<Tuple2<String, Instant>, Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> backfillState; @Override public void open(Configuration parameters) { MapStateDescriptor<Tuple2<String, Instant>, Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> backfillDescriptor = new MapStateDescriptor<>( "backfill-state", TypeInformation.of(new TypeHint<>() {}), TypeInformation.of(new TypeHint<>() {}) ); backfillState = getRuntimeContext().getMapState(backfillDescriptor); ValueStateDescriptor<Set<String>> namesDescriptor = new ValueStateDescriptor<>("names-value-state", TypeInformation.of(new TypeHint<>() {})); 
namesState = getRuntimeContext().getState(namesDescriptor); } @Override public void processElement( Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> value, Context ctx, Collector<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> out ) throws Exception { if (namesState.value() == null) { namesState.update(new TreeSet<>()); } namesState.value().add(value.f0); final Instant evenTime = value.f3; final long timer = evenTime.toEpochMilli() + Time.minutes(15).toMilliseconds(); logger.info( "processElement with key: {}, value: {}. registering timer: {}", ctx.getCurrentKey(), value, Instant.ofEpochMilli(timer) ); ctx.timerService().registerEventTimeTimer(timer); final Tuple2<String, Instant> currentKey = new Tuple2<>(value.f0, value.f3); backfillState.put(currentKey, value); } @Override public void onTimer( long timestamp, OnTimerContext ctx, Collector<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> out ) throws Exception { final Instant event_time = Instant.ofEpochMilli(timestamp).minus(15, ChronoUnit.MINUTES); for (String name : namesState.value()) { Tuple2<String, Instant> key = new Tuple2<>(name, event_time); if (backfillState.contains(key)) { final Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> value = backfillState.get(key); logger.info( "onTimer with key: {} timestamp: {}, event_time: {}, has value: {}", ctx.getCurrentKey(), Instant.ofEpochMilli(timestamp), event_time, value ); out.collect(value); } else { final List<Map.Entry<Tuple2<String, Instant>, Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>>> backfills = StreamSupport.stream(backfillState.entries().spliterator(), false) .filter(entry -> name.equals(entry.getKey().f0)) .filter(entry -> event_time.isAfter(entry.getKey().f1)) .sorted(Comparator.comparing(entry -> entry.getKey().f1)) .collect(Collectors.toList()); if (!backfills.isEmpty()) { final Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> value 
= backfills.get(backfills.size() - 1).getValue(); final Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> backfill = new Tuple7<>( value.f0, value.f1, value.f2, event_time, value.f4, value.f5, true ); out.collect(backfill); for (int i = 0; i < backfills.size() - 1; i++) { backfillState.remove(backfills.get(i).getKey()); } logger.info("onTimer with key: {} timestamp: {}, step: {}, has backfill: {}", ctx.getCurrentKey(), Instant.ofEpochMilli(timestamp), event_time, backfill); } } } logger.info("*****************"); } }); upsertToJDBC(jdbcUpsertTableSink, backfilledAggregateTimeSeriesStream); env.execute("time series"); } private JDBCUpsertTableSink buildJdbcUpsertTableSink() { final JDBCUpsertTableSink jdbcUpsertTableSink = JDBCUpsertTableSink.builder() .setOptions(JDBCOptions.builder() .setDBUrl("jdbc:derby:memory:flink") .setTableName("time_series") .build()) .setTableSchema(TableSchema.builder() .field("name", DataTypes.VARCHAR(50).notNull()) .field("window_size", DataTypes.INT().notNull()) .field("value", DataTypes.DOUBLE().notNull()) .field("event_timestamp", DataTypes.TIMESTAMP().notNull()) .field("aggregate_sum", DataTypes.DOUBLE().notNull()) .field("aggregate_count", DataTypes.INT().notNull()) .field("is_backfill", DataTypes.BOOLEAN().notNull()) .primaryKey("name", "window_size", "event_timestamp") .build()) .build(); jdbcUpsertTableSink.setKeyFields(new String[]{"name", "window_size", "event_timestamp"}); return jdbcUpsertTableSink; } private void upsertToJDBC(JDBCUpsertTableSink jdbcUpsertTableSink, DataStream<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> timeSeriesStream) { jdbcUpsertTableSink.consumeDataStream(timeSeriesStream.map(t -> { final Row row = new Row(7); row.setField(0, t.f0); row.setField(1, t.f1); row.setField(2, t.f2); row.setField(3, Timestamp.from(t.f3)); row.setField(4, t.f4); row.setField(5, t.f5); row.setField(6, t.f6); return new Tuple2<>(true, row); }).returns(new TypeHint<Tuple2<Boolean, Row>>() 
{ })).name("upsert to JDBC"); } public static void main(String[] args) throws Exception { final String databaseURL = "jdbc:derby:memory:flink;create=true"; int exitCode; try (final Connection con = DriverManager.getConnection(databaseURL)) { try (final Statement stmt = con.createStatement();) { stmt.execute("CREATE TABLE time_series (\n" + " id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),\n" + " name VARCHAR(50) NOT NULL,\n" + " window_size INTEGER NOT NULL DEFAULT 1,\n" + " event_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n" + " value DOUBLE PRECISION NOT NULL DEFAULT 0,\n" + " aggregate_sum DOUBLE PRECISION NOT NULL DEFAULT 0,\n" + " aggregate_count INTEGER NOT NULL DEFAULT 1,\n" + " is_backfill BOOLEAN NOT NULL DEFAULT false,\n" + " version INTEGER NOT NULL DEFAULT 1,\n" + " create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n" + " modify_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n" + " UNIQUE (name, window_size, event_timestamp)\n" + ")"); } exitCode = new CommandLine(new TimeSeriesAverageApp()).execute(args); try (final Statement stmt = con.createStatement()) { final ResultSet rs = stmt.executeQuery("SELECT id, name, window_size, event_timestamp, value, aggregate_sum, aggregate_count, is_backfill, version, create_time, modify_time FROM time_series ORDER BY window_size, event_timestamp, name"); while (rs.next()) { final long id = rs.getLong(1); final String name = rs.getString(2); final int window_size = rs.getInt(3); final Timestamp event_timestamp = rs.getTimestamp(4); final double value = rs.getDouble(5); final double aggregate_sum = rs.getDouble(6); final int aggregate_count = rs.getInt(7); final boolean is_backfill = rs.getBoolean(8); final int version = rs.getInt(9); final Timestamp create_time = rs.getTimestamp(10); final Timestamp modify_time = rs.getTimestamp(11); logger.info( "id: {}, name: \"{}\", window_size: {}, event_timestamp: \"{}\", value: {}, aggregate_sum: {}, aggregate_count: {}, 
is_backfill: {} version: {} create_time: \"{}\" modify_time: \"{}\"", id, name, window_size, event_timestamp, value, aggregate_sum, aggregate_count, is_backfill, version, create_time, modify_time ); } } } System.exit(exitCode); } } |
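One data-structure observation on the onTimer logic above: finding a backfill by filtering every entry of the map state and sorting the survivors is linear (plus a sort) per name per timer. If the averages for each name are kept in a sorted map, the same lookup is a single floor query. The following is a plain-Java sketch of that idea only, not Flink code; `LastAverageIndex` and its methods are hypothetical names, and it assumes timers fire in ascending window order so that older entries can be discarded. How this would map onto Flink keyed state is left open here.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

final class LastAverageIndex {
    // name -> (window start -> average), each series sorted by window start
    private final Map<String, TreeMap<Instant, Double>> byName = new HashMap<>();

    /** Records the average computed for one name in one window. */
    void put(String name, Instant windowStart, double average) {
        byName.computeIfAbsent(name, k -> new TreeMap<>()).put(windowStart, average);
    }

    /**
     * Returns the average stored for exactly this window, or else the closest
     * earlier one (the backfill), or null if the name has no usable value yet.
     */
    Double valueOrBackfill(String name, Instant windowStart) {
        TreeMap<Instant, Double> series = byName.get(name);
        if (series == null) {
            return null;
        }
        Map.Entry<Instant, Double> entry = series.floorEntry(windowStart);
        if (entry == null) {
            return null;
        }
        // Assumption: windows are queried in ascending order, so entries strictly
        // older than the one just used are stale and can be dropped.
        series.headMap(entry.getKey(), false).clear();
        return entry.getValue();
    }

    /** Number of entries currently retained for a name (0 if unknown). */
    int retained(String name) {
        TreeMap<Instant, Double> series = byName.get(name);
        return series == null ? 0 : series.size();
    }
}
```

With the sample data, `valueOrBackfill("a", 00:15)` returns the 00:00 average for "a", while `valueOrBackfill("b", 00:15)` returns b's own 00:15 average and discards b's 00:00 entry. Note that allowing late elements, as the window in the job does, weakens the ascending-order assumption, so the cleanup step would need more care in a real job.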
Hi Marco,

That's a lot of code to digest, so I'm sorry if I got something wrong.

From your example, it looks like you want the average within a tumbling window and, if no record for a particular key was emitted in that window, you want to repeat the last value.

I'd use a dummy record to force the windows to be triggered and then ignore it during aggregation. Here is a sketch:

    aggregateTimeSeriesStream.keyBy(1)
        .process(<store all keys in a state, on 15 min timer, emit dummy record for all keys>)
        .window(<tumble 15 min>)
        .reduce(<average, ignore dummy record, create dummy record if empty>)
        .process(<store last record per key, replace dummy record with last value>)

This approach needs more functions, but most of them can be chained. Each one is a smaller piece of work that should be easier to maintain and test. In particular, you save yourself the trouble of writing the window logic yourself.

On Thu, Jun 18, 2020 at 7:11 PM Marco Villalobos <[hidden email]> wrote:
> I came up with a solution for backfills. However, at this moment, I am not happy with my solution.

--
Arvid Heise | Senior Java Developer
Ververica GmbH
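The dummy-record idea from the reply above can be tried out without Flink. The sketch below is plain Java and only models the last two steps: a reduce in which a dummy record is the neutral element (sum 0, count 0), and a final step that remembers the last real average per name and substitutes it when a window produced only a dummy. All names (`DummyRecordSketch`, `Partial`, `Backfiller`) and the "name,average,isBackfill" output format are hypothetical; the timer that emits the dummies and the window assignment are assumed to happen upstream.

```java
import java.util.HashMap;
import java.util.Map;

final class DummyRecordSketch {

    /** Partial aggregate for one name in one window; count == 0 marks a dummy record. */
    static final class Partial {
        final String name;
        final double sum;
        final int count;

        Partial(String name, double sum, int count) {
            this.name = name;
            this.sum = sum;
            this.count = count;
        }

        static Partial dummy(String name) {
            return new Partial(name, 0.0, 0);
        }

        static Partial of(String name, double value) {
            return new Partial(name, value, 1);
        }
    }

    /** Reduce step: a dummy adds nothing to sum or count, so it needs no special case. */
    static Partial reduce(Partial a, Partial b) {
        return new Partial(a.name, a.sum + b.sum, a.count + b.count);
    }

    /** Final step: stores the last real average per name and replaces dummies with it. */
    static final class Backfiller {
        private final Map<String, Double> lastAverage = new HashMap<>();

        /** Returns "name,average,isBackfill", or null if there is nothing to repeat yet. */
        String emit(Partial windowResult) {
            if (windowResult.count > 0) {
                double average = windowResult.sum / windowResult.count;
                lastAverage.put(windowResult.name, average);
                return windowResult.name + "," + average + ",false";
            }
            Double previous = lastAverage.get(windowResult.name);
            return previous == null ? null : windowResult.name + "," + previous + ",true";
        }
    }
}
```

For name "a" from the question, the first window reduces a dummy with 10, 15, and 20 and emits a real average; the second window contains only the dummy, so the backfiller repeats the stored average and flags it as a backfill. Modelling the dummy as a zero-count partial is a design choice of this sketch, not something the reply prescribes.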