Posted by
Marco Villalobos-2 on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/what-is-the-Flink-recommended-way-of-assigning-a-backfill-to-an-average-on-an-event-time-keyed-windo-tp36017p36074.html
I came up with a solution for backfills. However, at this moment, I am not happy with my solution.
I think there might be other facilities within Flink which allow me to implement a better more efficient or more scalable solution.
In another post,
[hidden email] suggested that I use a process function and a timer. He was right in that I should use that approach. I want to thank him.
The averages are computed by a ProcessWindowFunction that keys by the name and window size and uses a tumbling event time window.
However, after that average is complete, I then use a KeyedProcessFunction that is keyed by window size. I then use a somewhat brute force approach with ValueState<Set<String>> to track names that need a value and a MAP STATE to determine which values exist and which ones are backfilled.
It also cleans up stale values.
I committed my code to a branch
https://github.com/minmay/flink-patterns/tree/feature/backfill , and I also created a pull request
https://github.com/minmay/flink-patterns/pull/1/files to share my experience.
I am open critical comments on my approach, lack of understanding of Flink, algorithms and data-structures used. Please refrain from comments on my code style though.
I'll also copy and paste my solution below.
package mvillalobos.flink.patterns.timeseries.average;
import com.google.common.collect.ImmutableList;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.jdbc.JDBCOptions;
import org.apache.flink.api.java.io.jdbc.JDBCUpsertTableSink;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;
import java.io.File;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@CommandLine.Command(name = "Time Series Average", mixinStandardHelpOptions = true,
description = "Compute the average of the time series with a 15 minute tumbling event time window and upsert the results into an Apache Derby database.")
public class TimeSeriesAverageApp implements Callable<Integer> {
private final static Logger logger = LoggerFactory.getLogger(TimeSeriesAverageApp.class);
@CommandLine.Option(names = {"-f", "--input-file"}, description = "The CSV input file of time series data. Each line must be in the format: String, double, Instant.")
private File inputFile;
@Override
public Integer call() throws Exception {
stream(inputFile.toString());
return 0;
}
public void stream(String inputFilePath) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// GIVEN a SOURCE with a CSV input file
// in which each line has a: String, double, Instant
// THEN the MAP operator
// transforms the line into a Tuple7
// f0: name: String
// f1: window_size: int
// f2: value: double
// f3: event_timestamp: Instant
// f4: aggregate_sum: double
// f5: aggregate_count double
// f6: is_backfile: boolean
// WHEN the map operation finishes
// THEN the event time assigned using field f3
final DataStream<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> timeSeriesStream = env.readTextFile(inputFilePath)
.map(line -> {
final String[] split = line.split(",");
final String name = split[0];
final double value = Double.parseDouble(split[1]);
final Instant timestamp = Instant.parse(split[2]);
return Tuple7.of(name, 1, value, timestamp, value, 1, false);
}).returns(Types.TUPLE(Types.STRING, Types.INT, Types.DOUBLE, TypeInformation.of(Instant.class), Types.DOUBLE, Types.INT, Types.BOOLEAN))
.name("time series stream")
.assignTimestampsAndWatermarks(
new AscendingTimestampExtractor<>() {
@Override
public long extractAscendingTimestamp(Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> element) {
return element.f3.toEpochMilli();
}
}
);
final JDBCUpsertTableSink jdbcUpsertTableSink = buildJdbcUpsertTableSink();
upsertToJDBC(jdbcUpsertTableSink, timeSeriesStream);
// GIVEN a data stream with Tuple7
// f0: name: String
// f1: window_size: int
// f2: value: double
// f3: event_timestamp: Instant
// f4: aggregate_sum: double
// f5: aggregate_count double
// f6: is_backfill: boolean
// THEN the stream is KEYED BY: f0: name:String, f1: window_size: int
// THEN the stream is WINDOWED into a tumbling event time window of 15 minutes
// THEN the window is configured to allow elements late by 1 hour
// THEN a low-level process window function is applied to the window that
// aggregates the time series by assigning the following tuple fields:
// f1: window_size = 15 minutes in miliseconds
// f2: value = average value in this 15 minute window
// f3: event_timestamp = the first epoch millisecond in this 15 minute window
// f4: aggregate_sum = sum of f2 values in this 15 minute window
// f5: aggregate_count = number of values in this 15 minute window
final DataStream<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>>
aggregateTimeSeriesStream = timeSeriesStream.keyBy(0, 1)
.window(TumblingEventTimeWindows.of(Time.minutes(15)))
.allowedLateness(Time.hours(1))
.process(new ProcessWindowFunction<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>, Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>, Tuple, TimeWindow>() {
@Override
public void process(
Tuple tuple,
Context context,
Iterable<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> elements,
Collector<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> out
) throws Exception {
final Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> aggregation = new Tuple7<>();
boolean is_window_initialized = false;
for (Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> element : ImmutableList.copyOf(elements).reverse()) {
if (!is_window_initialized) {
final Instant timestamp = Instant.ofEpochMilli(context.window().getStart());
aggregation.f0 = element.f0;
aggregation.f1 = (int) Time.minutes(15).toMilliseconds();
aggregation.f2 = element.f2;
aggregation.f3 = timestamp;
aggregation.f4 = 0D;
aggregation.f5 = 0;
aggregation.f6 = false;
is_window_initialized = true;
}
aggregation.f4 += element.f2;
aggregation.f5++;
aggregation.f2 = aggregation.f4 / aggregation.f5;
}
logger.info("Added aggregation: {}", aggregation);
out.collect(aggregation);
}
}).name("averaged keyed tumbling window event time stream");
// GIVEN a data-stream of tuple7
// f0: name: String
// f1: window_size: int
// f2: value: double
// f3: event_timestamp: Instant
// f4: aggregate_sum: double
// f5: aggregate_count double
// f6: is_backfill: boolean
// THAT was aggregated to compute the average on f2: value: double
// WITH a grouping of: f0: name:String, f1: window_size: int
// WITH a tumbling event time window of 15 minutes
// THEN the stream is KEYED BY: f1: window_size: int
// THEN a low-level keyed process function is applied to the window that
// WHEN the keyed process function opens it
// initializes a VALUE STATE of TreeSet<String> called "nameSet"
// initializes a MAP STATE of
// KEY of Tuple2: f0: String, f1: Instant
// VALUE of Tuple7:
// f0: String
// f1: int
// f2: double
// f3: Instant
// f4: double
// f5: double
// f6: boolean
// called "backfillState"
// WHEN the keyed process function processes an element it
// adds each f0: name: String into the VALUE STATE "nameSet"
// adds each
// KEY of Tuple2: f0: name: String, f3: event_timestamp: Instant
// VALUE of Tuple7:
// f0: name: String
// f1: window_size: int
// f2: value: double
// f3: event_timestamp: Instant
// f4: aggregate_sum: double
// f5: aggregate_count double
// f6: is_backfill: boolean
// to the MAP STATE "backfillState"
// fires an timer to occur at f3: event_timestamp: Instant + 15 minutes (at the end of a window)
// WHEN the keyed process functions coalesced timers are handled it
// calculates the current "event_time" to handle which is the timestamp - 15 minutes
// iterates over each time series name in the "nameSet" for each "name":
// IF MAP STATE "backfillState" contains a KEY of Tuple2: "name", "event_time" THEN
// collect the VALUE as an OUT result because it is not a back fill
// ELSE
// the KEY of Tuple2: "name", "event_time" requires a back fill
// iterate over the MAP STATE "backfillState"
// filter the by "name" = Tuple2.f0
// filter by timestamp "event_time" > Tuple2.f1
// sort by key timestamp Tuple.f1 in ascending order
// collect into a List named "backfills"
// IF "backfills" is empty THEN there is no backfill
// ELSE
// the back fill is the last value in the list
// remove the other values in the list from MAP STATE "backfillState" as they are no longer needed
final DataStream<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> backfilledAggregateTimeSeriesStream =
aggregateTimeSeriesStream.keyBy(1)
.process(
new KeyedProcessFunction<>() {
private ValueState<Set<String>> namesState;
private MapState<Tuple2<String, Instant>, Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> backfillState;
@Override
public void open(Configuration parameters) {
MapStateDescriptor<Tuple2<String, Instant>, Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> backfillDescriptor =
new MapStateDescriptor<>(
"backfill-state",
TypeInformation.of(new TypeHint<>() {}),
TypeInformation.of(new TypeHint<>() {})
);
backfillState = getRuntimeContext().getMapState(backfillDescriptor);
ValueStateDescriptor<Set<String>> namesDescriptor =
new ValueStateDescriptor<>("names-value-state", TypeInformation.of(new TypeHint<>() {}));
namesState = getRuntimeContext().getState(namesDescriptor);
}
@Override
public void processElement(
Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> value,
Context ctx,
Collector<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> out
) throws Exception {
if (namesState.value() == null) {
namesState.update(new TreeSet<>());
}
namesState.value().add(value.f0);
final Instant evenTime = value.f3;
final long timer = evenTime.toEpochMilli() + Time.minutes(15).toMilliseconds();
logger.info(
"processElement with key: {}, value: {}. registering timer: {}",
ctx.getCurrentKey(),
value,
Instant.ofEpochMilli(timer)
);
ctx.timerService().registerEventTimeTimer(timer);
final Tuple2<String, Instant> currentKey = new Tuple2<>(value.f0, value.f3);
backfillState.put(currentKey, value);
}
@Override
public void onTimer(
long timestamp,
OnTimerContext ctx,
Collector<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> out
) throws Exception {
final Instant event_time = Instant.ofEpochMilli(timestamp).minus(15, ChronoUnit.MINUTES);
for (String name : namesState.value()) {
Tuple2<String, Instant> key = new Tuple2<>(name, event_time);
if (backfillState.contains(key)) {
final Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> value = backfillState.get(key);
logger.info(
"onTimer with key: {} timestamp: {}, event_time: {}, has value: {}",
ctx.getCurrentKey(),
Instant.ofEpochMilli(timestamp),
event_time,
value
);
out.collect(value);
} else {
final List<Map.Entry<Tuple2<String, Instant>, Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>>> backfills
= StreamSupport.stream(backfillState.entries().spliterator(), false)
.filter(entry -> name.equals(entry.getKey().f0))
.filter(entry -> event_time.isAfter(entry.getKey().f1))
.sorted(Comparator.comparing(entry -> entry.getKey().f1))
.collect(Collectors.toList());
if (!backfills.isEmpty()) {
final Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> value = backfills.get(backfills.size() - 1).getValue();
final Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> backfill = new Tuple7<>(
value.f0, value.f1, value.f2, event_time, value.f4, value.f5, true
);
out.collect(backfill);
for (int i = 0; i < backfills.size() - 1; i++) {
backfillState.remove(backfills.get(i).getKey());
}
logger.info("onTimer with key: {} timestamp: {}, step: {}, has backfill: {}", ctx.getCurrentKey(), Instant.ofEpochMilli(timestamp), event_time, backfill);
}
}
}
logger.info("*****************");
}
});
upsertToJDBC(jdbcUpsertTableSink, backfilledAggregateTimeSeriesStream);
env.execute("time series");
}
private JDBCUpsertTableSink buildJdbcUpsertTableSink() {
final JDBCUpsertTableSink jdbcUpsertTableSink = JDBCUpsertTableSink.builder()
.setOptions(JDBCOptions.builder()
.setDBUrl("jdbc:derby:memory:flink")
.setTableName("time_series")
.build())
.setTableSchema(TableSchema.builder()
.field("name", DataTypes.VARCHAR(50).notNull())
.field("window_size", DataTypes.INT().notNull())
.field("value", DataTypes.DOUBLE().notNull())
.field("event_timestamp", DataTypes.TIMESTAMP().notNull())
.field("aggregate_sum", DataTypes.DOUBLE().notNull())
.field("aggregate_count", DataTypes.INT().notNull())
.field("is_backfill", DataTypes.BOOLEAN().notNull())
.primaryKey("name", "window_size", "event_timestamp")
.build())
.build();
jdbcUpsertTableSink.setKeyFields(new String[]{"name", "window_size", "event_timestamp"});
return jdbcUpsertTableSink;
}
private void upsertToJDBC(JDBCUpsertTableSink jdbcUpsertTableSink, DataStream<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> timeSeriesStream) {
jdbcUpsertTableSink.consumeDataStream(timeSeriesStream.map(t -> {
final Row row = new Row(7);
row.setField(0, t.f0);
row.setField(1, t.f1);
row.setField(2, t.f2);
row.setField(3, Timestamp.from(t.f3));
row.setField(4, t.f4);
row.setField(5, t.f5);
row.setField(6, t.f6);
return new Tuple2<>(true, row);
}).returns(new TypeHint<Tuple2<Boolean, Row>>() {
})).name("upsert to JDBC");
}
public static void main(String[] args) throws Exception {
final String databaseURL = "jdbc:derby:memory:flink;create=true";
int exitCode;
try (final Connection con = DriverManager.getConnection(databaseURL)) {
try (final Statement stmt = con.createStatement();) {
stmt.execute("CREATE TABLE time_series (\n" +
" id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),\n" +
" name VARCHAR(50) NOT NULL,\n" +
" window_size INTEGER NOT NULL DEFAULT 1,\n" +
" event_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n" +
" value DOUBLE PRECISION NOT NULL DEFAULT 0,\n" +
" aggregate_sum DOUBLE PRECISION NOT NULL DEFAULT 0,\n" +
" aggregate_count INTEGER NOT NULL DEFAULT 1,\n" +
" is_backfill BOOLEAN NOT NULL DEFAULT false,\n" +
" version INTEGER NOT NULL DEFAULT 1,\n" +
" create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n" +
" modify_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n" +
" UNIQUE (name, window_size, event_timestamp)\n" +
")");
}
exitCode = new CommandLine(new TimeSeriesAverageApp()).execute(args);
try (final Statement stmt = con.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT id, name, window_size, event_timestamp, value, aggregate_sum, aggregate_count, is_backfill, version, create_time, modify_time FROM time_series ORDER BY window_size, event_timestamp, name");
while (rs.next()) {
final long id = rs.getLong(1);
final String name = rs.getString(2);
final int window_size = rs.getInt(3);
final Timestamp event_timestamp = rs.getTimestamp(4);
final double value = rs.getDouble(5);
final double aggregate_sum = rs.getDouble(6);
final int aggregate_count = rs.getInt(7);
final boolean is_backfill = rs.getBoolean(8);
final int version = rs.getInt(9);
final Timestamp create_time = rs.getTimestamp(10);
final Timestamp modify_time = rs.getTimestamp(11);
logger.info(
"id: {}, name: \"{}\", window_size: {}, event_timestamp: \"{}\", value: {}, aggregate_sum: {}, aggregate_count: {}, is_backfill: {} version: {} create_time: \"{}\" modify_time: \"{}\"",
id, name, window_size, event_timestamp, value, aggregate_sum, aggregate_count, is_backfill, version, create_time, modify_time
);
}
}
}
System.exit(exitCode);
}
}