How would I use OneInputStreamOperator to deal with data skew?


How would I use OneInputStreamOperator to deal with data skew?

Felipe Gutierrez
Hi,

I am trying to implement a better way of handling data skew in Flink, and I found the #FlinkForward SF 2017 talk "Cliff Resnick & Seth Wiesman - From Zero to Streaming" [1], which says that they used a OneInputStreamOperator [2]. With it, they could implement the equivalent of a Hadoop "combiner" (executing part of the reduce work in the map phase, before shuffling).

I need some help here. Which operators in the Flink source code could I look at to implement my own operator that deals with data skew? Or does anyone have an example of a similar use case?


Thanks!
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

Re: How would I use OneInputStreamOperator to deal with data skew?

Kurt Young
Hi,

You can check out the bundle operator used in Blink, which does something similar to what you mentioned: https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/bundle/BundleOperator.java
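
The core idea is to pre-aggregate per key in a local map inside the operator and only ship the partial results downstream. A minimal sketch of that pattern (the class name PreAggregateOperator and the per-key summing are invented for illustration; the bundle is an unmanaged in-memory map, and a production version would also need to flush on watermarks):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Illustrative pre-aggregating ("combiner") operator: sums Long values per
// String key in a local map and emits the partial sums once the bundle is full.
public class PreAggregateOperator
        extends AbstractStreamOperator<Tuple2<String, Long>>
        implements OneInputStreamOperator<Tuple2<String, Long>, Tuple2<String, Long>> {

    private final int maxBundleSize;
    private transient Map<String, Long> bundle;
    private transient int count;

    public PreAggregateOperator(int maxBundleSize) {
        this.maxBundleSize = maxBundleSize;
    }

    @Override
    public void open() throws Exception {
        super.open();
        bundle = new HashMap<>();
        count = 0;
    }

    @Override
    public void processElement(StreamRecord<Tuple2<String, Long>> element) throws Exception {
        Tuple2<String, Long> value = element.getValue();
        bundle.merge(value.f0, value.f1, Long::sum); // local combine step
        if (++count >= maxBundleSize) {
            flush();
        }
    }

    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        flush(); // emit pending partials before a checkpoint barrier, so nothing is lost
    }

    private void flush() {
        for (Map.Entry<String, Long> e : bundle.entrySet()) {
            output.collect(new StreamRecord<>(Tuple2.of(e.getKey(), e.getValue())));
        }
        bundle.clear();
        count = 0;
    }
}

You would wire something like this in with DataStream#transform, e.g. stream.transform("pre-aggregate", outTypeInfo, new PreAggregateOperator(1000)), and still run the final aggregation on the keyed stream afterwards.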

Best,
Kurt



Re: How would I use OneInputStreamOperator to deal with data skew?

Felipe Gutierrez
Cool, thanks Kurt! 



Re: How would I use OneInputStreamOperator to deal with data skew?

Felipe Gutierrez
Hi,

which artifacts do I have to import in Maven in order to use the Blink API?

I am using Flink 1.8.0 and I am trying to import the Blink code in order to use its ExecutionContext, so that I can implement my own operator the way it is implemented here. I assumed that if I import flink-table, everything would come inside the same jar, as it does here. However, I cannot import "flink-table-runtime-blink": Eclipse says that it is a missing artifact.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>1.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table</artifactId>
    <version>1.8.0</version>
    <type>pom</type>
    <scope>provided</scope>
</dependency>
<dependency> <!-- THIS IS NOT POSSIBLE TO IMPORT -->
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime-blink</artifactId>
    <version>1.8.0</version>
</dependency>





Re: How would I use OneInputStreamOperator to deal with data skew?

Kurt Young
That's because the Blink code is not shipped with 1.8.0; it is currently only available in 1.9-SNAPSHOT.
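
If you want to try the 1.9-SNAPSHOT artifacts, you will also need the Apache snapshot repository in your pom.xml. A minimal sketch (this is the standard Apache snapshot repository; adjust to your setup):

<repositories>
    <repository>
        <id>apache.snapshots</id>
        <name>Apache Development Snapshot Repository</name>
        <url>https://repository.apache.org/content/repositories/snapshots/</url>
        <releases>
            <enabled>false</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>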

Best,
Kurt



Re: How would I use OneInputStreamOperator to deal with data skew?

Felipe Gutierrez
Oh, yes, I just saw that. I will use 1.9 then. Thanks!




Re: How would I use OneInputStreamOperator to deal with data skew?

Felipe Gutierrez
Hi again Kurt,

could you please help me with the pom.xml file? I have included all the Table ecosystem dependencies, as well as flink-table-runtime-blink, but the class org.apache.flink.table.runtime.context.ExecutionContext is still not found. I guess I am missing some dependency, but I do not know which one. This is my pom.xml file:

<project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.sense.flink</groupId>
    <artifactId>explore-flink</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>explore-flink</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <jdk.version>1.8</jdk.version>
        <scala.binary.version>2.11</scala.binary.version>
        <!-- <flink.version>1.8.0</flink.version> -->
        <flink.version>1.9-SNAPSHOT</flink.version>
        <junit.version>4.12</junit.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-metrics-dropwizard</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Table ecosystem -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime-blink</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.fusesource.mqtt-client</groupId>
            <artifactId>mqtt-client</artifactId>
            <version>1.15</version>
            <!-- <scope>provided</scope> -->
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.26</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.26</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>explore-flink</finalName>
        <plugins>
            <!-- download source code in Eclipse, best practice -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-eclipse-plugin</artifactId>
                <version>2.10</version>
                <configuration>
                    <downloadSources>true</downloadSources>
                    <downloadJavadocs>false</downloadJavadocs>
                </configuration>
            </plugin>

            <!-- Set a compiler level -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>${jdk.version}</source>
                    <target>${jdk.version}</target>
                </configuration>
            </plugin>

            <!-- Maven Shade Plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.0</version>
                <!-- Run shade goal on package phase -->
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:*</exclude>
                                    <!-- Also exclude very big transitive dependencies of Flink.
                                         WARNING: You have to remove these excludes if your code
                                         relies on other versions of these dependencies. -->
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                    <exclude>com.typesafe:config:*</exclude>
                                    <exclude>junit:junit:*</exclude>
                                    <exclude>com.codahale.metrics:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>org.apache.flink:*</artifact>
                                    <excludes>
                                        <!-- exclude shaded google but include shaded curator -->
                                        <exclude>org/apache/flink/shaded/com/**</exclude>
                                        <exclude>web-docs/**</exclude>
                                    </excludes>
                                </filter>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder. Otherwise,
                                         this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <includes>
                                        <include>org/apache/calcite/**</include>
                                        <include>org/apache/flink/calcite/shaded/**</include>
                                        <include>org/apache/flink/table/**</include>
                                        <include>org.codehaus.commons.compiler.properties</include>
                                        <include>org/codehaus/janino/**</include>
                                        <include>org/codehaus/commons/**</include>
                                    </includes>
                                </filter>
                            </filters>
                            <!-- If you want to use ./bin/flink run <quickstart jar>, uncomment
                                 the following lines. This will add a Main-Class entry to the
                                 manifest file. -->
                            <transformers>
                                <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.sense.flink.App</mainClass>
                                </transformer>
                            </transformers>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>


Thanks






Re: How would I use OneInputStreamOperator to deal with data skew?

Kurt Young
I think you can simply copy the source code into your project if the Maven dependency cannot be used.

Best,
Kurt



Re: How would I use OneInputStreamOperator to deal with data skew?

Felipe Gutierrez
Hi Kurt,

How do you make the finishBundle method return the combined tuples? I saw that there is a method "List<String> getOutputs()" which is never called.

I did an implementation based on the example you suggested. My MapBundleFunctionImpl class has a finishBundle method which iterates over all the combined tuples and returns them. However, my application does not receive any tuples downstream of the transform method.

Thanks,
Felipe




Re: How would I use OneInputStreamOperator to deal with data skew?

Kurt Young
I think you might have mixed some test code in with the operator. "List<String> getOutputs()" is from "TestMapBundleFunction" and is only used for validation.
For real usage, you need to write whatever records you want to emit to the "collector" that is passed in to "finishBundle".
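
As a rough sketch, assuming the MapBundleFunction contract from the Blink sources (addInput merges a record into the per-key value, finishBundle receives the buffered map plus a collector; adjust the import to wherever you copied the sources):

import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Illustrative sum "combiner": the output is produced by writing to the
// collector in finishBundle, not by returning a list.
public class SumBundleFunction
        extends MapBundleFunction<String, Long, Tuple2<String, Long>, Tuple2<String, Long>> {

    @Override
    public Long addInput(Long value, Tuple2<String, Long> input) {
        // merge the incoming record into the per-key partial aggregate
        return value == null ? input.f1 : value + input.f1;
    }

    @Override
    public void finishBundle(Map<String, Long> buffer, Collector<Tuple2<String, Long>> out) {
        // when the bundle is finished, emit every partial aggregate downstream
        for (Map.Entry<String, Long> entry : buffer.entrySet()) {
            out.collect(Tuple2.of(entry.getKey(), entry.getValue()));
        }
    }
}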

Best,
Kurt



Re: How would I use OneInputStreamOperator to deal with data skew?

Felipe Gutierrez
Thanks for the tip! I guess it is now working as it should.

Just one last question: why did you decide to extend "AbstractStreamOperator" instead of "AbstractUdfStreamOperator"? I am asking because I was also basing my solution on the "StreamFlatMap" class implementation (in addition to looking at yours).

Best,
Felipe



On Wed, Apr 17, 2019 at 4:13 AM Kurt Young <[hidden email]> wrote:
I think you might mixed some test codes with the operator.  "List<String> getOutputs()"  is from "TestMapBundleFunction" and only used for validation. 
For the real usage, you need to write whatever records you want to emit to the "collector" which passed in during "finishBundle".

Best,
Kurt


On Wed, Apr 17, 2019 at 12:50 AM Felipe Gutierrez <[hidden email]> wrote:
Hi Kurt,

How do you make the finishBundle method returns the combined tuples? I saw that there is a method "List<String> getOutputs()" which is never called.

I did an implementation based on the example that you suggested. The MapBundleFunctionImpl class has the method finishBundle which iterate all the combined tuples and return it. However, my application does not continue to receive tuples after the transform method.

Thanks,
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez


On Tue, Apr 16, 2019 at 3:10 AM Kurt Young <[hidden email]> wrote:
I think you can simply copy the source codes to your project if maven dependency can not be used.

Best,
Kurt


On Mon, Apr 15, 2019 at 9:47 PM Felipe Gutierrez <[hidden email]> wrote:
Hi again Kurt,

could you please help me with the pom.xml file? I have included all Table ecosystem dependencies and the flink-table-runtime-blink as well. However the class org.apache.flink.table.runtime.context.ExecutionContext is still not found. I guess I am missing some dependency, but I do not know which. This is my pom.xml file.

<modelVersion>4.0.0</modelVersion>

<groupId>org.sense.flink</groupId>
<artifactId>explore-flink</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>explore-flink</name>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<jdk.version>1.8</jdk.version>
<scala.binary.version>2.11</scala.binary.version>
<!-- <flink.version>1.8.0</flink.version> -->
<flink.version>1.9-SNAPSHOT</flink.version>
<junit.version>4.12</junit.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-dropwizard</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<!-- Table ecosystem -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime-blink</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client</artifactId>
<version>1.15</version>
<!-- <scope>provided</scope> -->
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.26</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.26</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
</dependencies>
<build>
<finalName>explore-flink</finalName>
<plugins>
<!-- download source code in Eclipse, best practice -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<version>2.10</version>
<configuration>
<downloadSources>true</downloadSources>
<downloadJavadocs>false</downloadJavadocs>
</configuration>
</plugin>

<!-- Set a compiler level -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>${jdk.version}</source>
<target>${jdk.version}</target>
</configuration>
</plugin>

<!-- Maven Shade Plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.0</version>
<!-- Run shade goal on package phase -->
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:*</exclude>
<!-- Also exclude very big transitive dependencies of Flink WARNING: 
You have to remove these excludes if your code relies on other versions of 
these dependencies. -->
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
<exclude>com.typesafe:config:*</exclude>
<exclude>junit:junit:*</exclude>
<exclude>com.codahale.metrics:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>org.apache.flink:*</artifact>
<excludes>
<!-- exclude shaded google but include shaded curator -->
<exclude>org/apache/flink/shaded/com/**</exclude>
<exclude>web-docs/**</exclude>
</excludes>
</filter>
<filter>
<!-- Do not copy the signatures in the META-INF folder. Otherwise, 
this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
<filter>
<artifact>*:*</artifact>
<includes>
<include>org/apache/calcite/**</include>
<include>org/apache/flink/calcite/shaded/**</include>
<include>org/apache/flink/table/**</include>
<include>org.codehaus.commons.compiler.properties</include>
<include>org/codehaus/janino/**</include>
<include>org/codehaus/commons/**</include>
</includes>
</filter>
</filters>
<!-- If you want to use ./bin/flink run <quickstart jar> uncomment 
the following lines. This will add a Main-Class entry to the manifest file -->
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.sense.flink.App</mainClass>
</transformer>
</transformers>
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
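
One likely reason the blink artifact cannot be resolved from the pom above: SNAPSHOT versions such as 1.9-SNAPSHOT are published only to the Apache snapshot repository, not to Maven Central, so the pom also needs a <repositories> entry along these lines (the standard snippet used by the Flink quickstarts):

<repositories>
	<repository>
		<id>apache.snapshots</id>
		<name>Apache Development Snapshot Repository</name>
		<url>https://repository.apache.org/content/repositories/snapshots/</url>
		<releases>
			<enabled>false</enabled>
		</releases>
		<snapshots>
			<enabled>true</enabled>
		</snapshots>
	</repository>
</repositories>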


Thanks



--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez


Reply | Threaded
Open this post in threaded view
|

Re: How would I use OneInputStreamOperator to deal with data skew?

Kurt Young
There is no reason for it; the operator and the function don't rely on the logic that AbstractUdfStreamOperator supplies.

Best,
Kurt
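
For readers following along, here is a minimal sketch of such an operator: it extends AbstractStreamOperator directly, implements OneInputStreamOperator, and pre-aggregates (key, count) pairs in memory before the shuffle, flushing every maxCount records. The class name and the count-based trigger are illustrative choices, and checkpointing of the in-flight map is deliberately left out, so treat it as a starting point rather than a finished combiner.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class PreAggregateOperator
		extends AbstractStreamOperator<Tuple2<String, Integer>>
		implements OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> {

	private static final long serialVersionUID = 1L;

	// assumption: a simple count-based flush threshold; production code
	// would also flush on checkpoints and at end of input
	private final int maxCount;
	private transient Map<String, Integer> bundle;
	private transient int count;

	public PreAggregateOperator(int maxCount) {
		this.maxCount = maxCount;
	}

	@Override
	public void open() throws Exception {
		super.open();
		this.bundle = new HashMap<>();
		this.count = 0;
	}

	@Override
	public void processElement(StreamRecord<Tuple2<String, Integer>> element) throws Exception {
		Tuple2<String, Integer> value = element.getValue();
		// combine locally: sum the counts buffered for this key
		bundle.merge(value.f0, value.f1, Integer::sum);
		if (++count >= maxCount) {
			flush();
		}
	}

	private void flush() {
		// emit the partial aggregates downstream and start a new bundle
		for (Map.Entry<String, Integer> entry : bundle.entrySet()) {
			output.collect(new StreamRecord<>(Tuple2.of(entry.getKey(), entry.getValue())));
		}
		bundle.clear();
		count = 0;
	}
}

Attached to a pipeline with dataStream.transform(...), it sits before the keyBy and reduces the number of records per hot key that reach the shuffle.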


On Wed, Apr 17, 2019 at 4:35 PM Felipe Gutierrez <[hidden email]> wrote:
Thanks for the tip! I guess it is now working as it should.

Just one last question: why did you decide to extend "AbstractStreamOperator" instead of "AbstractUdfStreamOperator"? I am asking because, besides your solution, I was also basing my implementation on the "StreamFlatMap" class.

Best,
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez


On Wed, Apr 17, 2019 at 4:13 AM Kurt Young <[hidden email]> wrote:
I think you might have mixed some test code with the operator. "List<String> getOutputs()" is from "TestMapBundleFunction" and is only used for validation.
For real usage, you need to write whatever records you want to emit to the "collector" that is passed in during "finishBundle".

Best,
Kurt
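
To make that concrete, here is a sketch of a function that sums (key, count) pairs. It assumes the MapBundleFunction signature from flink-table-runtime-blink, namely V addInput(V, IN) and void finishBundle(Map<K, V>, Collector<OUT>), and the exact package name may differ between versions. The point is that the combined results leave the function through the Collector in finishBundle, not through a getter.

import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
// assumption: package layout of flink-table-runtime-blink in 1.9-SNAPSHOT
import org.apache.flink.table.runtime.bundle.MapBundleFunction;
import org.apache.flink.util.Collector;

public class SumBundleFunction
		extends MapBundleFunction<String, Integer, Tuple2<String, Integer>, Tuple2<String, Integer>> {

	private static final long serialVersionUID = 1L;

	@Override
	public Integer addInput(Integer value, Tuple2<String, Integer> input) {
		// merge the incoming record into the value already buffered for its key
		return value == null ? input.f1 : value + input.f1;
	}

	@Override
	public void finishBundle(Map<String, Integer> buffer, Collector<Tuple2<String, Integer>> out) {
		// the combined tuples leave the function here, through the collector
		for (Map.Entry<String, Integer> entry : buffer.entrySet()) {
			out.collect(Tuple2.of(entry.getKey(), entry.getValue()));
		}
	}
}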


On Wed, Apr 17, 2019 at 12:50 AM Felipe Gutierrez <[hidden email]> wrote:
Hi Kurt,

How do you make the finishBundle method return the combined tuples? I saw that there is a method "List<String> getOutputs()" which is never called.

I did an implementation based on the example you suggested. The MapBundleFunctionImpl class has the method finishBundle, which iterates over all the combined tuples and returns them. However, my application does not receive any more tuples after the transform method.

Thanks,
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez


On Tue, Apr 16, 2019 at 3:10 AM Kurt Young <[hidden email]> wrote:
I think you can simply copy the source code into your project if the Maven dependency cannot be used.

Best,
Kurt
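
For completeness, a sketch of how the copied classes could be wired into a job with DataStream#transform. MapBundleOperator, CountBundleTrigger, and their constructors follow the Blink sources referenced above, but the class and package names are assumptions that may differ between versions; SumBundleFunction is the sketch from earlier in the thread.

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
// assumption: class and package names follow the Blink sources
import org.apache.flink.table.runtime.bundle.MapBundleOperator;
import org.apache.flink.table.runtime.bundle.trigger.CountBundleTrigger;

public class PreAggregateWiring {

	// applies the bundle operator to a stream of (key, count) pairs before the keyBy
	public static DataStream<Tuple2<String, Integer>> preAggregate(
			DataStream<Tuple2<String, Integer>> input) {
		KeySelector<Tuple2<String, Integer>, String> keySelector = value -> value.f0;
		return input.transform(
				"pre-aggregate",
				input.getType(),
				new MapBundleOperator<>(
						new SumBundleFunction(),         // the function sketched earlier in the thread
						new CountBundleTrigger<>(1000L), // assumption: flush every 1000 records
						keySelector));
	}
}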


Reply | Threaded
Open this post in threaded view
|

Re: How would I use OneInputStreamOperator to deal with data skew?

Kurt Young
I mean, no particular reason.

Best,
Kurt

