Hi Team,
I'm working on a Flink streaming application. The data is ingested through Kafka connectors at roughly 100K events/sec, and each event payload is a string. Let's call this DataStream1. The application also uses another DataStream, DataStream2, which consumes events from a Kafka topic. The elements of DataStream2 go through a transformation that finally updates a HashMap (a java.util collection). The Flink application needs to share this HashMap across the Flink cluster so that DataStream1 can check the state of the values in this collection. Is there a way to do this in Flink? I don't see any shared collection that can be used within the cluster. Best Regards, CVP
Hi Team, can someone help me here? I'd appreciate any response!
On Mon, Sep 5, 2016 at 4:51 PM, Chakravarthy varaga <[hidden email]> wrote:
Hi, Flink does not provide shared state. However, you can broadcast a stream to a CoFlatMapFunction so that each operator instance has its own local copy of the state. If that does not work for you because the state is too large, and if it is possible to partition the state (and both streams), you can use keyBy instead of broadcast. Finally, you can use an external system, such as a key-value store or an in-memory store like Apache Ignite, to hold your distributed collection. Best, Fabian
2016-09-07 17:49 GMT+02:00 Chakravarthy varaga <[hidden email]>:
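As a rough illustration of the difference between the first two options, the plain-Java sketch below simulates how DataStream2 updates would reach parallel operator instances. It does not use Flink at all: the class and method names are hypothetical, and the hashCode-modulo routing is a simplification of Flink's real key assignment. With broadcast, every instance ends up with a full copy of the map; with keyBy, each instance holds only its shard, so a DataStream1 element must be routed by the same key to find its entry.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (not Flink API) of how DataStream2 updates reach
// parallel operator instances. Each "instance" is just a local map.
class StateDistributionSketch {

    // Broadcast: every instance applies every update, so each holds the full map.
    static List<Map<String, String>> broadcast(Map<String, String> updates, int parallelism) {
        List<Map<String, String>> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new HashMap<>(updates)); // full local copy per instance
        }
        return instances;
    }

    // keyBy: each key is owned by exactly one instance.
    // Simplified routing; Flink's real key assignment differs.
    static int instanceFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    static List<Map<String, String>> keyed(Map<String, String> updates, int parallelism) {
        List<Map<String, String>> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new HashMap<>());
        }
        for (Map.Entry<String, String> e : updates.entrySet()) {
            // Only the owning instance stores this key (one shard per instance).
            instances.get(instanceFor(e.getKey(), parallelism)).put(e.getKey(), e.getValue());
        }
        return instances;
    }

    public static void main(String[] args) {
        Map<String, String> updates = new HashMap<>();
        updates.put("road-1", "blocked");
        updates.put("road-2", "open");
        updates.put("road-3", "blocked");

        // Broadcast: any instance can answer a lookup for any key.
        System.out.println(broadcast(updates, 4).get(0).size()); // 3

        // keyBy: a DS1 element with key "road-3" must go to the owning instance.
        int target = instanceFor("road-3", 4);
        System.out.println(keyed(updates, 4).get(target).get("road-3")); // blocked
    }
}
```

Broadcast is simple but multiplies memory by the parallelism; keyBy spreads the state but requires both streams to share the same key.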
Hi Fabian, thanks for your response. Actually, these DataStreams (Job1-DataStream1 and Job2-DataStream2) are from different Flink applications running within the same cluster.
On Wed, Sep 7, 2016 at 5:00 PM, Fabian Hueske <[hidden email]> wrote:
Is writing DataStream2 to a Kafka topic and reading it from the other job an option?
2016-09-07 19:07 GMT+02:00 Chakravarthy varaga <[hidden email]>:
Certainly, that's what I thought as well. The output of DataStream2 could be in the thousands, and there are state updates. However, assuming we maintain this state in a collection and update it (by reading from the topic), will this collection be replicated across the cluster within Job1?
On Wed, Sep 7, 2016 at 6:33 PM, Fabian Hueske <[hidden email]> wrote:
Operator state is always local in Flink. However, with key-value state you can have something that behaves similarly to a distributed hashmap, because each operator instance holds a different shard/partition of the hashtable.
If you only have to do a single key lookup for each element of DS1, you should consider partitioning both streams (keyBy) and writing the state into Flink's key-value state [1]. This has several benefits:
2) Depending on the backend (RocksDB) [2], parts of the state can reside on disk, so you are not bound to the memory of the JVM.
3) Flink takes care of the lookup; there is no need to maintain your own hashmap.
4) Rescaling will only be possible for jobs with key-value state (this feature is currently under development).
Best, Fabian
2016-09-07 19:55 GMT+02:00 Chakravarthy varaga <[hidden email]>:
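To show what "Flink takes care of the lookup" means, here is a minimal plain-Java simulation of keyed value state. It is not the Flink API and all names are hypothetical: the "runtime" sets the current key before each record, so user code only calls value() and update() and never indexes a map itself.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java simulation (not Flink API) of keyed value-state semantics.
class KeyedStateSketch {

    // One shard: the state of all keys owned by this operator instance.
    private final Map<String, String> shard = new HashMap<>();
    private String currentKey;

    // Done by the runtime before each incoming record is processed.
    void setCurrentKey(String key) { this.currentKey = key; }

    // What user code sees: a single value implicitly scoped to the current key.
    String value() { return shard.get(currentKey); }
    void update(String v) { shard.put(currentKey, v); }

    public static void main(String[] args) {
        KeyedStateSketch state = new KeyedStateSketch();

        state.setCurrentKey("road-7");     // a DS2 record for road-7 arrives
        state.update("blocked");

        state.setCurrentKey("road-9");     // a DS1 record for another key
        System.out.println(state.value()); // null: nothing stored for road-9

        state.setCurrentKey("road-7");     // a DS1 record for road-7
        System.out.println(state.value()); // blocked
    }
}
```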
I understand this better with your explanation. In this use case, each element in DS1 has to be looked up against a 'bunch of keys' from DS2, and DS2 could shrink or expand in terms of the number of keys. Will the key-value sharding work in this case?
On Wed, Sep 7, 2016 at 7:44 PM, Fabian Hueske <[hidden email]> wrote:
That depends.
1) Growing/Shrinking: This should work. New entries can always be inserted. To remove entries from the k-v state, you have to set the value to null. Note that you need an explicit delete-value record to trigger the eviction.
2016-09-07 21:45 GMT+02:00 Chakravarthy varaga <[hidden email]>:
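A minimal sketch of the growing/shrinking behaviour described above, assuming a hypothetical encoding in which a DS2 record with a null value is the explicit delete record. It is a plain-Java simulation, not Flink code: an entry only disappears when such a record for its key arrives, otherwise it stays in state indefinitely.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java simulation (not Flink API) of keyed state that grows and shrinks.
// Hypothetical encoding: a (key, null) record is the explicit delete record.
class GrowShrinkSketch {

    private final Map<String, String> state = new HashMap<>();

    void onDs2Record(String key, String value) {
        if (value == null) {
            state.remove(key);      // delete record: evict the entry
        } else {
            state.put(key, value);  // insert a new key or overwrite an existing one
        }
    }

    int size() { return state.size(); }

    String lookup(String key) { return state.get(key); }

    public static void main(String[] args) {
        GrowShrinkSketch s = new GrowShrinkSketch();
        s.onDs2Record("road-1", "blocked");
        s.onDs2Record("road-2", "blocked");
        System.out.println(s.size());           // 2
        s.onDs2Record("road-1", null);          // explicit delete record
        System.out.println(s.size());           // 1
        System.out.println(s.lookup("road-1")); // null
    }
}
```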
Hi Fabian
I am also looking for this solution. Could you help me with two things?
1. How is this different from queryable state?
2. How can I query this key-value state from DS2, even if it is running in the same application? For example:
    val keyedStream = stream.keyBy(_.key)
    val otherStream = somekafka.createStream
The final goal is to have something like:
    otherStream.foreach(kafkamessage => keyedStream.lookup(kafkamessage.key))
~Pushpendra Jaiswal
Hi Pushpendra,
1. Queryable state is an upcoming feature and not yet part of an official release. With queryable state you can query operator state from outside the application.
2016-09-08 8:54 GMT+02:00 pushpendra.jaiswal <[hidden email]>:
Hi Fabian, first of all, thanks for all your prompt responses. With regard to 2) multiple lookups, I have to clarify what I mean by that...
    DS1<String> elementKeyStream = stream1.map(String<>);
This maps each of the streaming elements into a string-mapped value...
On Wed, Sep 7, 2016 at 9:15 PM, Fabian Hueske <[hidden email]> wrote:
Not sure if I got your requirements right, but would this work?
    KeyedStream<Tuple2<String, V>, String> ks2 = ds2.flatMap(split T into k-v pairs).keyBy(0);
    KeyedStream<String, String> ks1 = ds1.keyBy("*");
2016-09-08 20:24 GMT+02:00 Chakravarthy varaga <[hidden email]>:
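The "split T into k-v pairs" step is left abstract in the proposal. Assuming each DS2 element carries several keys that share one value (the hypothetical Ds2Event type below), the flatMap would emit one (key, value) pair per key, so that keyBy(0) sends each pair to the instance owning that key and each DS1 element needs only a single-key lookup. This plain-Java sketch uses Map.Entry as a stand-in for Tuple2; it is not Flink code.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the "split T into k-v pairs" step (not Flink API).
class SplitSketch {

    // Hypothetical stand-in for T: one DS2 event covering several keys.
    static final class Ds2Event {
        final List<String> keys;
        final String value;
        Ds2Event(List<String> keys, String value) { this.keys = keys; this.value = value; }
    }

    // Behaves like a flatMap from T to (key, value) pairs: one pair per key,
    // so a later keyBy on the key field can route each pair independently.
    static List<Map.Entry<String, String>> split(Ds2Event event) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (String key : event.keys) {
            out.add(new AbstractMap.SimpleEntry<>(key, event.value));
        }
        return out;
    }

    public static void main(String[] args) {
        Ds2Event e = new Ds2Event(Arrays.asList("link-10", "link-11", "link-12"), "blocked");
        System.out.println(split(e)); // [link-10=blocked, link-11=blocked, link-12=blocked]
    }
}
```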
Hi Fabian, I'm coding to check whether your proposal works and have hit a ClassCastException.
    // Here is my value that holds state information: an implementation of my value state,
    // where the key is a Double value, on connected stream ks2
    public class BlockedEventState implements ValueState<BlockedRoadInfo> {
        public BlockedRoadInfo blockedRoad;

        @Override
        public void clear() { blockedRoad = null; }

        @Override
        public BlockedRoadInfo value() throws IOException { return blockedRoad; }

        @Override
        public void update(final BlockedRoadInfo value) throws IOException { blockedRoad = value; }
    }

    // BlockedRoadInfo class
    public class BlockedRoadInfo {
        long maxLink;
        long minLink;
        double blockedEventId;
        // ... setters & getters
    }

    new RichCoFlatMapFunction() {
        private transient BlockedEventState blockedRoads;
        ............
        @Override
        public void open(final org.apache.flink.configuration.Configuration parameters) throws Exception {
            final ValueStateDescriptor<BlockedRoadInfo> blockedStateDesc =
                new ValueStateDescriptor<BlockedRoadInfo>("BlockedEventStates", TypeInformation.of(BlockedRoadInfo.class), null);
            blockedRoads = (BlockedEventState) getRuntimeContext().getState(blockedStateDesc); // FAILS HERE WITH CLASSCAST
        };
    }
Caused by: java.lang.ClassCastException: org.apache.flink.runtime.state.filesystem.FsValueState cannot be cast to com.ericsson.components.aia.iot.volvo.state.BlockedEventState
I have tried setting the state backend to both MemState and FsState:
    streamEnv.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));
On Thu, Sep 8, 2016 at 10:10 PM, Fabian Hueske <[hidden email]> wrote:
Hi, you don't need the BlockedEventState class; you should be able to just do this:
    private transient ValueState<BlockedRoadInfo> blockedRoads;
    ............
    @Override
    public void open(final org.apache.flink.configuration.Configuration parameters) throws Exception {
        final ValueStateDescriptor<BlockedRoadInfo> blockedStateDesc =
            new ValueStateDescriptor<BlockedRoadInfo>("BlockedEventStates", TypeInformation.of(BlockedRoadInfo.class), null);
        blockedRoads = getRuntimeContext().getState(blockedStateDesc);
    };
    }
Cheers, Aljoscha
On Mon, 12 Sep 2016 at 16:24 Chakravarthy varaga <[hidden email]> wrote:
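Why the cast fails can be reproduced without Flink. In the sketch below (hypothetical stand-in types, not Flink's classes), the "backend" decides which implementation getState() returns, just as the stack trace shows FsValueState coming back. Casting that object to a user-written implementation throws ClassCastException, while keeping the field typed as the interface, as Aljoscha suggests, works.

```java
// Plain-Java illustration with hypothetical types, not Flink's classes.
interface SimpleValueState<T> {
    T value();
    void update(T value);
}

// Stand-in for the backend's own implementation (FsValueState in the report).
class BackendValueState<T> implements SimpleValueState<T> {
    private T v;
    public T value() { return v; }
    public void update(T value) { v = value; }
}

// Stand-in for the user-written BlockedEventState.
class UserValueState<T> implements SimpleValueState<T> {
    private T v;
    public T value() { return v; }
    public void update(T value) { v = value; }
}

class StateCastSketch {
    // Stand-in for getRuntimeContext().getState(descriptor): the backend picks the class.
    static <T> SimpleValueState<T> getState() {
        return new BackendValueState<>();
    }

    static boolean castToUserClassFails() {
        try {
            UserValueState<String> s = (UserValueState<String>) StateCastSketch.<String>getState();
            return false;
        } catch (ClassCastException e) {
            return true; // same failure mode as FsValueState -> BlockedEventState
        }
    }

    public static void main(String[] args) {
        System.out.println(castToUserClassFails()); // true

        // The fix: type the field as the interface and use the returned object directly.
        SimpleValueState<String> blockedRoads = getState();
        blockedRoads.update("road-1 blocked");
        System.out.println(blockedRoads.value()); // road-1 blocked
    }
}
```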
Hi Aljoscha & Fabian, I finally got this working. Thanks for your help. In terms of persisting the state (for S2), I tried checkpointing every 10 seconds using an FsStateBackend. What I notice is that the checkpoint duration is almost 2 minutes in many cases, while in other cases it frequently varies from 100 ms to 1.5 minutes.
    KeyedStream<String, String> ks1 = ds1.keyBy("*");
    KeyedStream<Tuple2<String, V>, String> ks2 = ds2.flatMap(split T into k-v pairs).keyBy(0);
    ks1.connect(ks2).flatMap(X);
    // X is a CoFlatMapFunction that inserts and removes elements from ks2 into a key-value state member.
    // Elements from ks1 are matched against that state.
On Tue, Sep 13, 2016 at 7:29 AM, Aljoscha Krettek <[hidden email]> wrote:
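A plain-Java simulation of the connected pipeline described above, assuming (as a hypothetical encoding) that a null value in a ks2 pair means "remove". Because both inputs are keyed the same way, flatMap1 and flatMap2 of X see the same per-key state; the map and list below only stand in for Flink's keyed state and Collector, and none of this is Flink code.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (not Flink API) of ks1.connect(ks2).flatMap(X).
class ConnectedMatchSketch {

    private final Map<String, String> stateByKey = new HashMap<>(); // keyed state stand-in
    final List<String> out = new ArrayList<>();                      // Collector stand-in

    // Input 2 (ks2): (key, value) pairs; a null value removes the key.
    void flatMap2(Map.Entry<String, String> pair) {
        if (pair.getValue() == null) {
            stateByKey.remove(pair.getKey());
        } else {
            stateByKey.put(pair.getKey(), pair.getValue());
        }
    }

    // Input 1 (ks1): the String element is its own key, as with keyBy("*").
    void flatMap1(String element) {
        String match = stateByKey.get(element);
        if (match != null) {
            out.add(element + " -> " + match); // emit only when a DS2 entry exists
        }
    }

    public static void main(String[] args) {
        ConnectedMatchSketch x = new ConnectedMatchSketch();
        x.flatMap2(new AbstractMap.SimpleEntry<>("link-10", "blocked"));
        x.flatMap1("link-10"); // matches
        x.flatMap1("link-99"); // no state for this key
        x.flatMap2(new AbstractMap.SimpleEntry<String, String>("link-10", null));
        x.flatMap1("link-10"); // removed, no match
        System.out.println(x.out); // [link-10 -> blocked]
    }
}
```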
Please find attached Flink_checkpoint_time.png, related to this issue.
On Thu, Sep 22, 2016 at 3:38 PM, Chakravarthy varaga <[hidden email]> wrote:
Hi Team, could you guide me on this? Is this a known issue with checkpointing? CVP
On 22 Sep 2016 15:57, "Chakravarthy varaga" <[hidden email]> wrote: