Hi, I was trying to implement a better way to handle data skew in Flink and I found this talk from #FlinkForward SF 2017: "Cliff Resnick & Seth Wiesman - From Zero to Streaming" [1], which says that they used OneInputStreamOperator [2]. Through it, they could implement the equivalent of a Hadoop "combiner" (executing part of the reduce work during the map phase, before shuffling). I need some help here. Which operators in the Flink source code can I look at to implement my own operator that deals with data skew? Or does anyone have an example of a similar use case? Thanks! Felipe
Hi, You can check out the bundle operator, which is used in Blink to do what you mentioned: https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/bundle/BundleOperator.java Best, Kurt On Fri, Apr 12, 2019 at 8:05 PM Felipe Gutierrez <[hidden email]> wrote:
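(For reference, the combiner idea behind that operator can be sketched as a plain OneInputStreamOperator. This is a minimal, hypothetical example, not the Blink BundleOperator itself: the class name PreAggregateOperator and the size threshold are made up, and the buffer is not checkpointed, so buffered elements would be lost on failure.)

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Combiner-style operator: pre-aggregates counts per key in local memory and
// emits the partial sums downstream once the buffer holds maxBundleSize keys.
// NOTE: the buffer is not checkpointed, so this sketch is not fault tolerant.
public class PreAggregateOperator
        extends AbstractStreamOperator<Tuple2<String, Long>>
        implements OneInputStreamOperator<Tuple2<String, Long>, Tuple2<String, Long>> {

    private final int maxBundleSize;
    private transient Map<String, Long> bundle;

    public PreAggregateOperator(int maxBundleSize) {
        this.maxBundleSize = maxBundleSize;
    }

    @Override
    public void open() throws Exception {
        super.open();
        bundle = new HashMap<>();
    }

    @Override
    public void processElement(StreamRecord<Tuple2<String, Long>> element) {
        Tuple2<String, Long> value = element.getValue();
        // fold the new element into the local partial aggregate for its key
        bundle.merge(value.f0, value.f1, Long::sum);
        if (bundle.size() >= maxBundleSize) {
            flush();
        }
    }

    private void flush() {
        for (Map.Entry<String, Long> e : bundle.entrySet()) {
            output.collect(new StreamRecord<>(Tuple2.of(e.getKey(), e.getValue())));
        }
        bundle.clear();
    }
}

It would be wired in with something like stream.transform("pre-aggregate", stream.getType(), new PreAggregateOperator(1000)).keyBy(0).sum(1), so the keyed aggregation downstream only sees partial sums.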
Cool, thanks Kurt! On Mon, Apr 15, 2019 at 6:06 AM Kurt Young <[hidden email]> wrote:
Hi, which artifacts do I have to import in Maven in order to use the Blink API? I am using Flink 1.8.0 and I am trying to import the Blink code to use its ExecutionContext. I want to do this in order to implement my own operator like it is implemented here. I guess if I import flink-table, everything should come inside the same jar, as it is done here. However, I cannot import "flink-table-runtime-blink"; Eclipse says that it is a missing artifact.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>1.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table</artifactId>
    <version>1.8.0</version>
    <type>pom</type>
    <scope>provided</scope>
</dependency>
<!-- THIS IS NOT POSSIBLE TO IMPORT -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime-blink</artifactId>
    <version>1.8.0</version>
</dependency>

On Mon, Apr 15, 2019 at 9:49 AM Felipe Gutierrez <[hidden email]> wrote:
That's because the Blink code is not shipped with 1.8.0; it is currently only available in 1.9-SNAPSHOT. Best, Kurt On Mon, Apr 15, 2019 at 7:18 PM Felipe Gutierrez <[hidden email]> wrote:
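(Side note: to resolve 1.9-SNAPSHOT artifacts at all, the pom usually also needs the Apache snapshots repository enabled, e.g. with the standard snippet from the Flink quickstart poms, assuming the nightly Blink artifacts are published there:)

<repositories>
    <repository>
        <id>apache.snapshots</id>
        <name>Apache Development Snapshot Repository</name>
        <url>https://repository.apache.org/content/repositories/snapshots/</url>
        <releases>
            <enabled>false</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>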
Oh, yes, I just saw that. I will use 1.9 then. Thanks. On Mon, Apr 15, 2019 at 3:23 PM Kurt Young <[hidden email]> wrote:
Hi again Kurt, could you please help me with the pom.xml file? I have included all the Table ecosystem dependencies and flink-table-runtime-blink as well. However, the class org.apache.flink.table.runtime.context.ExecutionContext is still not found. I guess I am missing some dependency, but I do not know which one. This is my pom.xml file:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.sense.flink</groupId>
    <artifactId>explore-flink</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>explore-flink</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <jdk.version>1.8</jdk.version>
        <scala.binary.version>2.11</scala.binary.version>
        <!-- <flink.version>1.8.0</flink.version> -->
        <flink.version>1.9-SNAPSHOT</flink.version>
        <junit.version>4.12</junit.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-metrics-dropwizard</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Table ecosystem -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime-blink</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.fusesource.mqtt-client</groupId>
            <artifactId>mqtt-client</artifactId>
            <version>1.15</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.26</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.26</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>explore-flink</finalName>
        <plugins>
            <!-- download source code in Eclipse, best practice -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-eclipse-plugin</artifactId>
                <version>2.10</version>
                <configuration>
                    <downloadSources>true</downloadSources>
                    <downloadJavadocs>false</downloadJavadocs>
                </configuration>
            </plugin>
            <!-- Set a compiler level -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>${jdk.version}</source>
                    <target>${jdk.version}</target>
                </configuration>
            </plugin>
            <!-- Maven Shade Plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.0</version>
                <!-- Run shade goal on package phase -->
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:*</exclude>
                                    <!-- Also exclude very big transitive dependencies of Flink.
                                         WARNING: You have to remove these excludes if your code
                                         relies on other versions of these dependencies. -->
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                    <exclude>com.typesafe:config:*</exclude>
                                    <exclude>junit:junit:*</exclude>
                                    <exclude>com.codahale.metrics:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- exclude shaded google but include shaded curator -->
                                    <artifact>org.apache.flink:*</artifact>
                                    <excludes>
                                        <exclude>org/apache/flink/shaded/com/**</exclude>
                                        <exclude>web-docs/**</exclude>
                                    </excludes>
                                </filter>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when
                                         using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <includes>
                                        <include>org/apache/calcite/**</include>
                                        <include>org/apache/flink/calcite/shaded/**</include>
                                        <include>org/apache/flink/table/**</include>
                                        <include>org.codehaus.commons.compiler.properties</include>
                                        <include>org/codehaus/janino/**</include>
                                        <include>org/codehaus/commons/**</include>
                                    </includes>
                                </filter>
                            </filters>
                            <!-- If you want to use ./bin/flink run <quickstart jar>, uncomment
                                 the following lines. This will add a Main-Class entry to the
                                 manifest file. -->
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.sense.flink.App</mainClass>
                                </transformer>
                            </transformers>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Thanks On Mon, Apr 15, 2019 at 3:25 PM Felipe Gutierrez <[hidden email]> wrote:
I think you can simply copy the source code into your project if the Maven dependency cannot be used. Best, Kurt On Mon, Apr 15, 2019 at 9:47 PM Felipe Gutierrez <[hidden email]> wrote:
Hi Kurt, how do you make the finishBundle method return the combined tuples? I saw that there is a method "List<String> getOutputs()" which is never called. I did an implementation based on the example that you suggested. The MapBundleFunctionImpl class has the finishBundle method, which iterates over all the combined tuples and returns them. However, my application does not continue to receive tuples after the transform method. Thanks, Felipe On Tue, Apr 16, 2019 at 3:10 AM Kurt Young <[hidden email]> wrote:
I think you might have mixed some test code with the operator. "List<String> getOutputs()" is from "TestMapBundleFunction" and is only used for validation. For real usage, you need to write whatever records you want to emit to the "collector" that is passed in during "finishBundle". Best, Kurt On Wed, Apr 17, 2019 at 12:50 AM Felipe Gutierrez <[hidden email]> wrote:
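(For illustration, a sum-style bundle function in the spirit Kurt describes might look like the sketch below. It assumes the MapBundleFunction base class copied from the Blink runtime, with addInput/finishBundle signatures as in the 1.9 code; the class name SumBundleFunction is made up, and the exact signatures may differ in the version you copied.)

import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Sketch of a combine function: addInput folds each record into the per-key
// partial sum kept in the bundle, and finishBundle emits the partial sums
// through the collector (rather than returning a list).
public class SumBundleFunction
        extends MapBundleFunction<String, Long, Tuple2<String, Long>, Tuple2<String, Long>> {

    @Override
    public Long addInput(Long value, Tuple2<String, Long> input) {
        // value is null the first time a key shows up in the current bundle
        return value == null ? input.f1 : value + input.f1;
    }

    @Override
    public void finishBundle(Map<String, Long> buffer, Collector<Tuple2<String, Long>> out) {
        // emit one partial aggregate per key; the surrounding operator is
        // expected to clear the bundle after this call
        for (Map.Entry<String, Long> e : buffer.entrySet()) {
            out.collect(Tuple2.of(e.getKey(), e.getValue()));
        }
    }
}

If you also copied MapBundleOperator and a bundle trigger such as CountBundleTrigger, wiring this in presumably goes through stream.transform(...) with the operator built from the function, the trigger, and a key selector, but double-check the constructor of the version you copied.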
Thanks for the tip! I guess it is now working as it should. Just one last question: why did you decide to use "AbstractStreamOperator" instead of "AbstractUdfStreamOperator"? I am asking because, besides your solution, I was also basing my implementation on the "StreamFlatMap" class. Best, Felipe On Wed, Apr 17, 2019 at 4:13 AM Kurt Young <[hidden email]> wrote:
There is no reason for it; the operator and the function don't rely on the logic that AbstractUdfStreamOperator supplies. Best, Kurt On Wed, Apr 17, 2019 at 4:35 PM Felipe Gutierrez <[hidden email]> wrote:
I mean no particular reason. Best, Kurt On Wed, Apr 17, 2019 at 7:44 PM Kurt Young <[hidden email]> wrote:
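(For anyone reading along: the practical difference is that AbstractUdfStreamOperator owns a user function and forwards the operator lifecycle to it, roughly as in the simplified sketch below, which is not the actual Flink source. Extending AbstractStreamOperator directly just skips that forwarding when there is no separate user function to manage.)

import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;

// Simplified sketch of what AbstractUdfStreamOperator adds on top of
// AbstractStreamOperator: it holds the user function and forwards lifecycle
// calls, so rich functions get their runtime context and open()/close() hooks.
public abstract class UdfOperatorSketch<OUT, F extends Function>
        extends AbstractStreamOperator<OUT> {

    protected final F userFunction;

    protected UdfOperatorSketch(F userFunction) {
        this.userFunction = userFunction;
    }

    @Override
    public void open() throws Exception {
        super.open();
        FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext());
        FunctionUtils.openFunction(userFunction, new Configuration());
    }

    @Override
    public void close() throws Exception {
        super.close();
        FunctionUtils.closeFunction(userFunction);
    }
}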