Flink 1.11 FlinkKafkaConsumer not propagating watermarks


Edward Bingham

Hi everyone,

I'm seeing some strange behavior from FlinkKafkaConsumer. I wrote up some Flink processors using Flink 1.12 and tried to get them working on Amazon EMR. However, Amazon EMR only supports Flink 1.11.2 at the moment. When I went to downgrade, I found, inexplicably, that watermarks were no longer propagating.

There is only one partition on the topic, and parallelism is set to 1. Is there something I'm missing here? I feel like I'm going a bit crazy.

I've cross-posted this on Stack Overflow, but I figure the mailing list is probably a better avenue for this question.

Thanks,
Ned


Here's the output for Flink 1.12 (correctly propagating the watermark):

input 1
(name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: Custom Source",
    "pact" : "Data Source",
    "contents" : "Source: Custom Source",
    "parallelism" : 1
  }, {
    "id" : 2,
    "type" : "Process",
    "pact" : "Operator",
    "contents" : "Process",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}
input 1
(name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
Assigning timestamp 86400000
Source [timestamp=86400000 watermark=-9223372036854775808] "test message"
Emitting watermark 0
Assigning timestamp 864000000
Source [timestamp=864000000 watermark=0] "test message"
Emitting watermark 777600000
Assigning timestamp 8640000000
Source [timestamp=8640000000 watermark=777600000] "test message"
Emitting watermark 8553600000
Assigning timestamp 86400000000
Source [timestamp=86400000000 watermark=8553600000] "test message"
Emitting watermark 86313600000
Assigning timestamp 9223372036854775807
Source [timestamp=9223372036854775807 watermark=86313600000] "test message"
Emitting watermark 9223372036768375807

And here is the output for Flink 1.11 (not propagating the watermark):

input 1
(name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: Custom Source",
    "pact" : "Data Source",
    "contents" : "Source: Custom Source",
    "parallelism" : 1
  }, {
    "id" : 2,
    "type" : "Process",
    "pact" : "Operator",
    "contents" : "Process",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}
input 1
(name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
Assigning timestamp 86400000
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 0
Assigning timestamp 864000000
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 777600000
Assigning timestamp 8640000000
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 8553600000
Assigning timestamp 86400000000
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 86313600000
Assigning timestamp 9223372036854775807
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 9223372036768375807

Here's the integration test that exposes it:

package mytest;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.FileInputStream;
import java.io.InputStream;
import java.io.IOException;

import java.nio.file.Files;
import java.nio.file.Paths;

import java.text.SimpleDateFormat;

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;

import kafka.utils.MockTime;
import kafka.utils.TestUtils;

import kafka.zk.EmbeddedZookeeper;

import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.client.JobCancellationException;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;

import org.junit.*;

public class FailTest {
    private static EmbeddedZookeeper zooKeeper = null;
    private static KafkaServer server = null;
    public static AdminClient admin = null;
    private static int connected = 0;

    private static StringSerializer stringSerializer = new StringSerializer();
    private static StringDeserializer stringDeserializer = new StringDeserializer();

    private static final Properties ZooKeeperProperties = getZooKeeperProperties();
    private static final Properties ServerProperties = getServerProperties();
    private static final Properties ProducerProperties = getProducerProperties();

    public static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    public static Properties getProducerProperties() {
        // Use Kafka provided properties
        Properties result = new Properties();
        result.put("bootstrap.servers", "localhost:9092");
        result.put("compression.type", "none");
        return result;
    }

    public static Properties getServerProperties() {
        // Use Kafka provided properties
        Properties result = new Properties();
        result.put("broker.id", "0");
        result.put("num.network.threads", "3");
        result.put("num.io.threads", "8");
        result.put("socket.send.buffer.bytes", "102400");
        result.put("socket.recv.buffer.bytes", "102400");
        result.put("log.dirs", "target/kafka-logs");
        result.put("num.partitions", "1");
        result.put("offset.topic.replication.factor", "1");
        result.put("transaction.state.log.replication.factor", "1");
        result.put("transaction.state.log.min.isr", "1");
        result.put("auto.create.topics.enable", "true");
        result.put("log.retention.hours", "168");
        result.put("log.segment.bytes", "1073741824");
        result.put("log.retention.check.interval.ms", "300000");
        result.put("zookeeper.connect", "localhost:2181");
        result.put("zookeeper.connection.timeout.ms", "18000");
        result.put("group.initial.rebalance.delay.ms", "0");
        return result;
    }

    public static Properties getZooKeeperProperties() {
        // Use Kafka provided properties
        Properties result = new Properties();
        result.put("dataDir", "/tmp/zookeeper");
        result.put("clientPort", "2181");
        result.put("maxClientCnxns", "0");
        result.put("admin.enableServer", "false");
        return result;
    }

    private static Properties getNewLogDir(Properties props) {
        // Pick a fresh log directory for each run so old segments don't interfere.
        String path = props.getProperty("log.dirs") + "/run.";
        int index = 0;
        while (Files.exists(Paths.get(path + index))) {
            index += 1;
        }
        props.setProperty("log.dirs", path + index);
        return props;
    }

    public static class Print<V> extends ProcessFunction<V, V> {
        private static final ObjectMapper mapper = new ObjectMapper();
        public String prefix;

        public Print() {
            this.prefix = "";
        }

        public Print(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public void processElement(V value, Context ctx, Collector<V> out) {
            System.out.printf("%s ", prefix);
            if (ctx != null) {
                TimerService srv = ctx.timerService();
                Long timestampLong = ctx.timestamp();
                long timestamp = 0;
                if (timestampLong != null) {
                    timestamp = timestampLong;
                }
                long watermark = 0;
                if (srv != null) {
                    watermark = srv.currentWatermark();
                }
                System.out.printf("[timestamp=%d watermark=%d] ", timestamp, watermark);
            }

            if (value == null) {
                System.out.println("null");
            } else {
                try {
                    System.out.println(new String(mapper.writeValueAsBytes(value)));
                } catch (Exception e) {
                    System.out.println("exception");
                    e.printStackTrace();
                }
            }
            out.collect(value);
        }
    }

    @ClassRule
    public static MiniClusterWithClientResource flinkCluster =
        new MiniClusterWithClientResource(
            new MiniClusterResourceConfiguration.Builder()
                .setNumberSlotsPerTaskManager(2)
                .setNumberTaskManagers(1)
                .build());

    @BeforeClass
    public static void setup() {
        env.setParallelism(1);
        if (connected == 0) {
            zooKeeper = new EmbeddedZookeeper();
            ServerProperties.setProperty("zookeeper.connect", "localhost:" + zooKeeper.port());

            server = TestUtils.createServer(new KafkaConfig(getNewLogDir(ServerProperties)), new MockTime());
            admin = AdminClient.create(ProducerProperties);
        }
        connected += 1;
    }

    @AfterClass
    public static void tearDown() {
        if (connected == 1) {
            try {
                server.shutdown();
                zooKeeper.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }

            zooKeeper = null;
            server = null;
            admin = null;
        }
        connected -= 1;
    }

    @Test
    public void testFail() throws Exception {
        String inputTopic = "input";

        Map<String, String> configs = new HashMap<>();
        int partitions = 1;
        short replication = 1;

        CreateTopicsResult result = admin.createTopics(Arrays.asList(
            new NewTopic(inputTopic, partitions, replication).configs(configs)
        ));
        result.all().get();

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(ProducerProperties, stringSerializer, stringSerializer);

        DescribeTopicsResult topics = admin.describeTopics(Arrays.asList(inputTopic));
        for (Map.Entry<String, TopicDescription> topic : topics.all().get().entrySet()) {
            System.out.printf("%s %d\n", topic.getValue().name(), topic.getValue().partitions().size());
            System.out.println(topic.getValue().toString());
        }

        // Some subscription events
        producer.send(new ProducerRecord<String, String>(inputTopic, 0, Time.days(1).toMilliseconds(), "0", "test message"));
        producer.send(new ProducerRecord<String, String>(inputTopic, 0, Time.days(10).toMilliseconds(), "0", "test message"));
        producer.send(new ProducerRecord<String, String>(inputTopic, 0, Time.days(100).toMilliseconds(), "0", "test message"));
        producer.send(new ProducerRecord<String, String>(inputTopic, 0, Time.days(1000).toMilliseconds(), "0", "test message"));
        producer.send(new ProducerRecord<String, String>(inputTopic, 0, Long.MAX_VALUE, "0", "test message"));
        producer.flush();
        producer.close();

        Properties prop = new Properties();
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        prop.put("group.id", "0");
        prop.put("enable.auto.commit", "true");
        prop.put("auto.commit.interval.ms", "1000");
        prop.put("session.timeout.ms", "30000");
        FlinkKafkaConsumer<String> source = new FlinkKafkaConsumer<String>(inputTopic, new SimpleStringSchema(), prop);
        source.assignTimestampsAndWatermarks(
            new WatermarkStrategy<String>() {
                @Override
                public TimestampAssigner<String> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                    return new TimestampAssigner<String>() {
                        @Override
                        public long extractTimestamp(String event, long recordTimestamp) {
                            System.out.printf("Assigning timestamp %d\n", recordTimestamp);
                            return recordTimestamp;
                        }
                    };
                }

                @Override
                public WatermarkGenerator<String> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                    return new WatermarkGenerator<String>() {
                        public long latestWatermark = Long.MIN_VALUE;

                        @Override
                        public void onEvent(String event, long timestamp, WatermarkOutput output) {
                            long eventWatermark = timestamp - Time.days(1).toMilliseconds();
                            if (eventWatermark > latestWatermark) {
                                System.out.printf("Emitting watermark %d\n", eventWatermark);
                                output.emitWatermark(new Watermark(eventWatermark));
                                latestWatermark = eventWatermark;
                            }
                        }

                        @Override
                        public void onPeriodicEmit(WatermarkOutput output) {
                        }
                    };
                }
            });
        source.setStartFromEarliest();

        env.addSource(source)
            .process(new Print<String>("Source"));

        System.out.println(env.getExecutionPlan());
        JobClient client = null;
        try {
            client = env.executeAsync("Fail Test");
        } catch (Exception e) {
            e.printStackTrace();
            throw e;
        }

        topics = admin.describeTopics(Arrays.asList(inputTopic));
        for (Map.Entry<String, TopicDescription> topic : topics.all().get().entrySet()) {
            System.out.printf("%s %d\n", topic.getValue().name(), topic.getValue().partitions().size());
            System.out.println(topic.getValue().toString());
        }

        TimeUnit.SECONDS.sleep(5);
        client.cancel().get(5, TimeUnit.SECONDS);
    }
}

Re: Flink 1.11 FlinkKafkaConsumer not propagating watermarks

Arvid Heise-4
For reference: self-answered on [1].

It turns out that Flink 1.12 defaults the TimeCharacteristic to EventTime and deprecates the whole TimeCharacteristic flow. So when downgrading to Flink 1.11, you must add the following statement to configure the StreamExecutionEnvironment:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
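
In the test above, that call belongs where the shared StreamExecutionEnvironment is configured, before the job graph is built. A minimal sketch of the placement, assuming the same static env field and setup() hook as the posted test (the trimmed body is my illustration, not the original code):

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.BeforeClass;

public class FailTest {
    public static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    @BeforeClass
    public static void setup() {
        env.setParallelism(1);
        // Flink 1.11 still defaults the time characteristic to ProcessingTime, so
        // without this line the timestamps and watermarks assigned in the Kafka
        // source never reach the downstream Process operator (the 1.11 log above).
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // ... rest of the Kafka/ZooKeeper setup from the original test ...
    }
}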


On Thu, Apr 15, 2021 at 12:08 AM Edward Bingham <[hidden email]> wrote:
