|
how handles subquery in flink
example
SELECT C_CUSTKEY,C_NAME
FROM Customers where C_MKTSEGMENT=(select C_CUSTKEY,C_MKTSEGMENT from Customers where C_ADDRESS="MG9kdTD2WBHm")
I write code for handles
select C_CUSTKEY,C_MKTSEGMENT from Customers where C_ADDRESS="MG9kdTD2WBHm"
but how can handles first part with result from second part
code for handles scond part
getCustomerDataSetsubquery(env,mask1,count1);
env.execute("excute of sub query");
private static void getCustomerDataSetsubquery(ExecutionEnvironment env,String mask,int count) {
DataSet<Customer3> customers=env.readCsvFile("/home/hadoop/Desktop/Dataset/customer.csv")
.fieldDelimiter('|')
.includeFields(mask).ignoreFirstLine()
.tupleType(Customer3.class);
customers = customers.filter(new FilterFunction<Customer3>()
{
@Override
public boolean filter(Customer3 c) {
String c1= columnswhere_sub_query.get(0).toString();
int index1=Integer.parseInt(map1.get(c1).toString());
return c.getField(index1).toString().equals(values1_sub_query.get(0).toString());
}
});
customers.print();
customers.writeAsCsv("/home/hadoop/Desktop/Dataset/out1.csv", "\n", "|");
}
|