Hi,
I am trying to combine two Kafka topics using a single Kafka consumer on a list of topics, and then convert the JSON strings in the stream to POJOs. The plan is to join them via keyBy (on the event-time field) and, to merge them into a single fat JSON, apply a window function on a windowed stream. The assumption is that Topic-A and Topic-B can be joined on event time, and that only one pair (Topic-A JSON, Topic-B JSON) will be present with the same eventTime; hence I was planning to use countWindow(2) after the keyBy on eventTime. I have a couple of questions:

1. Is the approach fine for merging topics and creating a single JSON?
2. The window function on the AllWindowed stream doesn't seem to work; any pointers will be greatly appreciated.

Code snippet:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
logger.info("Flink Stream Window Charger has started");

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:1030");
properties.setProperty("zookeeper.connect", "127.0.0.1:2181/service-kafka");
properties.setProperty("group.id", "group-0011");
properties.setProperty("auto.offset.reset", "smallest");

List<String> names = new ArrayList<>();
names.add("Topic-A");
names.add("Topic-B");

DataStream<String> stream = env.addSource(new FlinkKafkaConsumer08<>(names, new SimpleStringSchema(), properties));
DataStream<TopicPojo> pojo = stream.map(new Deserializer()).keyBy((eventTime) -> TopicPojo.getEventTime());
List<String> where = new ArrayList<String>();
AllWindowedStream<String, GlobalWindow> data_window = pojo.flatMap(new Tokenizer()).countWindowAll(2);
DataStream<String> data_charging = data_window.apply(new MyWindowFunction());

data_charging.addSink(new SinkFunction<String>() {
    public void invoke(String value) throws Exception {
        // Yet to be implemented - merge two POJOs into one
    }
});

try {
    env.execute();
} catch (Exception e) {
    return;
}
}
}

class Tokenizer implements FlatMapFunction<TopicPojo, String> {
    private static final long serialVersionUID = 1L;

    @Override
    public void flatMap(TopicPojo value, Collector<String> out) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        out.collect(mapper.writeValueAsString(value));
    }
}

class MyWindowFunction implements WindowFunction<TopicPojo, String, String, GlobalWindow> {
    @Override
    public void apply(String key, GlobalWindow window, Iterable<TopicPojo> arg2, Collector<String> out) throws Exception {
        int count = 0;
        for (TopicPojo in : arg2) {
            count++;
        }
        // Test result - to be modified
        out.collect("Window: " + window + " count: " + count);
    }
}

class Deserializer implements MapFunction<String, TopicPojo> {
    private static final long serialVersionUID = 1L;

    @Override
    public TopicPojo map(String value) throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        TopicPojo obj = null;
        try {
            System.out.println(value);
            obj = mapper.readValue(value, TopicPojo.class);
        } catch (JsonParseException e) {
            throw new IOException("Failed to deserialize JSON object.");
        } catch (JsonMappingException e) {
            throw new IOException("Failed to deserialize JSON object.");
        } catch (IOException e) {
            throw new IOException("Failed to deserialize JSON object.");
        }
        return obj;
    }
}

I am getting the following error:

The method apply(AllWindowFunction<String,R,GlobalWindow>) in the type AllWindowedStream<String,GlobalWindow> is not applicable for the arguments (MyWindowFunction)

Kindly give your input.

Regards,
Vijay Raajaa GS
Hi,
An AllWindow operator requires an AllWindowFunction instead of a WindowFunction. In your case, the keyBy() seems to be in the wrong place; to get a keyed window you have to write something akin to:

inputStream
    .keyBy(…)
    .window(…)
    .apply(…) // or reduce()

In your case, you key the stream and then the keying is “lost” again because you apply a flatMap(). That’s why you have an all-window and not a keyed window.

Best,
Aljoscha
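Sketched out, that reordering could look something like the following. This is an illustrative sketch, not code from the thread: it keeps the WindowFunction on a keyed countWindow and assumes getEventTime() is an instance method returning a per-record String.

DataStream<TopicPojo> pojo = stream.map(new Deserializer());

DataStream<String> merged = pojo
    // key on a value read from the record itself
    .keyBy(new KeySelector<TopicPojo, String>() {
        @Override
        public String getKey(TopicPojo value) {
            return value.getEventTime();
        }
    })
    // keyed count window: collects the Topic-A/Topic-B pair for one eventTime
    .countWindow(2)
    // a WindowFunction is applicable here because the stream is keyed
    .apply(new WindowFunction<TopicPojo, String, String, GlobalWindow>() {
        @Override
        public void apply(String key, GlobalWindow window,
                Iterable<TopicPojo> values, Collector<String> out) {
            // merge the two POJOs into one JSON here
        }
    });

Note that the flatMap/Tokenizer step from the original snippet is gone: any transformation between keyBy() and the window drops the keying.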
Sure. Thanks for the pointer, let me reorder the same. Any comments about the approach followed for merging topics and creating a single JSON?

Regards,
Vijay Raajaa G S

On Wed, May 3, 2017 at 2:41 PM, Aljoscha Krettek <[hidden email]> wrote:
The approach could work, but if it can happen that an event from stream A is not matched by an event in stream B, you will have lingering state that never goes away. For such cases it might be better to write a custom CoProcessFunction, as sketched here: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html.
The idea is to keep events from each side in state and emit a result when you get the event from the other side. You also set a cleanup timer, in case no other event arrives, to make sure that state eventually goes away.

Best,
Aljoscha
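A rough sketch of that pattern follows. It is illustrative only: the class name JoinOnEventTime, the merge() helper, and the cleanup delay are made up, and the exact CoProcessFunction signatures vary between Flink versions (the 1.2 docs linked above use the rich variants, e.g. RichCoProcessFunction, for state access). It also assumes record timestamps have been assigned, since the cleanup timer is derived from ctx.timestamp().

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Buffers the first-arriving side in keyed state, emits a merged result when
// the other side shows up, and registers a cleanup timer for unmatched events.
public class JoinOnEventTime extends CoProcessFunction<TopicPojo, TopicPojo, String> {

    private static final long CLEANUP_DELAY_MS = 60_000L; // illustrative value

    private transient ValueState<TopicPojo> aState;
    private transient ValueState<TopicPojo> bState;

    @Override
    public void open(Configuration parameters) {
        aState = getRuntimeContext().getState(new ValueStateDescriptor<>("topic-a", TopicPojo.class));
        bState = getRuntimeContext().getState(new ValueStateDescriptor<>("topic-b", TopicPojo.class));
    }

    @Override
    public void processElement1(TopicPojo a, Context ctx, Collector<String> out) throws Exception {
        TopicPojo b = bState.value();
        if (b != null) {
            out.collect(merge(a, b));
            bState.clear();
        } else {
            aState.update(a);
            // drop the buffered event if no matching Topic-B event ever arrives
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + CLEANUP_DELAY_MS);
        }
    }

    @Override
    public void processElement2(TopicPojo b, Context ctx, Collector<String> out) throws Exception {
        TopicPojo a = aState.value();
        if (a != null) {
            out.collect(merge(a, b));
            aState.clear();
        } else {
            bState.update(b);
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + CLEANUP_DELAY_MS);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        aState.clear(); // state for unmatched events eventually goes away
        bState.clear();
    }

    private String merge(TopicPojo a, TopicPojo b) {
        return "..."; // build the single fat JSON here, e.g. with Jackson
    }
}

The function would be wired in with something like streamA.connect(streamB).keyBy(selectorA, selectorB).process(new JoinOnEventTime()).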
Thanks for your input; I will try to incorporate it in my implementation.

Regards,
Vijay Raajaa G S

On Wed, May 3, 2017 at 3:28 PM, Aljoscha Krettek <[hidden email]> wrote:
I tried to reorder, and the window function works fine. But after processing a few streams of data from Topic A and Topic B, the window function seems to throw the error below. The keyBy is on the eventTime field.

java.lang.RuntimeException: Unexpected key group index. This indicates a bug.
    at org.apache.flink.runtime.state.heap.StateTable.set(StateTable.java:57)
    at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:372)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
    at java.lang.Thread.run(Thread.java:745)

Regards,
Vijay Raajaa GS

On Wed, May 3, 2017 at 3:50 PM, G.S.Vijay Raajaa <[hidden email]> wrote:
What’s the KeySelector you’re using? To me, this indicates that the timestamp field is somehow changing after the original keying or in transit.
Best,
Aljoscha
I tried the timestamp field as a String datatype as well as a Date object, and I get the same error in both cases. Please find the POJO file:

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;

import org.apache.commons.lang.builder.ToStringBuilder;

@JsonPropertyOrder({ "data", "label", "eventTime" })
public class TopicPojo {

    @JsonProperty("data")
    private List<List<Double>> data = null;
    @JsonProperty("label")
    private List<String> label = null;
    @JsonProperty("eventTime")
    private static Date eventTime;

    /**
     * No-args constructor for use in serialization
     */
    public TopicPojo() {
    }

    /**
     * @param data
     * @param label
     * @param eventTime
     */
    public TopicPojo(List<List<Double>> data, List<String> label, Date eventTime) {
        super();
        this.data = data;
        this.label = label;
        this.eventTime = eventTime;
    }

    @JsonProperty("data")
    public List<List<Double>> getData() {
        return data;
    }

    @JsonProperty("data")
    public void setData(List<List<Double>> data) {
        this.data = data;
    }

    @JsonProperty("label")
    public List<String> getLabel() {
        return label;
    }

    @JsonProperty("label")
    public void setLabel(List<String> label) {
        this.label = label;
    }

    @JsonProperty("eventTime")
    public static Date getEventTime() {
        return eventTime;
    }

    @JsonProperty("eventTime")
    public void setEventTime(Date eventTime) {
        this.eventTime = eventTime;
    }

    @Override
    public String toString() {
        return ToStringBuilder.reflectionToString(this);
    }
}

The above code uses eventTime as a Date object; I tried it as a String as well.

Regards,
Vijay Raajaa G S

On Fri, May 5, 2017 at 1:59 PM, Aljoscha Krettek <[hidden email]> wrote:
It seems that eventTime is a static field in TopicPojo, and the key selector also just gets the static field via TopicPojo.getEventTime(). Why is that? With a static field, the event time basically has nothing to do with the data of the record being keyed.
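A minimal fix, for illustration (assuming the Date-typed variant of the POJO above): make eventTime an instance field with an instance getter, and key on the value that is actually passed to the selector.

// In TopicPojo: a per-record field and getter.
@JsonProperty("eventTime")
private Date eventTime; // not static

@JsonProperty("eventTime")
public Date getEventTime() { // not static
    return eventTime;
}

// In the job: read the key from the record, not from the class.
pojo.keyBy(new KeySelector<TopicPojo, Long>() {
    @Override
    public Long getKey(TopicPojo value) {
        return value.getEventTime().getTime();
    }
});

With the static field, getKey() returns whatever eventTime was set last by any record, so the same element can map to different key groups at different points in the pipeline, which is exactly what the "Unexpected key group index" check complains about.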
In reply to this post by G.S.Vijay Raajaa
Hello
I seem to be hitting this issue. I checked my KeySelector and the getTimestamp method on the watermark assigner; they look correct. The task runs fine with parallelism 1 but throws the same error for parallelism > 1. My order of processing: I create a keyed stream with a KeySelector on the stream and create a window operator on top of this keyed transformation.

Thanks,
Meera
In reply to this post by G.S.Vijay Raajaa
Did this problem get resolved? I am running into it when I parallelize the tasks:

Unexpected key group index. This indicates a bug.

It runs fine at parallelism 1, which suggests there is some key-grouping issue. I checked my watermark assigner and KeySelector; they look okay. Here is a snippet of the KeySelector and watermark assigner attached to the KeyedStream:

public class DimensionKeySelector<T extends SignalSet<?>> implements KeySelector<T, String> {

    private static final long serialVersionUID = 7666263008141606451L;
    private final String[] dimKeys;

    public DimensionKeySelector(Map<String, String> conf) {
        if (conf.containsKey("dimKeys") == false) {
            throw new RuntimeException("Required 'dimKeys' missing.");
        }
        this.dimKeys = conf.get("dimKeys").split(",");
    }

    @Override
    public String getKey(T signalSet) throws Exception {
        StringBuffer group = new StringBuffer(signalSet.namespace());
        if (signalSet.size() != 0) {
            for (String dim : dimKeys) {
                if (signalSet.dimensions().containsKey(dim)) {
                    group.append(signalSet.dimensions().get(dim));
                }
            }
        }
        return group.toString();
    }
}

and the watermark assigner:

public class WaterMarks extends BoundedOutOfOrdernessTimestampExtractor<MetricSignalSet> {

    private static final long serialVersionUID = 1L;

    public WaterMarks(Time maxOutOfOrderness) {
        super(maxOutOfOrderness);
    }

    @Override
    public long extractTimestamp(MetricSignalSet element) {
        return element.get(0).timestamp().getTime();
    }
}

Any thoughts?
Hi,
As a simple test, you can put your key extraction logic into a MapFunction, i.e. MapFunction<T extends SignalSet<?>, Tuple2<String, T>>, and then simply use that field as the key:

input
    .map(new MyKeyExtractorMapper())
    .keyBy(0)

If that solves your problem, it means that the key extraction is not deterministic. This is a problem because getKey() is called at different points in time, and when the result is not always the same you will get that error.

Best,
Aljoscha
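For illustration, such an extractor mapper could look like this (MyKeyExtractorMapper is the made-up name from the snippet above, reusing the DimensionKeySelector posted earlier so that both paths compute the key the same way):

import java.util.Map;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Computes the key exactly once per record and carries it alongside the element.
public class MyKeyExtractorMapper implements MapFunction<MetricSignalSet, Tuple2<String, MetricSignalSet>> {

    private final DimensionKeySelector<MetricSignalSet> selector;

    public MyKeyExtractorMapper(Map<String, String> conf) {
        this.selector = new DimensionKeySelector<>(conf);
    }

    @Override
    public Tuple2<String, MetricSignalSet> map(MetricSignalSet value) throws Exception {
        return Tuple2.of(selector.getKey(value), value);
    }
}

input
    .map(new MyKeyExtractorMapper(config))
    .keyBy(0) // the key is now a plain field, computed exactly once

If the error disappears with this in place, getKey() was returning different values for the same element at different times, for example because the underlying SignalSet was mutated after keying.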
We couldn't put the map phase in between while working with the stream transformation classes (it created a dangling mapper), but doing the partitioning/transformation directly with the window operator worked:
WindowOperator operator = ...

KeyGroupStreamPartitioner<MetricSignalSet, String> partitioner =
        new KeyGroupStreamPartitioner<MetricSignalSet, String>(
                new DimensionKeySelector<MetricSignalSet>(config), parallel);

PartitionTransformation<MetricSignalSet> partitioned =
        new PartitionTransformation<MetricSignalSet>(inputs, partitioner);

OneInputTransformation<MetricSignalSet, MetricSignalSet> trans =
        new OneInputTransformation<MetricSignalSet, MetricSignalSet>(
                partitioned, name, operator, ess, parallel);

trans.setStateKeySelector(new DimensionKeySelector<MetricSignalSet>(config));
trans.setStateKeyType(new GenericTypeInfo<String>(String.class));