Hi,
Is there any library to use and remember the Apache Flink snapshot? Thank you. Regards, Abu Bakar Siddiqur Rahman
Could you expand a bit on what you mean? Are you referring to savepoints?
On 1/28/2021 3:24 PM, Abu Bakar Siddiqur Rahman Rocky wrote:
Hi, Is there any source code for the checkpoint, snapshot, and ZooKeeper mechanisms? Thank you On Mon, Feb 1, 2021 at 4:23 AM Chesnay Schepler <[hidden email]> wrote:
Regards, Abu Bakar Siddiqur Rahman
Graduate Research Student
Natural Language Processing Laboratory
Centro de Investigacion en Computacion
Instituto Politecnico Nacional, Mexico City
Sure.
On 2/3/2021 3:08 AM, Abu Bakar Siddiqur Rahman Rocky wrote:
Hi, Is there anyone who can tell me how to connect a Java program to Apache Flink (on a Mac)? Thank you! Regards, Abu Bakar Siddiqur Rahman On Thu, Feb 11, 2021 at 4:26 AM Abu Bakar Siddiqur Rahman Rocky <[hidden email]> wrote:
Regards, Abu Bakar Siddiqur Rahman
Hi Abu Bakar Siddiqur Rahman, Have you had a look at the Flink documentation [1]? It provides step-by-step instructions on how to run a job using a local standalone cluster (the Flink binaries include example jobs under ./examples). This also works on a Mac. You just need to start the Flink cluster (./bin/start-cluster.sh) and submit a job using one of the example jars provided in the binaries (e.g. ./bin/flink run -d ./examples/streaming/WordCount.jar). You can then see the job running in Flink's web UI, available at http://localhost:8081 if you use the default configuration shipped with the binaries. Best, Matthias On Thu, Feb 11, 2021 at 3:45 PM Abu Bakar Siddiqur Rahman Rocky <[hidden email]> wrote:
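The steps described can be sketched as shell commands, assuming a Flink 1.12 binary distribution unpacked locally (run from its root directory):

```shell
# start a local standalone cluster (one JobManager, one TaskManager)
./bin/start-cluster.sh

# submit a bundled example job in detached mode
./bin/flink run -d ./examples/streaming/WordCount.jar

# the web UI should now be reachable at http://localhost:8081

# stop the cluster when done
./bin/stop-cluster.sh
```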
Thank you for your reply. Another question: after checkpointing, we save our snapshot to storage. How can we access that storage? Is this the source code: https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java ? If it is not, could you please point me to the source code for accessing the storage where snapshots are saved? Thank you On Fri, Feb 12, 2021 at 2:44 AM Matthias Pohl <[hidden email]> wrote:
Regards, Abu Bakar Siddiqur Rahman
Checkpoints are stored in some DFS storage. The location can be specified via the state.checkpoints.dir configuration property [1]. You can access the state of a savepoint or checkpoint using the State Processor API [2]. Best, Matthias On Fri, Feb 12, 2021 at 5:35 PM Abu Bakar Siddiqur Rahman Rocky <[hidden email]> wrote:
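As a sketch of the State Processor API approach mentioned above, the following batch job reads keyed state back out of a savepoint or retained checkpoint. It assumes Flink 1.12 with the flink-state-processor-api dependency on the classpath; the path file:///tmp/checkpoints/chk-42, the operator uid "stateful-map", and the state name "sum" are hypothetical placeholders to be replaced with your own values.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ReadCheckpointState {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // point this at a savepoint or a retained (externalized) checkpoint directory
        ExistingSavepoint savepoint = Savepoint.load(
                env, "file:///tmp/checkpoints/chk-42", new MemoryStateBackend());

        // read the "sum" ValueState of the operator with uid "stateful-map"
        DataSet<Long> sums = savepoint.readKeyedState("stateful-map", new SumReader());
        sums.print();
    }

    static class SumReader extends KeyedStateReaderFunction<Long, Long> {
        private ValueState<Long> sum;

        @Override
        public void open(Configuration parameters) {
            sum = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("sum", Long.class));
        }

        @Override
        public void readKey(Long key, Context ctx, Collector<Long> out) throws Exception {
            out.collect(sum.value());
        }
    }
}
```

The state descriptor must match the name and type that the original job used when it wrote the state.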
Matthias Pohl | Engineer
Follow us @VervericaData
Ververica -- Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton Wehner
Hi, I read it: https://ci.apache.org/projects/flink/flink-docs-release-1.12/try-flink/local_installation.html I can run the example code in the Apache Flink UI using the binaries of the Flink distribution. But if I run Java code from IntelliJ IDEA or Eclipse, how can I connect that code to the Apache Flink UI? Thank you! On Fri, Feb 12, 2021 at 11:43 AM Matthias Pohl <[hidden email]> wrote:
Regards, Abu Bakar Siddiqur Rahman
Hi, running your job from within your IDE with no specific configuration provided (like the Flink job examples provided by Flink [1]) means that you spin up a local Flink cluster (see MiniCluster [2]). This does not have the web UI enabled by default. You can enable it by calling `StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);` instead. Don't forget to add the `flink-runtime-web` dependency:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>

Best, Matthias On Sat, Feb 20, 2021 at 4:07 AM Abu Bakar Siddiqur Rahman Rocky <[hidden email]> wrote:
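Putting that advice together, a minimal local job with the web UI enabled might look like the following sketch (assuming Flink 1.12 and the flink-runtime-web dependency; the class and job names are made up):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalWebUiJob {
    public static void main(String[] args) throws Exception {
        // local MiniCluster with the web UI enabled (default port 8081)
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // trivial pipeline so there is something to observe in the UI
        env.fromElements(1, 2, 3).print();

        env.execute("local-webui-example");
    }
}
```

While the job runs, http://localhost:8081 shows it in the web UI, just like a job submitted to a standalone cluster.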
Hi Matthias Pohl, Thank you for your reply. First, I'm sorry if my question confused you; let me know if anything is unclear. (1) To run a job in Flink, we have to use this command: ./bin/flink run /home/username/folder/code.jar — is that correct? (2) When I run the code in Eclipse, it gives the error shown in the attached picture. I guess that even if (1) is correct, I still can't run the job in Flink because of the error in the code: org.apache and CheckpointingMode cannot be resolved. Thank you On Mon, Feb 22, 2021, 7:48 AM Matthias Pohl <[hidden email]> wrote:
Attachment: IMG_20210222_095828.jpg
Yes, Flink jobs are deployed using `./bin/flink run`. It will use the configuration in conf/flink-conf.yaml to connect to the Flink cluster. It looks like you don't have the right dependencies on your classpath. Have you had a look at the documentation on project configuration [1]? It gives you insight into how to set up the dependencies for your Flink project. "Setting up a Project: Basic Dependencies" [2] describes the basic requirements for the project dependencies. The Maven quickstart [3], in contrast, shows you how to initialize a Maven-based Flink project. Best, Matthias On Mon, Feb 22, 2021 at 5:06 PM Abu Bakar Siddiqur Rahman Rocky <[hidden email]> wrote:
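For reference, the basic dependencies described in [2] might be sketched in the pom.xml as follows. The versions here are assumptions (Flink 1.12.0 built against Scala 2.11) and should be adjusted to the actual setup:

```xml
<!-- core DataStream API; 'provided' because a Flink cluster already ships it -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>1.12.0</version>
  <scope>provided</scope>
</dependency>
<!-- needed to execute jobs, including locally from the IDE -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_2.11</artifactId>
  <version>1.12.0</version>
</dependency>
```

When running from the IDE, provided-scoped dependencies must be added to the run classpath; a missing flink-clients artifact is a common cause of the "No ExecutorFactory found to execute the application" error.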
Hi, Thank you for the last response. The error from my last question is solved by adding the dependencies. This time, please answer whenever you have time, because it is a long one. I numbered my steps below.

(1) This is what I used when creating the Maven project in Eclipse IDE (see the attached screenshots).

Then, in the pom section, I put these dependencies (see the attached pom file).
(2) This is the code I used:

package p1;

import java.sql.Timestamp;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.PropertyConfigurator;

public class snashotapache {

    public static void main(String[] args) throws Exception {
        BasicConfigurator.configure();
        String log4jConfPath = "/Users/abubakarsiddiqurrahman/Documents/flink-quickstart-java/log4j.properties";
        PropertyConfigurator.configure(log4jConfPath);

        // set up the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // start a checkpoint every 1000 ms
        env.enableCheckpointing(1000);
        // minimum pause between two checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // checkpoints have to complete within 10000 ms, or are discarded
        env.getCheckpointConfig().setCheckpointTimeout(10000);
        // set mode to exactly-once (this is the default); the alternative is AT_LEAST_ONCE
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // allow only one checkpoint to be in progress at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // retain externalized checkpoints after job cancellation
        // (the alternative is DELETE_ON_CANCELLATION)
        env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // number of restart attempts, delay between restarts
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));

        DataStream<String> data = env.socketTextStream("localhost", 9090);

        DataStream<Long> sum = data
                .map(new MapFunction<String, Tuple2<Long, String>>() {
                    public Tuple2<Long, String> map(String s) {
                        String[] words = s.split(",");
                        return new Tuple2<Long, String>(Long.parseLong(words[0]), words[1]);
                    }
                })
                .keyBy(0)
                .flatMap(new StatefulMap());

        sum.writeAsText("/Users/abubakarsiddiqurrahman/Documents");

        // execute program
        env.execute("State");
    }

    public static class StatefulMap extends RichFlatMapFunction<Tuple2<Long, String>, Long> {
        private transient ValueState<Long> sum;
        private transient ValueState<Long> count;

        public void flatMap(Tuple2<Long, String> input, Collector<Long> out) throws Exception {
            Long currCount = count.value();
            Long currSum = sum.value();
            currCount += 1;
            currSum = currSum + Long.parseLong(input.f1);
            count.update(currCount);
            sum.update(currSum);
            if (currCount >= 10) {
                /* emit sum of last 10 elements */
                out.collect(sum.value());
                /* clear value */
                count.clear();
                sum.clear();
            }
        }

        public void open(Configuration conf) {
            ValueStateDescriptor<Long> descriptor =
                    new ValueStateDescriptor<Long>("sum", TypeInformation.of(new TypeHint<Long>() {}), 0L);
            sum = getRuntimeContext().getState(descriptor);
            ValueStateDescriptor<Long> descriptor2 =
                    new ValueStateDescriptor<Long>("count", TypeInformation.of(new TypeHint<Long>() {}), 0L);
            count = getRuntimeContext().getState(descriptor2);
        }
    }
}
(3) This is the error:

log4j:ERROR Could not read configuration file [/Users/abubakarsiddiqurrahman/Documents/flink-quickstart-java/log4j.properties].
java.io.FileNotFoundException: /Users/abubakarsiddiqurrahman/Documents/flink-quickstart-java/log4j.properties (No such file or directory)
    at java.base/java.io.FileInputStream.open0(Native Method)
    at java.base/java.io.FileInputStream.open(FileInputStream.java:211)
    at java.base/java.io.FileInputStream.<init>(FileInputStream.java:153)
    at java.base/java.io.FileInputStream.<init>(FileInputStream.java:108)
    at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:372)
    at org.apache.log4j.PropertyConfigurator.configure(PropertyConfigurator.java:403)
    at p1.snashotapache.main(snashotapache.java:45)
log4j:ERROR Ignoring configuration file [/Users/abubakarsiddiqurrahman/Documents/flink-quickstart-java/log4j.properties].
0 [main] DEBUG org.apache.flink.api.java.ClosureCleaner - Dig to clean the java.lang.String
5 [main] DEBUG org.apache.flink.api.java.ClosureCleaner - Dig to clean the [B
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/Users/abubakarsiddiqurrahman/.m2/repository/org/apache/flink/flink-core/1.12.0/flink-core-1.12.0.jar) to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
43 [main] DEBUG org.apache.flink.api.java.ClosureCleaner - Dig to clean the java.lang.Byte
43 [main] DEBUG org.apache.flink.api.java.ClosureCleaner - Dig to clean the java.lang.Integer
43 [main] DEBUG org.apache.flink.api.java.ClosureCleaner - Dig to clean the java.lang.Boolean
47 [main] DEBUG org.apache.flink.api.java.ClosureCleaner - Dig to clean the java.lang.Integer
47 [main] DEBUG org.apache.flink.api.java.ClosureCleaner - Dig to clean the java.lang.String
47 [main] DEBUG org.apache.flink.api.java.ClosureCleaner - Dig to clean the [B
47 [main] DEBUG org.apache.flink.api.java.ClosureCleaner - Dig to clean the java.lang.Byte
48 [main] DEBUG org.apache.flink.api.java.ClosureCleaner - Dig to clean the java.lang.Integer
48 [main] DEBUG org.apache.flink.api.java.ClosureCleaner - Dig to clean the java.lang.Boolean
48 [main] DEBUG org.apache.flink.api.java.ClosureCleaner - Dig to clean the java.lang.Long
48 [main] DEBUG org.apache.flink.api.java.ClosureCleaner - Dig to clean the java.lang.Long
48 [main] DEBUG org.apache.flink.api.java.ClosureCleaner - Dig to clean the java.lang.Boolean
134 [main] DEBUG org.apache.flink.api.java.ClosureCleaner - Dig to clean the org.apache.flink.api.java.typeutils.runtime.TupleComparator
152 [main] DEBUG org.apache.flink.api.java.ClosureCleaner - Dig to clean the java.lang.Integer
152 [main] DEBUG org.apache.flink.api.java.ClosureCleaner - Dig to clean the [Ljava.lang.Object;
163 [main] DEBUG org.apache.flink.api.java.ClosureCleaner - Dig to clean the org.apache.flink.api.java.typeutils.runtime.TupleComparator
167 [main] DEBUG org.apache.flink.api.java.ClosureCleaner - Dig to clean the java.lang.Integer
167 [main] DEBUG org.apache.flink.api.java.ClosureCleaner - Dig to clean the [Ljava.lang.Object;
188 [main] DEBUG org.apache.flink.api.java.ClosureCleaner - Dig to clean the org.apache.flink.api.java.io.TextOutputFormat
188 [main] DEBUG org.apache.flink.api.java.ClosureCleaner - Dig to clean the java.lang.String
188 [main] DEBUG org.apache.flink.api.java.ClosureCleaner - Dig to clean the [B
188 [main] DEBUG org.apache.flink.api.java.ClosureCleaner - Dig to clean the java.lang.Byte
189 [main] DEBUG org.apache.flink.api.java.ClosureCleaner - Dig to clean the java.lang.Integer
189 [main] DEBUG org.apache.flink.api.java.ClosureCleaner - Dig to clean the java.lang.Boolean
199 [main] DEBUG org.apache.flink.api.java.ClosureCleaner - Dig to clean the java.lang.Boolean
230 [main] DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator - Transforming OneInputTransformation{id=2, name='Map', outputType=Java Tuple2<Long, String>, parallelism=8}
231 [main] DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator - Transforming LegacySourceTransformation{id=1, name='Socket Stream', outputType=String, parallelism=1}
242 [main] DEBUG org.apache.flink.streaming.api.graph.StreamGraph - Vertex: 1
244 [main] DEBUG org.apache.flink.streaming.api.graph.StreamGraph - Vertex: 2
244 [main] DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator - Transforming OneInputTransformation{id=4, name='Flat Map', outputType=Long, parallelism=8}
244 [main] DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator - Transforming PartitionTransformation{id=3, name='Partition', outputType=Java Tuple2<Long, String>, parallelism=8}
245 [main] DEBUG org.apache.flink.streaming.api.graph.StreamGraph - Vertex: 4
247 [main] DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator - Transforming LegacySinkTransformation{id=5, name='Unnamed', outputType=GenericType<java.lang.Object>, parallelism=8}
247 [main] DEBUG org.apache.flink.streaming.api.graph.StreamGraph - Vertex: 5
Exception in thread "main" java.lang.IllegalStateException: No ExecutorFactory found to execute the application.
    at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1931)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1836)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:70)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1822)
    at p1.snashotapache.main(snashotapache.java:91)

(4) Attachments: please see the attached screenshots of the code and the pom file.

(5) Questions:
(1) In the code, at lines 86, 88, 122, and 125, some functions give warnings, and keyBy() and other calls on those lines are shown struck through. Why is that?
(2) I just created a Maven project in Eclipse, put the dependencies into the pom.xml, and ran the code from section (2). I get the error in section (3) along with the warnings. Could you please tell me whether my steps are correct?

(6) Quick question (in case sections (1) to (5) are unclear): I want to take a snapshot/checkpoint and then access the storage to read/write the saved snapshot. Could you please give me instructions, with source code, on how to do this? On Tue, Feb 23, 2021 at 1:28 AM Matthias Pohl <[hidden email]> wrote:
Regards, Abu Bakar Siddiqur Rahman
Attachments: Screen Shot 2021-02-26 at 9.19.57 AM.png, Screen Shot 2021-02-26 at 9.20.46 AM.png, Screen Shot 2021-02-26 at 9.20.18 AM.png, Screen Shot 2021-02-26 at 9.38.04 AM.png