Hi, I'm trying to evaluate Flink to see if it can do efficient semi-joins or self-joins with a filter.

Problem description: I have 1 stream that can contain "near duplicate" records. The records share a "family name", so many records can have the same family name. But each record has a unique id too.

Query:
1) I would like to run a query such that I get the ids and names of some records based on a criterion.

Question 1:
2) I then want to fetch all the records that match this set of ids fetched previously, but this time include the duplicates that share the same name.

Question 2: I haven't seen many examples of non-Hadoop connectors. Is there any plan for those? (If you are curious, I was looking at Presto and had to post the question there also - https://groups.google.com/forum/#!topic/presto-users/Ns0q4pvHwfo)

public class JoinExample {
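For what it's worth, here is a plain-Java sketch of the two-step query I have in mind, just to pin down the semantics (this is not Flink API; the Record shape, the selection criterion, and the sample data are all made up for illustration):

```java
import java.util.*;
import java.util.stream.*;

// Sketch of the two-step "semi-join with duplicates" query described above.
public class SemiJoinSketch {
    static final class Record {
        final int id;
        final String familyName;
        Record(int id, String familyName) { this.id = id; this.familyName = familyName; }
    }

    public static void main(String[] args) {
        List<Record> all = Arrays.asList(
                new Record(1, "smith"), new Record(2, "smith"),
                new Record(3, "jones"), new Record(4, "brown"));

        // Step 1: select ids by some criterion (here, arbitrarily: id is even).
        Set<Integer> ids = all.stream()
                .filter(r -> r.id % 2 == 0)
                .map(r -> r.id)
                .collect(Collectors.toSet());

        // Step 2: fetch every record whose family name matches any selected
        // record's family name, i.e. including the "near duplicates".
        Set<String> names = all.stream()
                .filter(r -> ids.contains(r.id))
                .map(r -> r.familyName)
                .collect(Collectors.toSet());
        List<Record> result = all.stream()
                .filter(r -> names.contains(r.familyName))
                .collect(Collectors.toList());

        // Records 1, 2 (smith) and 4 (brown) survive; 3 (jones) does not.
        result.forEach(r -> System.out.println(r.id + " " + r.familyName));
    }
}
```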
Hi! I am not 100% sure that I understand your question completely, but I'll give it my best shot.

If you want to push IDs into the connector, I assume you mean that you use some form of connector that can filter by ID directly in the low-level data access paths, in order to read as little data as possible — in some sense, what databases do with an indexed fetch. I think there are two ways to go about this:

(1) If the set of IDs is very small, just collect it and make it part of the second access's parameters:

    List<Integer> ids = newerRecords.collect();
    env.createInput(new MyFilteringParquetFormat(tableSpec, ids)); // this lets Parquet search for the IDs

(2) If the set is not so small, you can custom-partition it and run custom fetch code. Something like this:

    DataSet<Integer> ids = ...; // your code to get the IDs

    ids
        .partitionCustom(new MyPartitionerThatIsAwareOfTheStoragePartitioning(), id -> id)
        .mapPartition(new RichMapPartitionFunction<Integer, Record>() {
            @Override
            public void mapPartition(Iterable<Integer> values, Collector<Record> out) {
                // gather this subtask's IDs into a list
                List<Integer> idList = new ArrayList<>();
                for (Integer id : values) {
                    idList.add(id);
                }
                int partition = getRuntimeContext().getIndexOfThisSubtask();
                Reader readerForPartition = new MyReader(partition);
                for (Record r : readerForPartition.query(idList)) {
                    out.collect(r);
                }
            }
        })
        .print();

Hope that helps!

As for non-Hadoop connectors, Flink currently has:
- HBase
- JDBC
- Java Collections / Iterators
- HCatalog
- Kafka (streaming)
- RabbitMQ (streaming)
- Flume (streaming)
- Also, all Hadoop connectors can be used (via the Hadoop compatibility functions), giving you access to Hadoop's MongoDB, Cassandra, etc. connectors.

Was that what you were referring to, or did I misunderstand your question?

Some other comments concerning your code:
- If you make your type "Record" a POJO (public no-argument constructor, public fields or public accessors), then you can use field names for the keys.
- Tuples are still the fastest type in Flink, so if you care a lot about performance, make your "Record" class a subclass of Tuple2 and use tuple positions as keys.

Greetings,
Stephan

On Mon, Aug 17, 2015 at 3:30 PM, Ashwin Jayaprakash <[hidden email]> wrote:
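To illustrate the POJO advice above: Flink's "field name as key" support (e.g. groupBy("familyName") on a DataSet) relies on the fields being publicly accessible. Here is a sketch of such a POJO — the class and field names are just examples — with the reflective name lookup that this style of key depends on shown in plain Java:

```java
import java.lang.reflect.Field;

// Sketch of a Flink-friendly POJO: public no-argument constructor plus
// public fields, so the type's fields can be addressed by name.
public class RecordPojo {
    public int id;
    public String familyName;

    public RecordPojo() {}  // required no-argument constructor

    public RecordPojo(int id, String familyName) {
        this.id = id;
        this.familyName = familyName;
    }

    public static void main(String[] args) throws Exception {
        RecordPojo r = new RecordPojo(7, "smith");
        // With a POJO like this, Flink lets you write e.g.
        //   records.groupBy("familyName")
        // instead of implementing a KeySelector. The name-based lookup
        // works because the field is publicly accessible:
        Field f = RecordPojo.class.getField("familyName");
        System.out.println(f.get(r));  // prints: smith
    }
}
```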
Stephan, this is exactly what I was looking for :) Thanks, will try it out. I know the combineGroup() needed a reduceGroup() too, but I was just trying out the APIs.

On Mon, Aug 17, 2015 at 6:30 AM, Ashwin Jayaprakash <[hidden email]> wrote:
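For readers following the combineGroup()/reduceGroup() remark: in Flink, combineGroup() only pre-aggregates within each partition, so a reduceGroup() is still needed to merge the partial results. The two-phase idea can be sketched in plain Java (the partitioning and sample data are made up; this is not Flink code):

```java
import java.util.*;
import java.util.stream.*;

// Plain-Java sketch of why combineGroup() needs a reduceGroup():
// the combine phase produces partial per-partition counts, and the
// reduce phase merges them into the final per-key counts.
public class CombineThenReduce {
    public static void main(String[] args) {
        // Two "partitions" of family-name occurrences.
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("smith", "smith", "jones"),
                Arrays.asList("smith", "jones", "brown"));

        // Combine phase: partial counts within each partition.
        List<Map<String, Long>> partials = partitions.stream()
                .map(p -> p.stream().collect(
                        Collectors.groupingBy(n -> n, Collectors.counting())))
                .collect(Collectors.toList());

        // Reduce phase: merge the partials into the final totals.
        Map<String, Long> totals = new TreeMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((k, v) -> totals.merge(k, v, Long::sum));
        }

        System.out.println(totals);  // prints: {brown=1, jones=2, smith=3}
    }
}
```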