Hi,
I am reading messages off a Kafka Topic and want to process the messages through Flink and save them into S3. It was pointed out to me that stream processing of the Kafka data won't be saved to S3 because S3 doesn't allow data to be appended to a file, so I want to convert the Kafka stream into batches and save them to S3. Based on other user questions/answers, it looks like this is possible using windowing by breaking the stream into batches and creating files. I have written the following code, but it doesn't work and I am not getting any errors either. I have a sys.out that shows the tuple is being processed, but it might not be emitted in the out.collect. Can someone help me figure out what may be the issue? Thanks! public class S3Sink { public static void main(String[] args) throws Exception { Map<String, String> configs = ConfigUtils.loadConfigs("/Users/path/to/configs.yaml");
final ParameterTool parameterTool = ParameterTool.fromMap(configs);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().disableSysoutLogging(); env.getConfig().setGlobalJobParameters(parameterTool);
DataStream messageStream = env .addSource(new FlinkKafkaConsumer09<String>(parameterTool.get("kafka.topic"), new SimpleStringSchema(), parameterTool.getProperties()));
String uuid = UUID.randomUUID().toString();
DataStreamSink tuple2DataStream = messageStream .flatMap(new Tupler()) .keyBy(0) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .apply(new MyWindowFunction()) .writeAsText("s3://flink-test/flink-output-stream/"+ uuid + "testdoc.txt"); env.execute(); }
private static class Tupler implements FlatMapFunction<String, Tuple2<String, String>> { @Override public void flatMap(String record, Collector<Tuple2<String, String>> out) throws Exception { out.collect(new Tuple2<String, String>("record",record)); } }
private static class MyWindowFunction implements WindowFunction<Tuple2<String, String>, Tuple2<String, String>, Tuple, TimeWindow>{
@Override public void apply(Tuple key, TimeWindow timeWindow, Iterable<Tuple2<String, String>> input, Collector<Tuple2<String, String>> out) throws Exception { for (Tuple2<String, String> in: input){ System.out.println(in); out.collect(in); } } } } Thanks, Sam |
Can you check the log files of the TaskManagers and JobManager?
There is no obvious reason that the collection should not work. On another note: the rolling file sink might be what you are looking for. https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/filesystem_sink.html On Fri, Jan 13, 2017 at 5:48 PM, Samra Kasim <[hidden email]> wrote: > Hi, > > I am reading messages off a Kafka Topic and want to process the messages > through Flink and save them into S3. It was pointed out to me that stream > processing of the Kafka data won't be saved to S3 because S3 doesn't allow > data to be appended to a file, so I want to convert the Kafka stream into > batches and save them to S3. Based on other user questions/answers, it looks > like this is possible using windowing by breaking the stream into batches > and creating files. I have written the following code, but it doesn't work > and I am not getting any errors either. I have a sys.out that shows the > tuple is being processed, but it might not be emitted in the out.collect. > Can someone help me figure out what may be the issue? Thanks! > > public class S3Sink { > > public static void main(String[] args) throws Exception { > > Map<String, String> configs = > ConfigUtils.loadConfigs("/Users/path/to/configs.yaml"); > > > > final ParameterTool parameterTool = ParameterTool.fromMap(configs); > > > > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > > env.getConfig().disableSysoutLogging(); > > env.getConfig().setGlobalJobParameters(parameterTool); > > > > DataStream messageStream = env > > .addSource(new > FlinkKafkaConsumer09<String>(parameterTool.get("kafka.topic"), > > new SimpleStringSchema(), > > parameterTool.getProperties())); > > > > String uuid = UUID.randomUUID().toString(); > > > > DataStreamSink tuple2DataStream = messageStream > > .flatMap(new Tupler()) > > .keyBy(0) > > .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) > > .apply(new MyWindowFunction()) > > .writeAsText("s3://flink-test/flink-output-stream/"+ uuid + > "testdoc.txt"); > > > env.execute(); > > } > > > > private static class Tupler implements FlatMapFunction<String, > Tuple2<String, String>> { > > @Override > > public void flatMap(String record, Collector<Tuple2<String, String>> > out) throws Exception { > > out.collect(new Tuple2<String, String>("record",record)); > > } > > } > > > > private static class MyWindowFunction implements > WindowFunction<Tuple2<String, String>, Tuple2<String, String>, Tuple, > TimeWindow>{ > > > > @Override > > public void apply(Tuple key, TimeWindow timeWindow, > Iterable<Tuple2<String, String>> input, > > Collector<Tuple2<String, String>> out) throws > Exception { > > for (Tuple2<String, String> in: input){ > > System.out.println(in); > > out.collect(in); > > } > > } > > } > > } > > > -- > > Thanks, > > Sam |
Free forum by Nabble | Edit this page |