Scala syntax AllWindowFunction ?

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Scala syntax AllWindowFunction ?

Bart van Deenen

Hi all

I'm using 1.0, and have all my data nicely bundled in one allWindow, but
I don't understand the syntax in Scala to make on json out of those for
dumping the whole window into Kafka.

My type is:

val stream: AllWindowedStream[(List[String], Long, Int), TimeWindow]

and I want to do

stream.apply ????????

I've tried to convert the Java example from the documentation to Scala,
but I can't get anything meaningful to compile

allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>,
Integer, Window>() {
    public void apply (Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }
});


Help very appreciated!

Greetings


--
  Bart van Deenen
  [hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: Scala syntax AllWindowFunction ? Found it myself.

Bart van Deenen

    val aggregatedStream = stream.apply( (w:Window, values:
    scala.Iterable[(List[String], Long, Int)], out:
    Collector[Aggregation]) => {
      import scala.collection.JavaConversions._
      val agg = Aggregation( values.toList.map {
        case (pages, _, ct) => (ct, pages)
      })
      out.collect(agg)
    })

Pheew.


--
  Bart van Deenen
  [hidden email]

On Tue, Mar 22, 2016, at 12:40, Bart van Deenen wrote:

>
> Hi all
>
> I'm using 1.0, and have all my data nicely bundled in one allWindow, but
> I don't understand the syntax in Scala to make on json out of those for
> dumping the whole window into Kafka.
>
> My type is:
>
> val stream: AllWindowedStream[(List[String], Long, Int), TimeWindow]
>
> and I want to do
>
> stream.apply ????????
>
> I've tried to convert the Java example from the documentation to Scala,
> but I can't get anything meaningful to compile
>
> allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>,
> Integer, Window>() {
>     public void apply (Window window,
>             Iterable<Tuple2<String, Integer>> values,
>             Collector<Integer> out) throws Exception {
>         int sum = 0;
>         for (value t: values) {
>             sum += t.f1;
>         }
>         out.collect (new Integer(sum));
>     }
> });
>
>
> Help very appreciated!
>
> Greetings
>
>
> --
>   Bart van Deenen
>   [hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: Scala syntax AllWindowFunction ?

Till Rohrmann
In reply to this post by Bart van Deenen

Hi Bart,

there are multiple ways how to specify a window function using the Scala API. The most scalaesque way would probably be to use an anonymous function:

val env = StreamExecutionEnvironment.getExecutionEnvironment

val input = env.fromElements(1,2,3,4,5,7)

val pair = input.map(x => (x, x))

val allWindow = pair.timeWindowAll(Time.seconds(10))

val result = allWindow{
  (window, iterable, collector: Collector[Int]) =>
    iterable foreach {
      case (l, r) =>
        collector.collect(l + r)
    }
}

result.print()

env.execute("Flink Scala API Skeleton")

I hope this helps you.

Cheers,
Till


On Tue, Mar 22, 2016 at 12:40 PM, Bart van Deenen <[hidden email]> wrote:

Hi all

I'm using 1.0, and have all my data nicely bundled in one allWindow, but
I don't understand the syntax in Scala to make on json out of those for
dumping the whole window into Kafka.

My type is:

val stream: AllWindowedStream[(List[String], Long, Int), TimeWindow]

and I want to do

stream.apply ????????

I've tried to convert the Java example from the documentation to Scala,
but I can't get anything meaningful to compile

allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>,
Integer, Window>() {
    public void apply (Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }
});


Help very appreciated!

Greetings


--
  Bart van Deenen
  [hidden email]

Reply | Threaded
Open this post in threaded view
|

Re: Scala syntax AllWindowFunction ?

Bart van Deenen
Hi Till
 
yes it does, thanks  for the clear example.
 
Bart
 
 
On Tue, Mar 22, 2016, at 14:25, Till Rohrmann wrote:

Hi Bart,

there are multiple ways how to specify a window function using the Scala API. The most scalaesque way would probably be to use an anonymous function:

val env = StreamExecutionEnvironment.getExecutionEnvironment

val input = env.fromElements(1,2,3,4,5,7)

val pair = input.map(x => (x, x))

val allWindow = pair.timeWindowAll(Time.seconds(10))

val result = allWindow{
  (window, iterable, collector: Collector[Int]) =>
    iterable foreach {
      case (l, r) =>
        collector.collect(l + r)
    }
}

result.print()

env.execute("Flink Scala API Skeleton")

I hope this helps you.

Cheers,
Till

 
 
On Tue, Mar 22, 2016 at 12:40 PM, Bart van Deenen <[hidden email]> wrote:
 
Hi all
 
I'm using 1.0, and have all my data nicely bundled in one allWindow, but
I don't understand the syntax in Scala to make on json out of those for
dumping the whole window into Kafka.
 
My type is:
 
val stream: AllWindowedStream[(List[String], Long, Int), TimeWindow]
 
and I want to do
 
stream.apply ????????
 
I've tried to convert the Java example from the documentation to Scala,
but I can't get anything meaningful to compile
 
allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>,
Integer, Window>() {
    public void apply (Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }
});
 
 
Help very appreciated!
 
Greetings


--
  Bart van Deenen
  [hidden email]