[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: Lineage

Hi Gerardo,

Any lineage tracking system is dependent on how much data you can give it. So if you do transfers outside of the 'view' such a system has then lineage information is gone. Airflow can help in this area by tracking its internal lineage and providing that to those lineage systems. 

Apache Atlas is agnostic and can receive lineage info by rest API (used in my implementation) and Kafk topic. It does also come with a lot of connectors out of the box that tie into the hadoop ecosystem and make your live easier there. The Airflow Atlas connector supplies Atlas with information that it doesn't know about yet closing the loop further. 

Also you can write your own connector and put it on the Airflow class path and use that one. 


Sent from my iPhone

> On 6 May 2018, at 09:13, Gerardo Curiel <gerardo@xxxxxxxx> wrote:
> Hi Bolke,
> Data lineage support sounds very interesting.
> I'm not very familiar with Atlas but first sight seems like a tool specific
> to the Hadoop ecosystem. How would this look like if the files (inlets or
> outlets) were stored on s3?.
> An example of a service that manages a similar use case is AWS Glue[1],
> which creates a hive metastore based on the schema and other metadata it
> can get from different sources (amongst them, s3 files).
>> On Sun, May 6, 2018 at 7:49 AM, Bolke de Bruin <bdbruin@xxxxxxxxx> wrote:
>> Hi All,
>> I have made a first implementation that allows tracking of lineage in
>> Airflow and integration with Apache Atlas. It was inspired by Jeremiah’s
>> work in the past on Data Flow pipelines, but I think I kept it a little bit
>> simpler.
>> Operators now have two new parameters called “inlets” and “outlets”. These
>> can be filled with objects derived from “DataSet”, like “File” and
>> “HadoopFile”. Parameters are jinja2 templated, which
>> means they receive the context of the task when it is running and get
>> rendered. So you can get definitions like this:
>> f_final = File(name="/tmp/final")
>> run_this_last = DummyOperator(task_id='run_this_last', dag=dag,
>>    inlets={"auto": True},
>>    outlets={"datasets": [f_final,]})
>> f_in = File(name="/tmp/whole_directory/")
>> outlets = []
>> for file in FILE_CATEGORIES:
>>    f_out = File(name="/tmp/{}/{{{{ execution_date }}}}".format(file))
>>    outlets.append(f_out)
>> run_this = BashOperator(
>>    task_id='run_after_loop', bash_command='echo 1', dag=dag,
>>    inlets={"auto": False, "task_ids": [], "datasets": [f_in,]},
>>    outlets={"datasets": outlets}
>>    )
>> run_this.set_downstream(run_this_last)
>> So I am trying to keep to boilerplate work down for developers. Operators
>> can also extend inlets and outlets automatically. This will probably be a
>> bit harder for the BashOperator without some special magic, but an update
>> to the DruidOperator can be relatively quite straightforward.
>> In the future Operators can take advantage of the inlet/outlet definitions
>> as they are also made available as part of the context for templating (as
>> “inlets” and “outlets”).
>> I’m looking forward to your comments!
>> Bolke.
> [1]
> Cheers,
> -- 
> Gerardo Curiel //