Using HiveBolt from storm-hive with Flink-Storm compatibility wrapper


Using HiveBolt from storm-hive with Flink-Storm compatibility wrapper

Federico D'Ambrosio
Hello everyone,

I'd like to use the HiveBolt from storm-hive inside a Flink job via the Flink-Storm compatibility layer, but I'm not sure how to integrate it. To explain, I would have the following:

val mapper = ...

val hiveOptions = ...

streamByID
  .transform[OUT]("hive-sink", new BoltWrapper[IN, OUT](new HiveBolt(hiveOptions)))

where streamByID is a DataStream[Event].

What would the IN and OUT types be? HiveBolt executes on a Storm Tuple, so I'd think that IN should be an Event flattened into a tuple (event => (field1, field2, field3, ...)), while OUT, since I don't want the stream to keep flowing, would be null or None?
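For reference, the full wiring I have in mind looks roughly like the sketch below. The Event fields, metastore URI, database and table names are just placeholders, and I'm only guessing that IN can be a tuple type matching the mapper's column fields while OUT never actually carries anything, since HiveBolt emits no tuples:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.storm.wrappers.BoltWrapper
import org.apache.storm.hive.bolt.HiveBolt
import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper
import org.apache.storm.hive.common.HiveOptions
import org.apache.storm.tuple.Fields

// Hypothetical event with three fields, flattened into a tuple below
case class Event(id: String, name: String, value: Double)

val mapper = new DelimitedRecordHiveMapper()
  .withColumnFields(new Fields("id", "name", "value"))
val hiveOptions =
  new HiveOptions("thrift://metastore:9083", "default", "events", mapper)

// streamByID: DataStream[Event], as above
val flattened: DataStream[(String, String, Double)] =
  streamByID.map(e => (e.id, e.name, e.value))

// HiveBolt declares no output fields, so OUT never carries anything; I just
// reuse the input tuple type and ignore the resulting (empty) stream. The
// Fields passed to BoltWrapper name the tuple positions so that the
// HiveMapper can look the columns up by name.
flattened.transform[(String, String, Double)](
  "hive-sink",
  new BoltWrapper[(String, String, Double), (String, String, Double)](
    new HiveBolt(hiveOptions), new Fields("id", "name", "value")))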

Alternatively, do you know of any existing Hive sink implementation for Flink, other than adapting said HiveBolt into a RichSinkFunction?

Thanks for your attention,
 Federico

Re: Using HiveBolt from storm-hive with Flink-Storm compatibility wrapper

Nico Kruber
Hi Federico,
I also did not find any implementation of a Hive sink, nor many details on this
topic in general. Let me forward this to Timo and Fabian (cc'd), who may know
more.

Nico


Re: Using HiveBolt from storm-hive with Flink-Storm compatibility wrapper

Timo Walther
Hi Federico,

I think going through the Storm compatibility layer could work, but have you
thought about using the flink-jdbc connector? That would be the easiest
solution.

Otherwise, I think it would be easier to quickly implement your own
SinkFunction. There is just one method that you have to implement, and you
could issue the Hive commands there.
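For example, a rough sketch along those lines, going through the Hive JDBC driver (the connection URL, table name, and Event fields below are just placeholders, not something from your job):

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

// Hypothetical event type with three fields
case class Event(id: String, name: String, value: Double)

class HiveJdbcSink(jdbcUrl: String) extends RichSinkFunction[Event] {

  @transient private var connection: Connection = _
  @transient private var statement: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    connection = DriverManager.getConnection(jdbcUrl)
    statement = connection.prepareStatement("INSERT INTO events VALUES (?, ?, ?)")
  }

  // Called once per record; a single-row insert is slow in Hive, but it is
  // enough to illustrate the idea.
  override def invoke(event: Event): Unit = {
    statement.setString(1, event.id)
    statement.setString(2, event.name)
    statement.setDouble(3, event.value)
    statement.execute()
  }

  override def close(): Unit = {
    if (statement != null) statement.close()
    if (connection != null) connection.close()
  }
}

// Usage (URL is a placeholder):
// streamByID.addSink(new HiveJdbcSink("jdbc:hive2://hiveserver:10000/default"))

Single-row inserts like this are far too slow for any real load, so you would want to batch the writes in practice.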

Regards,
Timo



Re: Using HiveBolt from storm-hive with Flink-Storm compatibility wrapper

Federico D'Ambrosio
Hi Timo,

I didn't think about the flink-jdbc connector (I had actually forgotten it was a thing) and I'll certainly look into it.
So far, I have been trying to implement a simple sink (JSON-only for now) starting from the base provided by the Storm HiveBolt implementation; my goal is to make use of the Hive Streaming API.
I noticed that there are some potentially blocking calls in the Hive API, for example when a TransactionBatch is committed or the StreamingConnection is closed. In your opinion, what would be the best way to deal with this kind of call in Flink? Wrapping them in an AsyncFunction? Simply spawning a new thread?
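One option I was considering is to keep a dedicated single-threaded executor inside the sink and hand the blocking calls over to it, roughly like the sketch below (commitBatch is just a placeholder standing in for the actual TransactionBatch.commit() call, not a real API):

import java.util.concurrent.{ExecutorService, Executors, Future, TimeUnit}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

class HiveStreamingSink extends RichSinkFunction[String] {

  // single-threaded executor so that commits stay ordered
  @transient private var executor: ExecutorService = _
  @transient private var pendingCommit: Future[_] = _

  override def open(parameters: Configuration): Unit = {
    executor = Executors.newSingleThreadExecutor()
  }

  override def invoke(json: String): Unit = {
    // write the record into the current transaction batch (omitted) and,
    // once the batch is full, hand the blocking commit over to the executor
    pendingCommit = executor.submit(new Runnable {
      override def run(): Unit = commitBatch()
    })
  }

  override def close(): Unit = {
    // wait for any in-flight commit before closing the StreamingConnection (omitted)
    if (pendingCommit != null) pendingCommit.get()
    executor.shutdown()
    executor.awaitTermination(1, TimeUnit.MINUTES)
  }

  // placeholder for the actual blocking TransactionBatch.commit() call
  private def commitBatch(): Unit = ()
}

The downside I see is that a failure in the background thread only surfaces on the next get(), so it would have to be re-thrown in invoke() or close() to make the job fail properly.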

Kind regards,
Federico


--
Federico D'Ambrosio