Tracing and Flink


Tracing and Flink

Aaron Levin
Hello Flink Friends!

This is a long shot, but I'm wondering if anyone is thinking about or working on applying tracing to streaming systems, and in particular Flink. As far as I understand, this is a fairly open problem, so I'm curious how folks are thinking about it and whether anyone has considered how they might apply tracing to Flink systems.

Some patterns in streaming systems fit into tracing fairly easily (consumer fan-out, for example), but many patterns do not. For example, how do you trace when there is batching or aggregation? Nevertheless, I'm sure some folks have thought about this or even tried to implement solutions, so I'd love to hear about it, especially if there is any standards work in this direction (for example, within the OpenTracing project).

If you've thought about this, implemented something, or are working on standards related to this, I'd love to hear from you! Thank you!

Best,

Aaron Levin

Re: Tracing and Flink

Arvid Heise-3
Hi Aaron,

I'm not too sure about tracing and Flink. It's the first time I've heard of it in this context, and I'm not immediately seeing the benefit.

What is IMHO more interesting, and a well-established discipline in the science of data quality, is a concept called data lineage. [1]

I could go quite deep into this topic, but I'll just skim over the most important points:

* Ideally, the data processing framework would do the data lineage for you. Flink doesn't (nor do any of the obvious competitors, AFAIK; I only know of research prototypes that do), so you have to either add additional information to each record (blowing up the record size) or store the lineage information in an external system (which usually makes that external system the bottleneck of the whole setup).
* It's worth adding some kind of header with the same structure to all of your record types; a rough sketch follows after this list. Your best bet is a format like Avro, where you can reference an external schema.
* You usually want a UID on each record as it enters the system. The best option is a long id from a database, but you might also resort to a generated UUID (byte form, with limited debuggability) or its string form (which adds up to 36 bytes per record).
* Each 1:1 transformation or filter can retain this UID, but it also makes sense to generate a new one.
* For each 1:n fan-out transformation, generate a new UID.
* For each aggregation, generate a new UID.
* Whenever you generate a new UID, add the UIDs of all input records in some kind of origins/sources field, as an array.
* If the pipeline uses temporary records for which you would need to trace different origin UIDs (two aggregations in the same pipeline), you want multiple layers of origins/sources in the form of a two-dimensional array or a map of arrays.
* If you want to keep the lineage across different pipelines, add producer name and version to the final records (the version is really useful for monkey-patching errors anyway).
* If you want to trace latency across pipelines, add the original timestamps.
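
As a minimal sketch, such a header could look like the following plain Java class. All field names here are illustrative, not an established schema; in practice you would define this as an external Avro schema shared by all record types.

    import java.util.List;
    import java.util.Map;

    /**
     * Illustrative lineage header carried by (or referenced from) every record.
     * Field names are hypothetical.
     */
    public class LineageHeader {
        public String uid;                        // UID assigned when the record was created
        public Map<String, List<String>> origins; // layer name -> UIDs of contributing input records
        public String producerName;               // pipeline that produced the record
        public String producerVersion;            // handy for monkey-patching errors
        public long originalTimestampMillis;      // event time of the earliest contributing input
    }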

As you can guess, for smaller records the header may easily be larger than the actual message. Thus, if it's just for debugging, I'd add an option to the pipeline to skip header processing/generation. If it's for auditing purposes, you probably have to live with it.

One nice alternative that is possible in Flink is to work with auxiliary records that go out through secondary outputs (side outputs). Instead of embedding the header in the record, you generate it as a separate record going into secondary storage. Of course, that still requires all records to have UIDs.
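
A rough sketch of that variant (the side-output API calls are Flink's; MyRecord, its header() accessor, and the LineageHeader type from above are made-up placeholders):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    public class LineageSideOutput {
        // Tag identifying the secondary output that carries the lineage records.
        static final OutputTag<LineageHeader> LINEAGE =
                new OutputTag<LineageHeader>("lineage") {};

        static SingleOutputStreamOperator<MyRecord> withLineage(DataStream<MyRecord> in) {
            SingleOutputStreamOperator<MyRecord> main =
                    in.process(new ProcessFunction<MyRecord, MyRecord>() {
                        @Override
                        public void processElement(MyRecord value, Context ctx, Collector<MyRecord> out) {
                            out.collect(value);                  // main output: the record itself
                            ctx.output(LINEAGE, value.header()); // side output: its lineage header
                        }
                    });
            // main.getSideOutput(LINEAGE) can then be sunk into secondary storage.
            return main;
        }
    }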

Let me know if I misunderstood your original question or if you want to delve deeper.




--

Arvid Heise | Senior Java Developer



Re: Tracing and Flink

bvarga
Hi Aaron,

I've recently been looking at this topic and working on a prototype. The
approach I am trying is "backward tracing", or data provenance tracing,
where we try to explain what inputs and steps have affected the production
of an output record.

Arvid has summarized the most important aspects; my approach to UIDs is as
he described. I would like to add a few thoughts.

- With this backward tracing approach, it is very difficult to do sampling,
as aggregations / multi-input operators can only be traced if all inputs are
also traced. So this is more useful if you need to be able to explain the
origins of all output records.

- As Arvid mentioned, the size of the trace records can become big and negatively impact the performance of the pipeline. I'd suggest an approach where each operator directly outputs its traces to some storage. Each trace record has a UID. If each trace record contains a list/array of its inputs, and you use an appropriate storage, you can do recursive lookups based on the trace UIDs to find the complete trace graph for an output record (a sketch follows below). You may even want a separate Flink job that pre-processes and pre-aggregates traces that belong together (although the lateness/ordering might be difficult to handle).
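
To make the recursive lookup concrete, here is a minimal sketch. The TraceStore interface is a stand-in for whatever storage you pick; all names are illustrative:

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    /** Illustrative trace record, as each operator would emit it to external storage. */
    class TraceRecord {
        String uid;             // UID of this trace entry
        String operatorName;    // operator that produced it
        List<String> inputUids; // trace UIDs of the records that went into it
    }

    /** Stand-in for the external trace storage (lookup by trace UID). */
    interface TraceStore {
        TraceRecord get(String uid);
    }

    class TraceGraph {
        /** Walk backwards from one output record's trace UID and collect its provenance graph. */
        static Set<TraceRecord> explain(TraceStore store, String outputUid) {
            Set<TraceRecord> graph = new HashSet<>();
            Set<String> seen = new HashSet<>();
            Deque<String> pending = new ArrayDeque<>();
            pending.push(outputUid);
            while (!pending.isEmpty()) {
                String uid = pending.pop();
                if (!seen.add(uid)) continue; // already visited
                TraceRecord t = store.get(uid);
                if (t == null) continue;      // trace not (yet) available in storage
                graph.add(t);
                t.inputUids.forEach(pending::push); // recurse into the inputs
            }
            return graph;
        }
    }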

- If you choose this direct reporting approach, you still need to pass along the trace UID in the main pipeline, so that the trace produced by the next operator can list it among its inputs.
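
For illustration, the pass-along could be as simple as a wrapper type (hypothetical, not something Flink provides):

    /** Hypothetical wrapper pairing each payload with the trace UID of the step that produced it. */
    public class Traced<T> {
        public final T value;         // the actual record flowing through the pipeline
        public final String traceUid; // listed among the inputs of the next operator's trace record

        public Traced(T value, String traceUid) {
            this.value = value;
            this.traceUid = traceUid;
        }
    }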

- If you leave the production of the trace records explicit (as in, you have to construct and collect the trace record manually in each operator), you can flexibly choose which inputs to include (e.g. for a large aggregation, you may only want to list some of the aggregated elements as inputs; see the sketch below). You can then also add any additional metadata to help explain a certain step.
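
As an example of such explicit trace construction, a window aggregation might cap how many inputs it lists. This is a sketch: the windowing API is Flink's, while the Traced wrapper and the trace reporting are the made-up pieces from above.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.UUID;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    /** Sketch: sum Traced<Long> values per key; the emitted trace lists at most N input UIDs. */
    class TracedSum extends ProcessWindowFunction<Traced<Long>, Traced<Long>, String, TimeWindow> {
        private static final int MAX_LISTED_INPUTS = 10;

        @Override
        public void process(String key, Context ctx, Iterable<Traced<Long>> elements,
                            Collector<Traced<Long>> out) {
            long sum = 0;
            List<String> inputs = new ArrayList<>();
            for (Traced<Long> e : elements) {
                sum += e.value;
                if (inputs.size() < MAX_LISTED_INPUTS) {
                    inputs.add(e.traceUid); // only list some of the aggregated elements
                }
            }
            String newUid = UUID.randomUUID().toString();
            // Construct and report the trace record here, e.g. via a side output
            // or a direct write to the trace storage (left abstract in this sketch).
            out.collect(new Traced<>(sum, newUid));
        }
    }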

- I've looked into adapting this to OpenTracing, but it didn't seem
well-suited for this task. The span-based approach has a parent-child
relationship that doesn't fit the dataflow model too well. In Flink, with
the backward-tracing approach, the "root span" would logically be the output
record, and its children would need to be constructed earlier. I couldn't
find a way to nicely fit this view into the structure of OpenTracing
records.

Let me know your thoughts, I'd be happy to discuss this further.

Regards,

Balazs Varga  


