Posted by Marco Villalobos-2.
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/what-is-the-Flink-recommended-way-of-assigning-a-backfill-to-an-average-on-an-event-time-keyed-windo-tp36017.html
I need to compute averages over time series data on a 15-minute tumbling event-time window, with backfills.
The time series data is a Tuple3 of name: String, value: double, event_time: Instant.
That is, I need to compute the average value per name over each 15-minute tumbling window, and backfill any window in which a name received no values, such that,
given the input:
a,10,2020-06-23T00:01:30.0000000Z
a,15,2020-06-23T00:02:30.0000000Z
a,20,2020-06-23T00:03:30.0000000Z
b,25,2020-06-23T00:03:30.0000000Z
b,30,2020-06-23T00:02:30.0000000Z
b,35,2020-06-23T00:01:30.0000000Z
b,35,2020-06-23T00:16:30.0000000Z
it yields the following averages with backfill:
a,15,2020-06-23 00:00:00.0
b,30,2020-06-23 00:00:00.0
a,15,2020-06-23 00:15:00.0
b,35,2020-06-23 00:15:00.0
Notice that although no value for "a" arrived in the second quarter-hour, the previous average was upserted as a backfill.
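For clarity, each record is assigned to the 15-minute window that contains its event time. Here is a quick standalone sanity check (not part of my job; it just uses Flink's TimeWindow helper to floor a timestamp to its window start):

import java.time.Instant;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class WindowStartCheck {
    public static void main(String[] args) {
        final long windowSize = 15 * 60 * 1000L; // 15 minutes in milliseconds
        for (String ts : new String[]{"2020-06-23T00:01:30Z", "2020-06-23T00:16:30Z"}) {
            final long millis = Instant.parse(ts).toEpochMilli();
            // floor the timestamp to the start of its enclosing window (offset 0)
            final long start = TimeWindow.getWindowStartWithOffset(millis, 0, windowSize);
            System.out.println(ts + " -> " + Instant.ofEpochMilli(start));
            // prints 2020-06-23T00:00:00Z for the first and 2020-06-23T00:15:00Z for the second
        }
    }
}

That is why the averages above land on 00:00:00 and 00:15:00.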
I only got as far as computing the averages; I have not figured out a recommended strategy for upserting the backfill.
I made a GitHub project to share my approach: https://github.com/minmay/flink-patterns
The following code demonstrates my approach thus far.
Can somebody please give me guidance on the "Flink"-recommended way of assigning a backfill to an average on a keyed, windowed stream?
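For what it is worth, the direction I was considering (untested; this is exactly the part I am unsure about) is to follow the window operator with a KeyedProcessFunction, keyed by name, that remembers the last aggregate per key and uses event-time timers to re-emit it with is_backfill = true whenever the following window produces nothing. Here is a rough sketch against the Tuple7 layout used in the code below; the class name and wiring are only illustrative, it assumes a fixed 15-minute window, ignores late re-firings from allowedLateness, and only fills a single missing window per gap:

import java.time.Instant;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class BackfillFunction extends KeyedProcessFunction<
        String,
        Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>,
        Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> {

    private static final long WINDOW_MILLIS = 15 * 60 * 1000L;

    // the last aggregate emitted for this key
    private transient ValueState<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> last;

    @Override
    public void open(Configuration parameters) {
        last = getRuntimeContext().getState(new ValueStateDescriptor<>("last-aggregate",
                TypeInformation.of(new TypeHint<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>>() {})));
    }

    @Override
    public void processElement(
            Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> value,
            Context ctx,
            Collector<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> out) throws Exception {
        out.collect(value);
        last.update(value);
        // the aggregate for window start f3 + 15 min should have arrived by the
        // time the watermark reaches f3 + 30 min; check at that point
        ctx.timerService().registerEventTimeTimer(value.f3.toEpochMilli() + 2 * WINDOW_MILLIS);
    }

    @Override
    public void onTimer(
            long timestamp,
            OnTimerContext ctx,
            Collector<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> out) throws Exception {
        final Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> prev = last.value();
        // still one window behind this timer: the window in between was empty,
        // so re-emit the previous aggregate as a backfill for it
        if (prev != null && prev.f3.toEpochMilli() + 2 * WINDOW_MILLIS == timestamp) {
            out.collect(Tuple7.of(prev.f0, prev.f1, prev.f2,
                    prev.f3.plusMillis(WINDOW_MILLIS), prev.f4, prev.f5, true));
        }
    }
}

I would wire it in with something like aggregateTimeSeriesStream.keyBy(t -> t.f0).process(new BackfillFunction()) and send that stream to the upsert sink as well. This relies on a window's output reaching the process function before the watermark that triggered it, which I believe holds. Also, on a bounded input the final watermark fires the timer trailing the last real window, which can produce a backfill past the end of the data (e.g. "b" at 00:30 here), so deciding when to stop backfilling is part of what I have not figured out. Is this the recommended direction, or is there something more idiomatic?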
package mvillalobos.flink.patterns.timeseries.average;
import com.google.common.collect.ImmutableList;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.jdbc.JDBCOptions;
import org.apache.flink.api.java.io.jdbc.JDBCUpsertTableSink;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.Arrays;
public class TimeSeriesAverageApp {
private static final Logger logger = LoggerFactory.getLogger(TimeSeriesAverageApp.class);
public void stream(String inputFilePath) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// GIVEN a SOURCE with a CSV input file
// in which each line has a: String, double, Instant
// THEN the MAP operator
// transforms the line into a Tuple7
// f0: name: String
// f1: window_size: int
// f2: value: double
// f3: event_timestamp: Instant
// f4: aggregate_sum: double
// f5: aggregate_count: int
// f6: is_backfill: boolean
// WHEN the map operation finishes
// THEN the event time is assigned using field f3
final DataStream<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> timeSeriesStream = env.readTextFile(inputFilePath)
.map(line -> {
final String[] split = line.split(",");
final String name = split[0];
final double value = Double.parseDouble(split[1]);
final Instant timestamp = Instant.parse(split[2]);
return Tuple7.of(name, 1, value, timestamp, value, 1, false);
}).returns(Types.TUPLE(Types.STRING, Types.INT, Types.DOUBLE, TypeInformation.of(Instant.class), Types.DOUBLE, Types.INT, Types.BOOLEAN))
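// NOTE: the "b" records in the sample are out of event-time order
// (00:03:30, then 00:02:30, then 00:01:30), so AscendingTimestampExtractor
// logs monotony violations for them; they still land in the first window
// because it does not fire until the watermark passes 00:15:00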
.assignTimestampsAndWatermarks(
new AscendingTimestampExtractor<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>>() {
@Override
public long extractAscendingTimestamp(Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> element) {
return element.f3.toEpochMilli();
}
}
);
final JDBCUpsertTableSink jdbcUpsertTableSink = buildJdbcUpsertTableSink();
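// the raw (un-aggregated) tuples are upserted too; their window_size of 1
// keeps them distinct from the 15-minute aggregates under the
// (name, window_size, event_timestamp) key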
upsertToJDBC(jdbcUpsertTableSink, timeSeriesStream);
// GIVEN a data stream with Tuple7
// f0: name: String
// f1: window_size: int
// f2: value: double
// f3: event_timestamp: Instant
// f4: aggregate_sum: double
// f5: aggregate_count: int
// f6: is_backfill: boolean
// THEN the stream is KEYED BY: f0: name: String, f1: window_size: int
// THEN the stream is WINDOWED into a tumbling event time window of 15 minutes
// THEN the window is configured to allow elements late by 1 hour
// THEN a low-level process function is applied to the window that
// aggregates the time series by assigning the following tuple fields:
// f1: window_size = 15 minutes in milliseconds
// f2: value = average value in this 15 minute window
// f3: event_timestamp = the first epoch millisecond in this 15 minute window
// f4: aggregate_sum = sum of f2 values in this 15 minute window
// f5: aggregate_count = number of values in this 15 minute window
final SingleOutputStreamOperator<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>>
aggregateTimeSeriesStream = timeSeriesStream.keyBy(0, 1)
.window(TumblingEventTimeWindows.of(Time.minutes(15)))
.allowedLateness(Time.hours(1))
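// NOTE: a late element re-fires the window and re-emits an updated
// aggregate; the upsert sink should absorb that as an update by primary key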
.process(new ProcessWindowFunction<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>, Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>, Tuple, TimeWindow>() {
@Override
public void process(
Tuple tuple,
Context context,
Iterable<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> elements,
Collector<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> out
) throws Exception {
final Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> aggregation = new Tuple7<>();
boolean is_window_initialized = false;
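// the average is order-independent, so reversing the window contents
// below is not strictly necessary for correctness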
for (Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> element : ImmutableList.copyOf(elements).reverse()) {
if (!is_window_initialized) {
final Instant timestamp = Instant.ofEpochMilli(context.window().getStart());
aggregation.f0 = element.f0;
aggregation.f1 = (int) Time.minutes(15).toMilliseconds();
aggregation.f2 = element.f2;
aggregation.f3 = timestamp;
aggregation.f4 = 0D;
aggregation.f5 = 0;
aggregation.f6 = false;
is_window_initialized = true;
}
aggregation.f4 += element.f2;
aggregation.f5++;
aggregation.f2 = aggregation.f4 / aggregation.f5;
}
out.collect(aggregation);
}
});
upsertToJDBC(jdbcUpsertTableSink, aggregateTimeSeriesStream);
env.execute("time series");
}
private JDBCUpsertTableSink buildJdbcUpsertTableSink() {
final JDBCUpsertTableSink jdbcUpsertTableSink = JDBCUpsertTableSink.builder()
.setOptions(JDBCOptions.builder()
.setDBUrl("jdbc:derby:memory:flink")
.setTableName("time_series")
.build())
.setTableSchema(TableSchema.builder()
.field("name", DataTypes.VARCHAR(50).notNull())
.field("window_size", DataTypes.INT().notNull())
.field("value", DataTypes.DOUBLE().notNull())
.field("event_timestamp", DataTypes.TIMESTAMP().notNull())
.field("aggregate_sum", DataTypes.DOUBLE().notNull())
.field("aggregate_count", DataTypes.INT().notNull())
.field("is_backfill", DataTypes.BOOLEAN().notNull())
.primaryKey("name", "window_size", "event_timestamp")
.build())
.build();
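// as I understand it, setting the key fields is what puts the sink into
// upsert mode rather than append mode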
jdbcUpsertTableSink.setKeyFields(new String[]{"name", "window_size", "event_timestamp"});
return jdbcUpsertTableSink;
}
private void upsertToJDBC(JDBCUpsertTableSink jdbcUpsertTableSink, DataStream<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> timeSeriesStream) {
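// the sink consumes Tuple2<Boolean, Row>; as I understand it, the Boolean
// is the change flag (true = upsert, false = delete)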
jdbcUpsertTableSink.consumeDataStream(timeSeriesStream.map(t -> {
final Row row = new Row(7);
row.setField(0, t.f0);
row.setField(1, t.f1);
row.setField(2, t.f2);
row.setField(3, Timestamp.from(t.f3));
row.setField(4, t.f4);
row.setField(5, t.f5);
row.setField(6, t.f6);
return new Tuple2<>(true, row);
}).returns(new TypeHint<Tuple2<Boolean, Row>>() {
}));
}
public static void main(String[] args) throws Exception {
logger.info("Command line arguments: {}", Arrays.toString(args));
final String inputFilePath = args[0];
logger.info("Reading input file: {}", inputFilePath);
final String databaseURL = "jdbc:derby:memory:flink;create=true";
try (final Connection con = DriverManager.getConnection(databaseURL)) {
try (final Statement stmt = con.createStatement()) {
stmt.execute("CREATE TABLE time_series (\n" +
" id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),\n" +
" name VARCHAR(50) NOT NULL,\n" +
" window_size INTEGER NOT NULL DEFAULT 1,\n" +
" event_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n" +
" value DOUBLE PRECISION NOT NULL DEFAULT 0,\n" +
" aggregate_sum DOUBLE PRECISION NOT NULL DEFAULT 0,\n" +
" aggregate_count INTEGER NOT NULL DEFAULT 1,\n" +
" is_backfill BOOLEAN NOT NULL DEFAULT false,\n" +
" version INTEGER NOT NULL DEFAULT 1,\n" +
" create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n" +
" modify_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n" +
" UNIQUE (name, window_size, event_timestamp)\n" +
")");
}
TimeSeriesAverageApp app = new TimeSeriesAverageApp();
app.stream(inputFilePath);
try (final Statement stmt = con.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT id, name, window_size, event_timestamp, value, aggregate_sum, aggregate_count, is_backfill, version, create_time, modify_time FROM time_series");
while (rs.next()) {
final long id = rs.getLong(1);
final String name = rs.getString(2);
final int window_size = rs.getInt(3);
final Timestamp event_timestamp = rs.getTimestamp(4);
final double value = rs.getDouble(5);
final double aggregate_sum = rs.getDouble(6);
final int aggregate_count = rs.getInt(7);
final boolean is_backfill = rs.getBoolean(8);
final int version = rs.getInt(9);
final Timestamp create_time = rs.getTimestamp(10);
final Timestamp modify_time = rs.getTimestamp(11);
logger.info(
"id: {}, name: \"{}\", window_size: {}, event_timestamp: \"{}\", value: {}, aggregate_sum: {}, aggregate_count: {}, is_backfill: {} version: {} create_time: \"{}\" modify_time: \"{}\"",
id, name, window_size, event_timestamp, value, aggregate_sum, aggregate_count, is_backfill, version, create_time, modify_time
);
}
}
}
}
}