Hi, I am pretty new to Flink and I don't know what would be the best way to deal with the following use case:
(A "natural" candidate for the key on this stream would be the resource id.) The issue I have concerns the query to the external system:
I think one way to deal with that would be:
But after looking at the documentation, I cannot find this ability (having a windowAll on a keyed stream). Am I missing something? What would be the best way to deal with such a use case?
For example, I've tried to change my key to something like "resourceId.hashCode % <max number of queries in parallel>" and then use a time window. In my example above, <max number of queries in parallel> would be 4, so all my keys would be 0, 1, 2 or 3. The issue with this approach is that, due to the way the operatorIdx is computed from the key, it does not distribute my processing well:
So, what would be the best way to deal with that?
Thank you in advance for your support. Regards.
Julien.
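To see why a handful of keys like 0 to 3 can pile up on only two operator instances, here is a minimal plain-Java sketch of how, as far as I understand Flink 1.x's KeyGroupRangeAssignment, a key is routed: the key's hashCode() is murmur-hashed into one of maxParallelism key groups, and contiguous ranges of key groups are then assigned to operator indices. The hash constants, the default maxParallelism of 128 and the class name KeyGroupSketch are assumptions that mirror Flink internals and may differ between versions; in a real job, call KeyGroupRangeAssignment itself rather than copying this.

```java
// Hypothetical re-implementation of Flink 1.x key -> operator routing,
// written without Flink so it can run standalone. Constants are assumed
// to mirror MathUtils.murmurHash / KeyGroupRangeAssignment.
final class KeyGroupSketch {

    // Assumed to mirror MathUtils.murmurHash(int): always returns a value >= 0.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        // Final avalanche step (MathUtils.bitMix in Flink).
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        if (code >= 0) {
            return code;
        } else if (code != Integer.MIN_VALUE) {
            return -code;
        } else {
            return 0;
        }
    }

    // Step 1: key -> key group in [0, maxParallelism).
    static int keyGroupFor(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    // Step 2: key group -> operator index; each operator owns a contiguous
    // range of key groups.
    static int operatorIndexFor(Object key, int maxParallelism, int parallelism) {
        int keyGroup = keyGroupFor(key, maxParallelism);
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed default for a parallelism of 4
        int parallelism = 4;
        for (int key = 0; key < 4; key++) {
            System.out.printf("key %d -> operator index %d%n",
                key, operatorIndexFor(key, maxParallelism, parallelism));
        }
    }
}
```

With only four distinct keys, nothing forces them into four different key-group ranges, so two or more of them can land on the same operator index; that is the kind of skew described above.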
Hi Julien,
Sorry for misunderstanding you earlier. For now, a window can only be defined on a KeyedStream, or on an ordinary DataStream with parallelism = 1. I'd like to offer three options for your scenario:

1. If your external data is static and fits into memory, you can cache it in managed state and avoid the querying problem altogether.
2. Alternatively, you can use a custom partitioner to distribute your alert data manually and simulate a window operation yourself in a ProcessFunction.
3. You may also use an external system, such as an in-memory store, as a cache for your queries.

Best,
Xingcan
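For option 2, here is a minimal sketch of the partitioning half, assuming each alert is first tagged with a small bucket id (for example, the resource id's hash modulo the number of parallel instances). SimplePartitioner is a local stand-in with the same method shape as Flink's org.apache.flink.api.common.functions.Partitioner<K> (int partition(K key, int numPartitions)), declared here only so the sketch compiles without Flink; BucketPartitioner and bucketOf are hypothetical names.

```java
// Local stand-in mirroring the shape of Flink's Partitioner<K>;
// in a real job, implement org.apache.flink.api.common.functions.Partitioner.
interface SimplePartitioner<K> {
    int partition(K key, int numPartitions);
}

// Hypothetical partitioner: bucket b goes straight to channel (b mod n),
// so buckets 0..n-1 each land on a different parallel instance.
final class BucketPartitioner implements SimplePartitioner<Integer> {

    @Override
    public int partition(Integer bucket, int numPartitions) {
        // floorMod keeps the result in [0, numPartitions) even for negative input.
        return Math.floorMod(bucket, numPartitions);
    }

    // Hypothetical helper: derive a bucket id from a resource id.
    static int bucketOf(String resourceId, int buckets) {
        return Math.floorMod(resourceId.hashCode(), buckets);
    }
}
```

In a Flink job this would be wired in with something like alerts.partitionCustom(new BucketPartitioner(), alert -> BucketPartitioner.bucketOf(alert.getResourceId(), 4)), where the alert type and its getter are assumptions. Unlike keyBy, there is no key-group hashing in between, so four buckets go to four instances; the trade-off is that the result is a plain DataStream, so the windowing has to be done by hand.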
Hi Julien, if I understand correctly, I think you can key your stream on `Random.nextInt() % parallelism`; this way you can "group" together alerts from different resources and benefit from parallelism.
On 02/19/2018 09:08, [hidden email] wrote:
Hello,
I've already tried to key my stream with "resourceId.hashCode % parallelism" (with a parallelism of 4 in my example), so all my keys are 0, 1, 2 or 3. I can then use a time window on this keyed stream and make only 4 queries to my external system. But the keys are not well distributed by the default partitioner on a keyed stream: keys 0, 1, 2 and 3 only go to operator indices 2 and 3.

I think I should explore the custom partitioner, as you suggested, Xingcan. Maybe my last question on this would be: can you give me more details on "and simulate a window operation by yourself in a ProcessFunction"?

When I look at the documentation about the custom partitioner, I can see that the result of partitionCustom is a DataStream, not a KeyedStream. So the only window I have would be windowAll (which brings me back to a parallelism of 1, no?). And if I do something like "myStream.partitionCustom(<my new partitioner>, <my key>).keyBy(<myKey>).window(...)", will it preserve my custom partitioner? Looking at the "KeyedStream" class, it seems that it will go back to the "KeyGroupStreamPartitioner" and forget my custom partitioner?

Thanks again for your feedback,
Julien

On 19/02/2018 03:45, 周思华 wrote:
Hi Julien,
I'd run into a similar situation, where I needed a keyed stream but wanted (effectively) one key per task.

It's possible to generate keys that will get distributed as you need, though it does require making assumptions about how Flink generates hashes/key groups. And once you start talking about state, it gets a bit harder, as you need to know the max parallelism, which is used to calculate "key groups".

Below is a cheesy function I wrote to make an Integer that (if used as the key) will partition the record to the target operator. I use it in a custom Map function to add a key field.

    import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
    import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

    /**
     * Return an integer value that will get partitioned to the target <operatorIndex>, given the
     * workflow's <maxParallelism> (for key groups) and the operator <parallelism>.
     *
     * @param maxParallelism max parallelism of the job (VALUE_NOT_SET means "use the default")
     * @param parallelism    parallelism of the keyed operator
     * @param operatorIndex  index of the target parallel operator instance
     * @return Integer suitable for use in a record as the key.
     */
    public static Integer makeKeyForOperatorIndex(int maxParallelism, int parallelism, int operatorIndex) {
        // Fall back to the max parallelism Flink derives by default.
        if (maxParallelism == ExecutionJobVertex.VALUE_NOT_SET) {
            maxParallelism = KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism);
        }

        // Brute-force search: try small integers until one maps to the target operator.
        for (int i = 0; i < maxParallelism * 2; i++) {
            Integer key = Integer.valueOf(i);
            int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
            int index = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup);
            if (index == operatorIndex) {
                return key;
            }
        }

        throw new RuntimeException(String.format(
            "Unable to find key for target operator index %d (max parallelism = %d, parallelism = %d)",
            operatorIndex, maxParallelism, parallelism));
    }

-- Ken
--------------------------
Ken Krugler
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr
Hi Julien,

at the moment Flink only supports parallel windows that are keyed. What you would need is something like a per-partition window, which is currently not supported. The problem is that it is not clear how to rescale a per-partition window, because it effectively means that you have only as many key groups as you have partitions.

What you can also do is key by a prefix of your resource id. That way, more resource ids are grouped into the same window. It should then be doable to choose a prefix that gives you enough groups to utilize your workers evenly, while still grouping several resource ids into each external request.

Ken's solution should work for your use case. However, be aware that it will break as soon as Flink changes its internal key-to-key-group mapping.

Cheers,
Till

On Mon, Feb 19, 2018 at 5:27 PM, Ken Krugler <[hidden email]> wrote:
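Till's prefix suggestion can be sketched as a plain key-extraction function. prefixKey and groupSizes are hypothetical helpers; in a job you would pass something like alert -> PrefixKeys.prefixKey(alert.getResourceId(), 5) to keyBy (the alert type, getter and prefix length are assumptions). groupSizes shows how many resource ids would share each window, which is the knob for trading even worker utilization against the number of external requests.

```java
import java.util.HashMap;
import java.util.Map;

final class PrefixKeys {

    // Hypothetical key selector: the first prefixLength characters of the id,
    // or the whole id if it is shorter. Deterministic, as keyBy requires.
    static String prefixKey(String resourceId, int prefixLength) {
        return resourceId.length() <= prefixLength
            ? resourceId
            : resourceId.substring(0, prefixLength);
    }

    // Number of resource ids that would share each prefix group,
    // i.e. the same window and the same batched external request.
    static Map<String, Integer> groupSizes(Iterable<String> resourceIds, int prefixLength) {
        Map<String, Integer> sizes = new HashMap<>();
        for (String id : resourceIds) {
            sizes.merge(prefixKey(id, prefixLength), 1, Integer::sum);
        }
        return sizes;
    }
}
```

If all resource ids start with the same constant text, the prefix should be taken from a part of the id that actually varies; otherwise every id collapses into a single group.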
Hi Julien,
you could use operator state to cache the data of a window, along with the last time your window fired. Then check ctx.timerService().currentProcessingTime() in processElement(); once it exceeds the next window boundary, process all the cached data as if the window had fired. Note that currently only memory-based operator state is provided.

Hope this helps,
Xingcan

On 19 Feb 2018, at 4:34 PM, Julien <[hidden email]> wrote:
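The bookkeeping described above (buffer elements, flush once processing time passes the next window boundary) can be sketched in plain Java. ManualTumblingBuffer is a hypothetical name; in a ProcessFunction, add would be called from processElement() with ctx.timerService().currentProcessingTime(), and the buffer would need to live in operator state (for example via CheckpointedFunction) to survive failures, which this sketch does not do.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: emulates a processing-time tumbling window by hand.
final class ManualTumblingBuffer<T> {

    private final long windowSizeMs;
    private final List<T> buffer = new ArrayList<>();
    // Exclusive end of the currently open window; MIN_VALUE means "no window yet".
    private long windowEnd = Long.MIN_VALUE;

    ManualTumblingBuffer(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Called for each element with the current processing time. Returns the
    // contents of the window that just closed (not including this element),
    // or an empty list if no window boundary was crossed.
    List<T> add(T element, long now) {
        List<T> fired = Collections.emptyList();
        if (windowEnd == Long.MIN_VALUE) {
            windowEnd = alignedEnd(now);
        } else if (now >= windowEnd) {
            fired = new ArrayList<>(buffer);
            buffer.clear();
            windowEnd = alignedEnd(now);
        }
        buffer.add(element);
        return fired;
    }

    // Windows are aligned to multiples of windowSizeMs (zero offset).
    private long alignedEnd(long now) {
        return now - Math.floorMod(now, windowSizeMs) + windowSizeMs;
    }
}
```

For example, with a 1000 ms window, elements at 100 and 900 are returned together when an element arrives at 1000. Because Flink timers are only available on keyed streams, a window in this scheme fires only when a later element arrives, so a quiet stream can hold the last batch back.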
Hi Xingcan, Ken and Till,
OK, thank you, that is clear. I have various options then:
Regards,
Julien

On 20/02/2018 02:48, Xingcan Cui wrote: