I am wondering how you would implement the following function in Flink. The function takes as input two streams. One stream can be viewed a a tuple with two value (x, y), the second stream is a stream of individual values z. The function keeps a time based window on the first input (say, 24 hours). Whenever it receives an element from the second stream, it compares the value z against the x element of each tuple in the window, and for each match it emits (x, y). You are basically doing a join on x=z. Note that values from the second stream are not windowed and they are only matched to values from the first stream with an earlier timestamps.
This was relatively easy to implement in Samza. Consume off two topics, the first keyed by x and the second by z. Consume both topics in a job. Messages with the same key would be consumed by the same task. The task could maintain a window of messages from the first stream in its local state, Whenever a message came in via the second stream, it could look up in the local state for matching messages, and if it found any, send them to the output stream. Obviously, with Samza you don't have the luxury of the system handling event time for you, but this work well and it is easy to implement. I am not clear how this would be implemented in Flink. It is easy enough to partition by key either stream, and to window the first stream using a sliding window, but from there out things get complicated. You can join the two streams by key, but you must do so using the same window for both streams. That means events from the first stream may be matched to older events of the second stream, which is not what we want. I suppose if both included a timestamp, you could later add a filter to remove such events from the merged stream. But you would also have to deal with duplicates, as the window is a sliding window and the same two elements may match across all window panes that contain the matching elements. So you need to dedup as well. coGroup seems like it would suffer from the same issues. Maybe the answer is connected streams, but there is scant documentation on the semantics of ConnectedStreams. There isn't even an example that I could find that makes use of them. Thoughts? |
You could simulate the Samza approach by having a RichFlatMapFunction over cogrouped streams that maintains the sliding window in its ListState. As I understand the drawback is that the list state is not maintained in the managed memory. I'm interested to hear about the right way to implement this. On Wed, Apr 13, 2016 at 3:53 PM, Elias Levy <[hidden email]> wrote:
|
Anyone from Data Artisans have some idea of how to go about this? On Wed, Apr 13, 2016 at 5:32 PM, Maxim <[hidden email]> wrote:
|
Hi Elias, sorry for the late reply. You're right that with the windowed join you would have to deal with pairs where the timestamp of (x,y) is not necessarily earlier than the timestamp of z. Moreover, by using sliding windows you would receive duplicates as you've described. Using tumbling windows would mean that you lose join matches if (x,y) lives in an earlier window. Thus, in order to solve your problem you would have to write a custom stream operator. The stream operator would do the following: Collecting the inputs from (x,y) and z which are already keyed. Thus, we know that x=z holds true. Using a priority queue we order the elements because we don't know how the arrive at the operator. Whenever we receive a watermark indicating that no earlier events can arrive anymore, we can go through the two priority queues to join the elements. The queues are part of the operators state so that we don't lose information in case of a recovery. I've sketched such an operator here [1]. I hope this helps you to get started. Cheers, Till On Thu, Apr 14, 2016 at 5:12 PM, Elias Levy <[hidden email]> wrote:
|
Thanks for the suggestion. I ended up implementing it a different way. What is needed is a mechanism to give each stream a different window assigner, and then let Flink perform the join normally given the assigned windows. Specifically, for my use case what I need is a sliding window for one stream and a trailing window for the other stream. A trailing window is just a TimeWindow where the window end time is the event time, rounded up or down some amount, and the window start time is is end time minus some given parameter. For instance: class TrailingEventTimeWindows(asize: Long, around: Long) extends WindowAssigner[Object, TimeWindow] { val size = asize val round = around override def assignWindows(element: Object, timestamp: Long): Collection[TimeWindow] = { if (timestamp > java.lang.Long.MIN_VALUE) { val end = (timestamp - (timestamp % round)) + round Collections.singletonList(new TimeWindow(end - size, end)) } else { // Long.MIN_VALUE is currently assigned when no timestamp is present throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " + "Is the time characteristic set to 'ProcessingTime', or did you forget to call " + "'DataStream.assignTimestampsAndWatermarks(...)'?") } } def getSize: Long = size override def getDefaultTrigger(env: JStreamExecutionEnvironment): Trigger[Object, TimeWindow] = EventTimeTrigger.create() override def toString: String = s"TrailingEventTimeWindows($size)" override def getWindowSerializer(executionConfig: ExecutionConfig): TypeSerializer[TimeWindow] = new TimeWindow.Serializer() } object TrailingEventTimeWindows { def of(size: Time, round: Time) = new TrailingEventTimeWindows(size.toMilliseconds(), round.toMilliseconds()) } If the Flink API where different, then I could do something like this to join the streams: val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // (time, key, id) val events: DataStream[(Int,Int,Int)] = env.fromElements( (1000, 100, 10), (2000, 200, 10), (3000, 100, 20), (4000, 300, 30), (7000, 100, 40) ) // (time, file) val changes: DataStream[(Int,Int)] = env.fromElements( (2000, 300), (4000, 100) ) val windowedKeyedChanges = changes .assignAscendingTimestamps( _._1 ) .keyBy(1) .window(TrailingEventTimeWindows.of(Time.seconds(5),Time.seconds(1))) val windowedKeyedEvents = events.assignAscendingTimestamps( _._1 ) .keyBy(2) .timeWindow(Time.seconds(5), Time.seconds(1)) val results = windowedKeyedEvents.join(windowedKeyedChanges) .apply { } Alas, the Flink API makes this more complicated. Instead of allowing you to joined to keyed windowed streams, you join two unkeyed unwind owed streams and tell it how to key them and join them using join().where().equalTo().window(). Since that construct only takes a single WindowAssigner I created a window assigner that uses a different assigner for each stream being joined: class DualWindowAssigner[T1 <: Object, T2 <: Object](assigner1: WindowAssigner[Object, TimeWindow], assigner2: WindowAssigner[Object, TimeWindow]) extends WindowAssigner[Object, TimeWindow] { val windowAssigner1 = assigner1 val windowAssigner2 = assigner2 override def assignWindows(element: Object, timestamp: Long): Collection[TimeWindow] = { val e = element.asInstanceOf[TaggedUnion[T1,T2]] if (e.isOne) { windowAssigner1.assignWindows(e.getOne, timestamp) } else { windowAssigner2.assignWindows(e.getTwo, timestamp) } } override def getDefaultTrigger(env: JStreamExecutionEnvironment): Trigger[Object, TimeWindow] = EventTimeTrigger.create() override def toString: String = s"DualWindowAssigner" override def getWindowSerializer(executionConfig: ExecutionConfig): TypeSerializer[TimeWindow] = new TimeWindow.Serializer() } Then I can do: val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // (time, key, id) val events: DataStream[(Int,Int,Int)] = env.fromElements( (1000, 100, 10), (1500, 300, 20), (2000, 200, 10), (3000, 100, 20), (4000, 300, 30), (7000, 100, 40) ) // (time, key) val changes: DataStream[(Int,Int)] = env.fromElements( (2000, 300), (4000, 100) ) val eventsWithTime = events.assignAscendingTimestamps( _._1 ) val changesWithTime = changes.assignAscendingTimestamps( _._1 ) val results = eventsWithTime.join(changesWithTime) .where( _._2 ).equalTo( _._2 ) .window(new DualWindowAssigner[Tuple3[Int,Int,Int],Tuple2[Int,Int]]( SlidingEventTimeWindows.of( Time.seconds(4), Time.seconds(1)), TrailingEventTimeWindows.of(Time.seconds(4), Time.seconds(1)) )) .apply { (x1, x2) => (x1, x2) } results.print() This works as Flink will consider two TimeWindows the same so long as they have the same start and end time. So as long as the sliding and trailing windows have the same size and the are rounded to correctly, they will match. I think the idea of a trailing window is a powerful one. It would be useful is one where included in the Flink API. Being able to join streams with different window assigners is also useful as evidenced by my use case. Maybe some thought should be given on how to support that use case officially. Thoughts? Comments? On Wed, Apr 20, 2016 at 10:13 AM, Till Rohrmann <[hidden email]> wrote:
|
Hi Elias, thanks for the long write-up. It's interesting that it actually kinda works right now. You might be interested in a design doc that we're currently working on. I posted it on the dev list but here it is: https://docs.google.com/document/d/1hIgxi2Zchww_5fWUHLoYiXwSBXjv-M5eOv-MKQYN3m4/edit?usp=sharing I'm trying to add support for side inputs. They are excellent for the use case where you want to enrich (join) a main stream with one or several other streams. This would also include support for different windows on the different streams and a mechanism for mapping main windows to the correct side-input windows. Feedback/suggestions are very welcome on this! Cheers, Aljoscha On Tue, 3 May 2016 at 03:06 Elias Levy <[hidden email]> wrote:
|
In reply to this post by Till Rohrmann
Till, Thanks again for putting this together. It is certainly along the lines of what I want to accomplish, but I see some problem with it. In your code you use a ValueStore to store the priority queue. If you are expecting to store a lot of values in the queue, then you are likely to be using RocksDB as the state backend. But if you use a ValueStore for the priority queue with RocksDB as the backend, the whole priority queue will be deserialized and serialized each time you add an item to it. That will become a crushing cost as the queue grows. I could instead use a ListState with the RocksDB state, that way only the single value being added is serialized on an add. But the get() operation in the RocksDBListState seems very inefficient, defeating the idea of working with data sets that don't fit in memory. It loads all values into a List instead of returning an Iterable that returns values in the list by iterating via the RockDB scan API. Samza has the advantage here, as it provides a ordered KV state API that allows you to truly iterate over the values in RocksDB and a caching lager to batch writes into RocksDB. I am surprised there is no OrderedKeyValueStore API Flink. Given that only the RocksDB backend can store more state that can fit in memory and the cost associated with its get() method when keeping track of a list, it seems like there isn't a good solution keep track of large state in the form of a list or ordered list in Flink right now. On Wed, Apr 20, 2016 at 10:13 AM, Till Rohrmann <[hidden email]> wrote:
|
Hi Elias, Samza and Flink operate at slightly different abstraction layers here. The Samza KeyValueStore basically has the interface of a Java HashMap. When accessing keyed state the key is always explicit in the method access and it only allows a simple put and get per key. Flink State, such as ValueState, ListState and ReducingState is implicitly scoped to the key of the input element and it allows different "shapes" of state, for example the ListState uses the efficient merge operation to add to the state instead of a get-update-put cycle that would be required in Samza KeyValueStore. In code, this is what Samza does: class Operator { KeyValueStore<K, V> store = ... void process(KV<Key, Value> element) { value = store.get(element.getKey()) ... store.put(element.getKey(), ...) } } while this is what Flink does: class Operator { ValueState<V> state = ... void internalProcess(KV<Key, Value> element) { state.setKey(element.getKey()) process(element) } void process(KV<Key, Value> element) { value = state.get() ... state.update(...) } } In Flink we are dealing with the keys internally, which makes it easier for us to implement things like automatic scaling with rebalancing of keyed state (Till is working on this right now). Underneath we have something similar to the KeyValueStore, if you want, you could write a custom operator that deals with these details directly and and handles managing of keys. The thing we don't support right now is iterating over all keys/state for the locally held keys. I'm changing this, however, in this PR: https://github.com/apache/flink/pull/1957. Then you can do everything that you can do with the Samza KeyValueStore plus a bit more because we have more specific types of state that exploit features such as merge instead of put-update-get.I hope this clarifies things a bit. :-) Cheers, Aljoscha On Wed, 4 May 2016 at 00:28 Elias Levy <[hidden email]> wrote:
|
In reply to this post by Elias Levy
Till,
An issue with your suggestion is that the job state may grow unbounded. You are managing expiration of data from the cache in the operator, but the state is partitioned by the stream key. That means if we no longer observe a key, the state associated with that key will never be removed. In my data set keys come and go, and many will never be observed again. That will lead to continuous state growth over time. On Mon, May 2, 2016 at 6:06 PM, Elias Levy <[hidden email]> wrote:
|
Hi Elias!
I think you brought up a couple of good issues. Let me try and summarize what we have so far: 1) Joining in a more flexible fashion => The problem you are solving with the trailing / sliding window combination: Is the right way to phrase the join problem "join records where key is equal and timestamps are within X seconds (millis/minutes/...) of each other"? => That should definitely have an API abstraction. The first version could me implemented exactly with a combination of sliding and trailing windows. => For joins between windowed and non windowed streams in the long run: Aljoscha posted the Design Doc on side inputs. Would that cover the use case as a long-term solution? 2) Lists that are larger than the memory => The ListState returns an Iterable, but it is eagerly materialized from RocksDB. Is there a way to "stream" the bytes from RocksDB? Flink could then deserialize them in a streamed fashion as well. 3) Can you elaborate a bit on the OrderedListState? Do you think of multiple values (ordered) per key, or a sequence of key/value pairs, ordered by key? => Currently Flink limits the scope of key accesses to the values current key (as defined in the keyBy() function). That way, the system can transparently redistribute keys when changing the parallelism. Greetings, Stephan On Sat, May 21, 2016 at 12:24 AM, Elias Levy <[hidden email]> wrote:
|
Hi Elias, I like the idea of having a trailing / sliding window assigner to perform your join. However, the result should not be entirely correct wrt your initial join specification. Given an events data set which contains the elements e1 = (4000, 1, 1) and e2 = (4500, 2, 2) and a changes data set which contains the element c1 = (4000, 1, 1). With the trailing and sliding window assigner of SlidingEventTimeWindows(4000, 1000) and TrailingEventTimeWindows(4000, 1000), c1 would be assigned to TrailingWindow(1000, 5000). e1 and e2 would both be amongst others in SlidingWindow(1000, 5000). Thus, the two windows would be joined. The result would be (e1, c1) and (e2, c1). However, e2 happened after c1. But if that is ok for your use case, then your solution is great :-) Cheers, Till On Wed, May 25, 2016 at 2:04 PM, Stephan Ewen <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |