Hi,
I have a metadata enrichment use case, and I'm wondering whether my approach is correct.
I need to extract a key from the Kafka stream and use it to select some values from a SQL table. So I thought I would use the Table/SQL API to select the table metadata, then convert the Kafka stream to a table and join the data on the stream key. At the end I need to map the joined data to a new POJO and send it to Elasticsearch. Any suggestions or different ways to solve this use case?
Thanks,
Miki
If the SQL data is all (or mostly all) needed to join against the data from Kafka, then I might try a regular join.
Otherwise it sounds like you want to use an AsyncFunction to do ad hoc queries (in parallel) against your SQL DB. — Ken
--------------------------
Ken Krugler
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr
Hi, thanks for the reply. Let me break your reply down into the flow's execution order: the first data stream will use Async I/O to select from the table, the second stream will be Kafka, and then I can join the streams and map the result? If that's the case, will I select from the table only once, on load? How can I make sure that my stream's table data is "fresh"? I'm also wondering: is there a way to use Flink's state backend (RocksDB) to create a read/write-through mechanism?
Thanks,
Miki

On Mon, Apr 16, 2018 at 2:45 AM, Ken Krugler <[hidden email]> wrote:
Hi Miki,
I haven’t tried mixing AsyncFunctions with SQL queries. Normally I’d create a regular DataStream workflow that first reads from Kafka, then has an AsyncFunction to read from the SQL database. If there are often duplicate keys in the Kafka-based stream, you could keyBy(key) before the AsyncFunction, and then cache the result of the SQL query. — Ken
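Ken's keyBy-plus-cache idea can be sketched framework-agnostically. The class below is not Flink code: `fetchFromDb` is a hypothetical stand-in for the real async database query, and `enrich` only mirrors the shape of an AsyncFunction's `asyncInvoke` with a per-key cache in front, so duplicate keys are served from the cache instead of hitting the database again.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of "keyBy(key), then cache the result of the SQL query".
class CachedEnrichment {
    private final Map<String, String> cache = new ConcurrentHashMap<>();

    // Hypothetical stand-in for an async JDBC/REST lookup.
    static CompletableFuture<String> fetchFromDb(String key) {
        return CompletableFuture.supplyAsync(() -> "meta-for-" + key);
    }

    // Check the per-key cache first; only go to the database on a miss.
    CompletableFuture<String> enrich(String key) {
        String cached = cache.get(key);
        if (cached != null) {
            return CompletableFuture.completedFuture(cached);
        }
        return fetchFromDb(key).thenApply(value -> {
            cache.put(key, value);   // remember the result for later records
            return value;
        });
    }
}
```

After keyBy, all records with the same key land on the same parallel instance, which is what makes a simple per-instance cache like this effective.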
Hi Miki,
Sorry for the late response. Note that Flink's SQL support doesn't add any advantage for either of the two approaches. You should use the DataStream API (and possibly ProcessFunctions). Let me know if you need to tackle the second approach and I can give some details on the workarounds I mentioned.

2018-04-16 20:38 GMT+02:00 Ken Krugler <[hidden email]>:
Hi Fabian,
Please share the workarounds; they would be helpful for my case as well.
Thank you,
Alex

On Mon, Apr 23, 2018 at 2:14 PM Fabian Hueske <[hidden email]> wrote:
Hi Alex,
An operator that has to join two input streams obviously requires two inputs. In the case of an enrichment join, the operator should first read the metadata stream and build up a data structure as state, against which the other input is joined. If the metadata is (infrequently) updated, these updates should be integrated into the state.
The problem is that it is currently not possible to implement such an operator with Flink, because operators cannot decide from which input to read; they have to process whatever data is given to them. Hence, it is not possible to build up a data structure from the metadata stream before consuming the other stream. There are a few workarounds that work in special cases.

2018-04-23 13:29 GMT+02:00 Alexander Smirnov <[hidden email]>:
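The state handling Fabian describes can be illustrated with a minimal, framework-agnostic sketch. This is not runnable Flink code (in a real job this would typically be a CoProcessFunction over a connected stream, with the map held in Flink state); the class and method names below are purely illustrative.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of an enrichment join: one input folds metadata updates into
// operator state, the other input is joined against that state.
class EnrichmentJoin {
    private final Map<String, String> metadata = new HashMap<>();

    // Input 1: (infrequent) metadata updates are merged into the state.
    void onMetadata(String key, String value) {
        metadata.put(key, value);
    }

    // Input 2: data records are enriched with whatever state is present
    // at the time they arrive.
    String onRecord(String key, String payload) {
        return payload + "|" + metadata.getOrDefault(key, "<unknown>");
    }
}
```

The limitation Fabian points out is visible here: nothing stops `onRecord` from being called before the relevant `onMetadata` call, which is exactly why records can be enriched against incomplete state.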
Hi Fabian,
Specifically, for a connected stream feeding a Co(Flat)MapFunction, it seems like we could let Flink know how to pick elements from the two network buffers - e.g. random, round robin, or by timestamp. I don’t know how this works with chained operators, but it does seem a bit odd to have operators create buffers of elements when (network) buffers often already exist. If there’s no network buffers in play (e.g. there’s a direct chain of operators from a source) then it could be something that’s not supported, though with the future source-pull architecture that would also be easy to resolve. Anyway, I could take a whack at this if it seems reasonable. — Ken
Using a flat map function, you can always buffer the non-metadata stream in operator state until the metadata is aggregated, and then process the collected data. It would require a RichFlatMap to hold the data.
Michael
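The buffering workaround Michael describes can be sketched as follows. This is not Flink code; in a real job the buffer would live in the operator state of a RichFlatMap on a connected stream, and the method names here are illustrative stand-ins for the two flat-map inputs.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of "buffer the non-metadata stream until the metadata is ready".
class BufferingJoin {
    private final List<String> buffer = new ArrayList<>();
    private final List<String> output = new ArrayList<>();
    private boolean metadataReady = false;
    private String metadata = null;

    // Metadata side: once complete, flush everything buffered so far.
    void onMetadataComplete(String meta) {
        metadata = meta;
        metadataReady = true;
        for (String pending : buffer) {
            output.add(pending + "|" + metadata);
        }
        buffer.clear();
    }

    // Data side: hold records until metadata arrives, then pass through.
    void onRecord(String record) {
        if (metadataReady) {
            output.add(record + "|" + metadata);
        } else {
            buffer.add(record);
        }
    }

    List<String> getOutput() { return output; }
}
```

Note that the unbounded in-memory buffer is exactly the weakness Ken raises below: if the metadata side is slow, the buffer can grow without limit.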
I agree that in the general case you need to operate on the stream data based on the metadata you have. The side input feature coming some day may help you, in that it would give you a means to receive inputs out of band. But given changing metadata and changing stream data, I am not sure this is any different from dual stream data inputs. Either you use windowing to do small batches of data to allow coordination of the stream and metadata, or you use the metadata you have collected to date on receipt of the stream data. Given that Flink does record-by-record processing, you have the option of controlling the timing as needed for your use case.
Michael
Hi Michael,
Windowing works when you're joining timestamped metadata and non-metadata. The common case I'm referring to is where there's some function state (e.g. rules to process data, machine learning models, or in my case clusters), where you want to process the non-metadata with the "current state". In that case, blindly applying whatever metadata has been collected to incoming non-metadata often doesn't work well. That's why Fabian was suggesting various approaches (below) to work around the problem.
The general solution (his option #2, with buffering) will work, but can lead to OOMEs and feels like it breaks the basic Flink back-pressure mechanism, due to in-operator buffering. If it was possible to essentially allow Flink to block (or not pull, for sources) from the non-metadata stream when appropriate, then no buffering would be needed. Then it would be straightforward to do things like:
- drain all metadata from a Kafka topic before applying it to the other stream.
- defer processing data from the other stream if there was newer metadata.
As an aside, what I'm seeing with Flink 1.5 and using a connected keyed & broadcast stream is that the CoFlatMapFunction seems to be giving priority to data going to the flatMap1() method, though this could be an odd side effect of how iterations impact the two streams.
— Ken
Hi all,
@Ken, the approach of telling the operator which input to read from would cause problems with the current checkpointing mechanism, because checkpoint barriers are not allowed to overtake regular records. Chaining wouldn't be an issue, because operators with two inputs are not chained to their predecessors.

2018-04-25 23:36 GMT+02:00 Ken Krugler <[hidden email]>:
Hi guys,
This is how I tried to solve my enrichment case: https://gist.github.com/miko-code/d615aa05b65579f4366ba9fe8a8275fd
Currently we need to use keyBy() before the process function. My concern is: if I have N in-flight messages with the same key, will the process function execute once or N times?
Thanks,
Miki

On Thu, Apr 26, 2018 at 1:24 PM Fabian Hueske <[hidden email]> wrote: