| Hi,       the follow code:     val text = env.socketTextStream(hostName, port)     val words1 = text.map { x =>       val res = x.split(",")       (res.apply(0)->res.apply(1))     }     val words2 = env.fromElements(("a","w1"),("a","w2"),("c","w3"),("d","w4"))     val joinedStream = words1       .coGroup(words2)       .where(_._1)       .equalTo(_._1)       .window(GlobalWindows.create())       .trigger(CountTrigger.of(1))       val res = joinedStream.apply(new InnerJoinFunction).print()     env.execute() class InnerJoinFunction extends CoGroupFunction[(String,String),(String,String),(String,String)]{     override def coGroup(T1: java.lang.Iterable[(String,String)],          T2: java.lang.Iterable[(String,String)],          out: Collector[(String, String)]): Unit = {         println("****************************")         println("T1="+T1+"T2="+T2)       import scala.collection.JavaConverters._       val scalaT2 = T2.asScala.toList       if(!T1.asScala.isEmpty && scalaT2.nonEmpty){           val transaction = T1.asScala.last            println("T2 last="+transaction)           for(snapshot <- scalaT2){             out.collect(transaction._1,transaction._2+snapshot._2)           }       }     }   } -------------------------------- the code have no proplem,and can run,the follow is the result:(input "a,1" then input "a,2") **************************** T1=[(a,1)]T2=[(a,w2), (a,w1)] T2 last=(a,1) 2> (a,1w2) 2> (a,1w1) **************************** T1=[(a,1), (a,2)]T2=[(a,w2), (a,w1)] T2 last=(a,2) 2> (a,2w2) 2> (a,2w1) -------------------------------------------------- the T1 is increase,and T2 is not change.i worry,when  input so many,the T1 will out of storage. so i want to write my "GlobalWindows.create()", to achieve T1 will store the only one,from input(or read from kafka),and the history of T1 will clear(input a,1 T1 is [(a,1)],then input a,2,T1 is [(a,2)],not T1=[(a,1), (a,2)]),but T2 will not change. i rewrite the "GlobalWindows",but it do not work,i read the code,find must rewrite the "GlobalWindow",and must modify "the class Serializer extends TypeSerializer<MyGlobalWindow>",but when i run,it can not into there,why? some can tell me? | 
 
	
					
		
	
					| Hi, will words2 always remain constant? If yes, you don't have to create a stream out of it and coGroup it, but you could simply pass the collection to Map/FlatMap function and do the joining there without the need of a window. Btw. you know that non-keyed global windows do not scale? If I understand your code correctly, you just want to get a stream with the last T2, right? I don't think you have to implement your own "GlobalWindow" for that. Have you tried to use Flink's operator state for that? So that if the state is growing it can be written to disk. Hope that helps. Timo Am 06/09/16 um 10:05 schrieb [hidden email]: 
 
 -- Freundliche Grüße / Kind Regards Timo Walther Follow me: @twalthr https://www.linkedin.com/in/twalthr | 
| Free forum by Nabble | Edit this page | 
 
	

 
	
	
		
