MongoDB sink;

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

MongoDB sink;

Aissa Elaffani
Hello ,
 I want to sink my data to MongoDB but as far as I know there is no sink connector to MongoDB.  How can I implement a MongoDB sink ? If there is any other solutions, I hope you can share with me.
Reply | Threaded
Open this post in threaded view
|

回复:MongoDB sink;

myflink
my solution:
First, Flink sinks data to Kafka;
Second, MongoDB reads data from Kafka. Over.


------------------ 原始邮件 ------------------
发件人: "Aissa Elaffani"<[hidden email]>;
发送时间: 2020年5月6日(星期三) 下午3:17
收件人: "user"<[hidden email]>;
主题: MongoDB sink;

Hello ,
 I want to sink my data to MongoDB but as far as I know there is no sink connector to MongoDB.  How can I implement a MongoDB sink ? If there is any other solutions, I hope you can share with me.
Reply | Threaded
Open this post in threaded view
|

Re: MongoDB sink;

Jingsong Li
Hi,

My impression is that MongoDB's API is not complicated. So you can implement a MongoDB sink. Something like:
@Override
public void invoke(Row value, Context context) throws Exception {
Map<String, Object> map = new HashMap<>();
for (int i = 0; i < fieldNames.length; i++) {
map.put(fieldNames[i], row.getField(i));
}
batch.add(new Document(map));
if (batch.size() >= conf.getBatchSize()) {
flush();
}
}
private void flush() {
if (batch.isEmpty()) {
return;
}
MongoDatabase mongoDatabase = client.getDatabase(conf.getDatabase());
MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(conf.getCollection());
mongoCollection.insertMany(batch);
batch.clear();
}
Best,
Jingsong Lee

On Wed, May 6, 2020 at 2:42 PM myflink <[hidden email]> wrote:
my solution:
First, Flink sinks data to Kafka;
Second, MongoDB reads data from Kafka. Over.


------------------ 原始邮件 ------------------
发件人: "Aissa Elaffani"<[hidden email]>;
发送时间: 2020年5月6日(星期三) 下午3:17
收件人: "user"<[hidden email]>;
主题: MongoDB sink;

Hello ,
 I want to sink my data to MongoDB but as far as I know there is no sink connector to MongoDB.  How can I implement a MongoDB sink ? If there is any other solutions, I hope you can share with me.


--
Best, Jingsong Lee
Reply | Threaded
Open this post in threaded view
|

Re: MongoDB sink;

rmetzger0
I'm also not aware of a MongoDB sink in Flink. The code provided by Jingsong applies to the "SinkFunction" interface of Flink. That's a good starting point to implement a custom Sink.

On Wed, May 6, 2020 at 9:46 AM Jingsong Li <[hidden email]> wrote:
Hi,

My impression is that MongoDB's API is not complicated. So you can implement a MongoDB sink. Something like:
@Override
public void invoke(Row value, Context context) throws Exception {
Map<String, Object> map = new HashMap<>();
for (int i = 0; i < fieldNames.length; i++) {
map.put(fieldNames[i], row.getField(i));
}
batch.add(new Document(map));
if (batch.size() >= conf.getBatchSize()) {
flush();
}
}
private void flush() {
if (batch.isEmpty()) {
return;
}
MongoDatabase mongoDatabase = client.getDatabase(conf.getDatabase());
MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(conf.getCollection());
mongoCollection.insertMany(batch);
batch.clear();
}
Best,
Jingsong Lee

On Wed, May 6, 2020 at 2:42 PM myflink <[hidden email]> wrote:
my solution:
First, Flink sinks data to Kafka;
Second, MongoDB reads data from Kafka. Over.


------------------ 原始邮件 ------------------
发件人: "Aissa Elaffani"<[hidden email]>;
发送时间: 2020年5月6日(星期三) 下午3:17
收件人: "user"<[hidden email]>;
主题: MongoDB sink;

Hello ,
 I want to sink my data to MongoDB but as far as I know there is no sink connector to MongoDB.  How can I implement a MongoDB sink ? If there is any other solutions, I hope you can share with me.


--
Best, Jingsong Lee