// Read the ';'-delimited, '|'-quoted CSV as string triples and keep only the
// rows whose third field, parsed as an Int, is at least implicit_ratings_from.
// `toInt` throws NumberFormatException if that field is not numeric.
val datafile = env
  .readCsvFile[(String, String, String)](
    option_datafile,
    fieldDelimiter = ";",
    quoteCharacter = '|')
  .filter(row => row._3.toInt >= implicit_ratings_from)

// For every group of rows sharing field 0, pair the second field of each row
// with the second field of the row that follows it in the group's iteration
// order, then flatten the per-group lists into one data set of CoConsumed.
// NOTE(review): the sort key is field 0, the same field used for grouping;
// presumably the order inside a group is just the input order - confirm.
val ordered_user_timestamp = datafile
  .sortPartition(0, Order.ASCENDING)
  .groupBy(0)
  .reduceGroup(new GroupReduceFunction[(String, String, String), List[CoConsumed]] {
    override def reduce(
        values: Iterable[(String, String, String)],
        out: Collector[List[CoConsumed]]): Unit = {
      val rows = values.asScala.toList
      val consecutivePairs = rows
        .sliding(2)
        // A single-row group produces one window of size 1; it matches nothing
        // here, so an empty list is emitted for that group.
        .collect { case first :: second :: Nil => new CoConsumed(first._2, second._2) }
        .toList
      out.collect(consecutivePairs)
    }
  })
  .flatMap(pairs => pairs)

// Number of pairs per `j` value, restricted to the `j` values whose count is
// strictly greater than filter_pairs_frequencies.
val support_j = ordered_user_timestamp
  .map(pair => (pair.j, 1.0))
  .groupBy(0)
  .sum(1)
  .filter(support => support._2 > filter_pairs_frequencies)

// Keep only the pairs whose `j` survived the support filter, count how often
// each distinct pair occurs, and materialise the counts as a local Map.
val grouped_items = ordered_user_timestamp
  .join(support_j)
  .where(_.j)
  .equalTo(_._1)
  .map(joined => (joined._1, 1.0))
  .groupBy(0)
  .sum(1)
  .collect()
  .toMap[CoConsumed, Double]