Hi Fabian/Stephan,

Waiting for your suggestion.

Regards,
Vinay Patil

On Wed, Aug 31, 2016 at 1:46 PM, Vinay Patil <[hidden email]> wrote:
Hi Vinay,

Can you give a bit more detail about how you plan to implement the outer join? Using a WindowFunction or a CoFlatMapFunction on a KeyedStream?

However, you do not need to interact with RocksDB directly. Flink takes care of that for you.

2016-09-01 16:13 GMT+02:00 vinay patil <[hidden email]>:
Hi Fabian,

I had already used a CoGroup function earlier but was getting some issues while dealing with watermarks (for one use case I was not getting the correct result), so I have used the union operator for performing the outer join (a WindowFunction on a KeyedStream). This approach is working and giving me correct results.

As I have discussed, for this scenario I want to maintain the non-matching records in some store, which is why I was thinking of using RocksDB here: I will maintain user-defined state after the outer-join window operator, and I can query it from Flink to check whether a value for a particular key is present. If it is present, I can match the records and send them downstream. The final goal is to have zero non-matching records, so this is the backup plan to handle edge-case scenarios.

I have already integrated code to write to Cassandra using the Flink connector, but I think this will be a better option than querying an external store, since RocksDB stores the data on the local TM disk. Retrieval should be faster here than from Cassandra, right? What do you think?

Regards,
Vinay Patil

On Thu, Sep 1, 2016 at 10:19 AM, Fabian Hueske-2 [via Apache Flink User Mailing List archive.] <[hidden email]> wrote:
Thanks for the explanation. I think I understood your use case.

Yes, I'd go for the RocksDB approach in a CoFlatMapFunction on a keyed stream (keyed by the join key). One input would be the unmatched outer-join records, the other input would serve the events you want to match them with.

The future state expiry feature will avoid such situations.

2016-09-01 18:29 GMT+02:00 vinay patil <[hidden email]>:
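[Editor's note: the two-input backup join described above can be sketched outside Flink in plain Java. This is not the Flink API: a HashMap stands in for Flink's keyed RocksDB state, the two methods mimic the two inputs of a CoFlatMapFunction, and all class and method names are illustrative.]

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the backup join: unmatched outer-join records arrive on one
// input, candidate events on the other. A HashMap simulates keyed state.
class BackupJoinSketch {
    private final Map<String, String> pendingByKey = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    // Input 1: a record that found no partner in the window outer join.
    void flatMap1(String key, String unmatchedRecord) {
        pendingByKey.put(key, unmatchedRecord);
    }

    // Input 2: a later event that may complete a pending record.
    void flatMap2(String key, String event) {
        String pending = pendingByKey.remove(key);
        if (pending != null) {
            // Both sides are present: emit the joined pair and clear the state.
            output.add(pending + "|" + event);
        }
        // Otherwise the event has no pending partner yet; a real job would
        // buffer it or drop it, depending on the desired semantics.
    }

    List<String> output() {
        return output;
    }
}
```

In the real job, the HashMap would be a keyed ValueState backed by RocksDB, so each parallel instance only sees the state for its own keys.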
Yes, that's what I am looking for. But why use a CoFlatMapFunction? I have already got the matchingAndNonMatching stream by doing the union of the two streams and putting the outer-join logic in the apply method. I am thinking of applying the same key on matchingAndNonMatching and a flatMap to take care of the rest of the logic.

Or are you suggesting using a CoFlatMapFunction after the outer-join operation (I mean after doing the window and getting the matchingAndNonMatching stream)?

Regards,
Vinay Patil

On Thu, Sep 1, 2016 at 11:38 AM, Fabian Hueske-2 [via Apache Flink User Mailing List archive.] <[hidden email]> wrote:
I thought you would like to join the non-matched elements with another (third) stream:

--> s1.union(s2).keyBy().window().apply(// outer join).keyBy().connect(s3.keyBy()).coFlatMap(// backup join)

If you want to match the non-matched stream with itself, a FlatMapFunction is the right choice:

--> s1.union(s2).keyBy().window().apply(// outer join).keyBy().flatMap(// backup join)

The backup join puts all non-matched elements in the state and waits for another non-matched element with the same key to do the join.

2016-09-01 19:55 GMT+02:00 vinay patil <[hidden email]>:
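[Editor's note: the self-matching flatMap variant can be sketched the same way. Again this is plain Java, not the Flink API: a HashMap stands in for keyed state, and the names are illustrative. The first arrival per key is buffered; the next arrival with the same key completes the pair and clears the state.]

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the single-stream backup join: non-matched elements of one
// keyed stream wait in (simulated) keyed state for a partner with the
// same key.
class SelfBackupJoin {
    private final Map<String, String> waiting = new HashMap<>();

    /** Returns the joined pair, or null while still waiting for a partner. */
    String flatMap(String key, String element) {
        String partner = waiting.remove(key);
        if (partner != null) {
            return partner + "|" + element; // both sides seen: emit and clear state
        }
        waiting.put(key, element);          // first arrival: buffer and wait
        return null;
    }
}
```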
I don't want to join a third stream, and yes, this is what I was thinking of: s1.union(s2).keyBy().window(), followed by the backup join.

I am already done integrating with Cassandra, but I feel RocksDB will be a better option. I will have to take care of the state clearing part as you have suggested; I will check that in the documentation.

I have a DTO with almost 50 fields. Converting it to JSON and storing that as state should not be a problem, or is there no harm in storing the DTO directly?

I think the documentation should state explicitly that state is maintained for user-defined operators, to avoid confusion.

Regards,
Vinay Patil

On Thu, Sep 1, 2016 at 1:12 PM, Fabian Hueske-2 [via Apache Flink User Mailing List archive.] <[hidden email]> wrote:
You do not have to convert your DTO into a JSON object to use it as key-value state in a Flink function. You can pass it as-is via the state interfaces.

Can you point me to the documentation that you find confusing? The state documentation [1] says:

>> You can make every transformation (map, filter, etc.) stateful by using Flink’s state interface or checkpointing instance fields of your function.
>> You can register any instance field as managed state by implementing an interface.
>> In this case, and also in the case of using Flink’s native state interface, Flink will automatically take consistent snapshots of your state periodically, and restore its value in the case of a failure.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html

2016-09-01 20:22 GMT+02:00 vinay patil <[hidden email]>:
Hi Fabian,

I am referring to this; it does not clearly state whether the state will be maintained on local disk even after checkpointing. Or maybe I am not getting it correctly :)

Regards,
Vinay Patil

On Thu, Sep 1, 2016 at 1:38 PM, Fabian Hueske-2 [via Apache Flink User Mailing List archive.] <[hidden email]> wrote:
Thanks for the pointer and sorry for the late answer.

I guess that depends on the semantics of "checkpointing". In Flink's terminology this means creating a copy of the state (and writing the copy to the external FS). It does not mean that the state is migrated or moved to the external FS.

2016-09-01 20:53 GMT+02:00 vinay patil <[hidden email]>: