Hi, I'm working on a project that uses Flink to compute hourly log statistics such as top-K. The logs are fetched from Kafka by a FlinkKafkaConsumer and packed into a DataStream. The problem is that I find the computation quite challenging to express with Flink's DataStream API:

1. If I use something like `logs.timeWindow(Time.hours(1))` and the data volume is really high (e.g., billions of logs generated in one hour), will the window grow too large to be handled efficiently?
2. We have to create a `KeyedStream` before applying `timeWindow`. However, the distribution of some keys is skewed, so using them may compromise performance due to unbalanced partition loads. (What I want is just to rebalance the stream across all partitions.)
3. The top-K algorithm can be implemented straightforwardly with `DataSet`'s `mapPartition` and `reduceGroup` API, as in [FLINK-2549](https://github.com/apache/flink/pull/1161/), but not so easily with the DataStream approach, even with stateful operators. I still cannot figure out how to reunion streams once they are partitioned.
4. Is it possible to convert a DataStream into a DataSet? If yes, how can I make Flink analyze the data incrementally rather than aggregating the logs for one hour before starting to process them?
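For reference, the `mapPartition`/`reduceGroup` idea from point 3 can be sketched in plain Java without any Flink dependency. This is a minimal illustration under stated assumptions, not the code from FLINK-2549: words are hash-partitioned by the word itself, so every occurrence of a word lands in one partition, which is what makes merging the per-partition top-K lists exact. `TwoPhaseTopK`, `localTopK` and `numPartitions` are hypothetical names.

```java
import java.util.*;
import java.util.stream.Collectors;

class TwoPhaseTopK {
    // One total order for both phases: higher count first, ties broken alphabetically.
    static final Comparator<Map.Entry<String, Long>> ORDER =
        Map.Entry.<String, Long>comparingByValue().reversed()
            .thenComparing(Map.Entry.<String, Long>comparingByKey());

    // Phase 1 (the mapPartition role): count words within one partition, keep its local top-k.
    static List<Map.Entry<String, Long>> localTopK(List<String> partition, int k) {
        Map<String, Long> counts = new HashMap<>();
        for (String w : partition) counts.merge(w, 1L, Long::sum);
        return counts.entrySet().stream().sorted(ORDER).limit(k).collect(Collectors.toList());
    }

    // Phase 2 (the reduceGroup role): merge every partition's candidates, keep the global top-k.
    static List<String> topK(List<String> words, int k, int numPartitions) {
        // Hash-partition by word, so each word's full count lives in exactly one partition.
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) partitions.add(new ArrayList<>());
        for (String w : words) partitions.get(Math.floorMod(w.hashCode(), numPartitions)).add(w);

        List<Map.Entry<String, Long>> candidates = new ArrayList<>();
        for (List<String> p : partitions) candidates.addAll(localTopK(p, k));
        return candidates.stream().sorted(ORDER).limit(k)
            .map(Map.Entry::getKey).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "b", "a", "c", "b", "a", "d");
        System.out.println(topK(words, 2, 3)); // [a, b]
    }
}
```

If the input were partitioned round-robin instead of by word, a word's count could be split across partitions and the merged result would no longer be guaranteed exact.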
My algorithm is roughly like this, taking the top-K words problem as an example. Here is some (probably buggy) code to demonstrate the basic idea on DataSet:
In reply to this post by Yukun Guo

Suggestions in-line below...

On Mon, Jun 6, 2016 at 7:26 PM, Yukun Guo <[hidden email]> wrote:

Re 4: There is no direct way to turn a DataStream into a DataSet. I addressed the point about doing the computation incrementally in my answer to point 1, though: you do this with a ReduceFunction. But, as noted there, I'm not aware of an exact incremental TopK algorithm. An approximate TopK can be computed incrementally with sketching, though.
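Why a bounded-memory incremental TopK cannot simply be exact is easy to see with a tiny example (my own illustration, not from the thread). The hypothetical `TruncatedTopK.truncated` below keeps at most k counters and drops any word that arrives while they are full, so a word that is rare early but frequent later is never counted. Sketch-based algorithms also lose information, but they bound the resulting error.

```java
import java.util.*;

class TruncatedTopK {
    // Naive bounded state: track at most k words; words arriving when the state is full are dropped.
    static Map<String, Long> truncated(List<String> stream, int k) {
        Map<String, Long> state = new HashMap<>();
        for (String w : stream) {
            if (state.containsKey(w) || state.size() < k) state.merge(w, 1L, Long::sum);
            // else: w is discarded and its count is lost for good.
        }
        return state;
    }

    // Exact counting keeps one counter per distinct word, so state grows with the vocabulary.
    static Map<String, Long> exact(List<String> stream) {
        Map<String, Long> counts = new HashMap<>();
        for (String w : stream) counts.merge(w, 1L, Long::sum);
        return counts;
    }

    public static void main(String[] args) {
        List<String> stream = Arrays.asList("a", "b", "b", "b");
        System.out.println(truncated(stream, 1)); // {a=1}: b, the true top-1, never got a slot
        System.out.println(exact(stream));        // {a=1, b=3}
    }
}
```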
Re 1: In the general case you can use:

    stream                                     // a KeyedStream, i.e. the result of keyBy(...)
        .timeWindow(...)
        .apply(reduceFunction, windowFunction)

which takes a ReduceFunction and a WindowFunction. The ReduceFunction is used to reduce the state on the fly and thereby keep the total state size low. This is commonly used in analytics applications to reduce the state size that you accumulate for each window. In the specific case of TopK, however, you cannot do this if you want an exact result. To get an exact result I believe you have to actually keep around all of the data and then calculate TopK at the end in your WindowFunction. If you are able to use approximate algorithms for your use case, then you can calculate a probabilistic incremental TopK based on some sort of sketch-based algorithm.
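A plain-Java simulation (not Flink's API) of what pairing a ReduceFunction with a WindowFunction buys you: each element is folded into one accumulator per key as it arrives, and the window function runs once per key when the window fires. The names `IncrementalWindow`, `onElement` and `fire` are hypothetical, and the sketch assumes a single tumbling window.

```java
import java.util.*;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

// Simulates one tumbling window: a ReduceFunction-style combiner runs on every element,
// so only one accumulated value per key is held; the window function runs when it fires.
class IncrementalWindow<K, V, R> {
    private final Map<K, V> state = new HashMap<>();
    private final BinaryOperator<V> reduce;           // plays the role of the ReduceFunction
    private final BiFunction<K, V, R> windowFunction; // plays the role of the WindowFunction

    IncrementalWindow(BinaryOperator<V> reduce, BiFunction<K, V, R> windowFunction) {
        this.reduce = reduce;
        this.windowFunction = windowFunction;
    }

    // Called for every element: combine it into the key's single accumulator.
    void onElement(K key, V value) {
        state.merge(key, value, reduce);
    }

    int stateSize() {
        return state.size();
    }

    // Called once at the end of the window: apply the window function to each reduced value.
    List<R> fire() {
        List<R> out = new ArrayList<>();
        state.forEach((k, v) -> out.add(windowFunction.apply(k, v)));
        state.clear();
        return out;
    }

    public static void main(String[] args) {
        IncrementalWindow<String, Long, String> w =
            new IncrementalWindow<>(Long::sum, (word, count) -> word + "=" + count);
        for (String word : "to be or not to be".split(" ")) w.onElement(word, 1L);
        System.out.println(w.stateSize()); // 4 accumulators for 6 elements
        System.out.println(w.fire());      // to=2, be=2, or=1, not=1 (order may vary)
    }
}
```

Here the state holds one counter per distinct word rather than one entry per incoming element.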
Re 2: A good and simple way to approach this may be to come up with a composite key for your data that *is* uniformly distributed. You can imagine something simple like 'natural_key:random_number'. Pre-aggregate with keyBy(composite_key) and reduce(), then keyBy(natural_key) and reduce() again. For example (pseudocode):

    stream
        .keyBy(key, rand())    // partition by composite key that is uniformly distributed
        .timeWindow(1 hour)
        .reduce()              // pre-aggregation
        .keyBy(key)            // repartition
        .timeWindow(1 hour)
        .reduce()              // final aggregation
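The composite-key idea can be checked in plain Java: salt each key with a random suffix in `[0, salts)`, pre-aggregate per salted key (the first keyBy/reduce), then strip the salt and aggregate again (the second keyBy/reduce). The totals match a direct count, while a hot key's first-stage work is spread over up to `salts` sub-keys. `SaltedAggregation` and `salts` are hypothetical names; in Flink the two stages would be separate keyed window operators.

```java
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;

class SaltedAggregation {
    // Stage 1: pre-aggregate on the composite key "natural_key:random_number".
    static Map<String, Long> preAggregate(List<String> keys, int salts) {
        Map<String, Long> partial = new HashMap<>();
        for (String key : keys) {
            int salt = ThreadLocalRandom.current().nextInt(salts);
            partial.merge(key + ":" + salt, 1L, Long::sum);
        }
        return partial;
    }

    // Stage 2: strip the salt (re-key by the natural key) and combine the partial counts.
    static Map<String, Long> finalAggregate(Map<String, Long> partial) {
        Map<String, Long> totals = new HashMap<>();
        partial.forEach((saltedKey, count) -> {
            // lastIndexOf keeps natural keys that themselves contain ':' intact.
            String naturalKey = saltedKey.substring(0, saltedKey.lastIndexOf(':'));
            totals.merge(naturalKey, count, Long::sum);
        });
        return totals;
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 1000; i++) keys.add("hot"); // heavily skewed key
        for (int i = 0; i < 10; i++) keys.add("cold");
        Map<String, Long> partial = preAggregate(keys, 8);
        System.out.println(partial.size() + " partial groups"); // at most 16
        System.out.println(finalAggregate(partial));             // totals: hot 1000, cold 10
    }
}
```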
Re 3: I'm not sure I know what you're trying to do here. What do you mean by re-union?
Thank you very much for the detailed answer. Now I understand that a DataStream can be repartitioned or “joined” (I don’t know the exact terminology) with keyBy. But another question: I’m also not sure whether a TreeMap is suitable here. This StackOverflow question presents a similar approach: http://stackoverflow.com/questions/34681887/how-apache-flink-deal-with-skewed-data, but the suggested solution seems rather complicated.

On 8 June 2016 at 08:04, Jamie Grier <[hidden email]> wrote:
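On the TreeMap question: if the TreeMap is keyed by count (a common way to keep entries sorted), words with equal counts overwrite each other. A size-k min-heap (`java.util.PriorityQueue`) keeps ties and costs O(n log k) for n distinct words. A minimal sketch, assuming per-word counts have already been aggregated; `HeapTopK` is a hypothetical name.

```java
import java.util.*;

class HeapTopK {
    // Returns the k most frequent words, highest count first (ties broken alphabetically).
    static List<String> topK(Map<String, Long> counts, int k) {
        // "Weakest first" order: lower count first; among equal counts, alphabetically later first.
        Comparator<Map.Entry<String, Long>> weakestFirst =
            Map.Entry.<String, Long>comparingByValue()
                .thenComparing(Map.Entry.<String, Long>comparingByKey().reversed());
        PriorityQueue<Map.Entry<String, Long>> heap = new PriorityQueue<>(weakestFirst);
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            heap.offer(e);
            if (heap.size() > k) heap.poll(); // evict the weakest, keeping at most k entries
        }
        List<String> result = new ArrayList<>();
        while (!heap.isEmpty()) result.add(heap.poll().getKey());
        Collections.reverse(result); // the heap pops weakest first; flip to strongest first
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = new HashMap<>();
        counts.put("apple", 5L);
        counts.put("pear", 5L);
        counts.put("fig", 2L);

        // A TreeMap keyed by count silently keeps only one of the tied words.
        TreeMap<Long, String> byCount = new TreeMap<>();
        counts.forEach((word, c) -> byCount.put(c, word));
        System.out.println(byCount.size()); // 2: one word with count 5 was overwritten

        System.out.println(topK(counts, 2)); // [apple, pear]
    }
}
```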
Hi,

There are some implementations that do this with a low memory footprint. Have a look at the Count-Min Sketch, for example; there are some Java implementations.

Christophe

2016-06-09 15:29 GMT+02:00 Yukun Guo <[hidden email]>:
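A minimal Count-Min Sketch in plain Java, to show why the footprint stays small: it keeps a fixed `depth x width` table of counters, each item increments one counter per row (picked by a row-specific hash), and the estimate is the minimum over those counters, so it can over-count but never under-count. The hashing scheme and the class name are my own assumptions, not any particular library's implementation. Getting top-K from it would additionally need a small heap of candidate words, which is omitted here.

```java
import java.util.*;

class CountMinSketch {
    private final long[][] table;
    private final long[] seeds;
    private final int width;

    CountMinSketch(int depth, int width, long seed) {
        this.table = new long[depth][width];
        this.width = width;
        this.seeds = new long[depth];
        Random rnd = new Random(seed);
        for (int i = 0; i < depth; i++) seeds[i] = rnd.nextLong();
    }

    // Row-specific bucket. Derived from String.hashCode(), so strings with equal hash codes
    // always collide; a production sketch would hash the raw bytes instead.
    private int bucket(String item, int row) {
        long h = (item.hashCode() ^ seeds[row]) * 0x9E3779B97F4A7C15L;
        h ^= (h >>> 29);
        return (int) Math.floorMod(h, (long) width);
    }

    void add(String item, long count) {
        for (int row = 0; row < table.length; row++) table[row][bucket(item, row)] += count;
    }

    // Minimum over rows: collisions can only inflate a counter, so this never under-counts.
    long estimate(String item) {
        long min = Long.MAX_VALUE;
        for (int row = 0; row < table.length; row++) {
            min = Math.min(min, table[row][bucket(item, row)]);
        }
        return min;
    }

    public static void main(String[] args) {
        CountMinSketch cms = new CountMinSketch(4, 256, 42L); // 1024 counters, whatever the vocabulary
        Map<String, Long> exact = new HashMap<>();
        for (int i = 0; i < 10_000; i++) {
            String word = "w" + (i % 500 == 0 ? 0 : i % 97); // "w0" is over-represented
            cms.add(word, 1);
            exact.merge(word, 1L, Long::sum);
        }
        System.out.println("w0 exact=" + exact.get("w0") + " estimate=" + cms.estimate("w0"));
    }
}
```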
				In reply to this post by Yukun Guo
You should have a look at this project: https://github.com/addthis/stream-lib

You can use it within Flink, storing intermediate values in local state.
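As far as I know, stream-lib's `StreamSummary` class implements the Space-Saving algorithm (Metwally, Agrawal and El Abbadi) for approximate top-K with a fixed number of counters. To stay self-contained and avoid guessing at stream-lib's exact API, here is a from-scratch sketch of Space-Saving in plain Java; in a Flink job, the `counters` map is what you would keep in local state. `SpaceSaving` is a hypothetical name.

```java
import java.util.*;

class SpaceSaving {
    private final int capacity;                                   // number of counters m
    private final Map<String, long[]> counters = new HashMap<>(); // item -> {count, error}

    SpaceSaving(int capacity) {
        this.capacity = capacity;
    }

    void offer(String item) {
        long[] c = counters.get(item);
        if (c != null) { c[0]++; return; }                        // already monitored
        if (counters.size() < capacity) { counters.put(item, new long[] {1, 0}); return; }
        // Replace the item with the smallest count; the newcomer inherits it as its error bound.
        // (Linear scan for clarity; real implementations use a bucketed structure for O(1) updates.)
        String victim = null;
        long min = Long.MAX_VALUE;
        for (Map.Entry<String, long[]> e : counters.entrySet()) {
            if (e.getValue()[0] < min) { min = e.getValue()[0]; victim = e.getKey(); }
        }
        counters.remove(victim);
        counters.put(item, new long[] {min + 1, min});
    }

    // Estimated count of a monitored item (over-counts by at most its error), or 0 if unmonitored.
    long estimate(String item) {
        long[] c = counters.get(item);
        return c == null ? 0 : c[0];
    }

    // The k monitored items with the highest estimated counts.
    List<String> topK(int k) {
        List<Map.Entry<String, long[]>> entries = new ArrayList<>(counters.entrySet());
        entries.sort((a, b) -> Long.compare(b.getValue()[0], a.getValue()[0]));
        List<String> out = new ArrayList<>();
        for (int i = 0; i < Math.min(k, entries.size()); i++) out.add(entries.get(i).getKey());
        return out;
    }

    public static void main(String[] args) {
        SpaceSaving ss = new SpaceSaving(10);
        for (int i = 0; i < 10_000; i++) {
            ss.offer(i % 3 == 0 ? "hot" : "x" + i); // "hot" is a third of the stream, the rest unique
        }
        System.out.println(ss.topK(1)); // [hot]
    }
}
```

Any item occurring more than N/m times in a stream of N items is guaranteed to be monitored, which is why a handful of counters suffices for heavy hitters.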