git.net

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: S3keysonsor


The sensor allows wild card (*) and there is also an S3PrefixSensor which
might help in some cases. In one of my dags, I have a similar structure.

wait_on_s3_source_data = S3KeySensor(
    task_id='wait_on_s3_source_data',
    bucket_key="s3://mybucket_name/data_file_name_{{macros.ds_format(macros.ds_add(ds,1),'%Y-%m-%d','%Y%m%d')}}*/_SUCCESS",
    wildcard_match=True,
    timeout=60*15,
    retries=4,
    dag=dag)

I have files like the below in my bucket. The date part is fixed/expected
to be exactly a match based on ds, the time part (after 8 digits) is
different

data_file_name_20170912181034/done_marker.csv
data_file_name_20170913181035/done_marker.csv
data_file_name_20170914181027/done_marker.csv
data_file_name_20170915181033/done_marker.csv

If you can receieve a file with *any date* and you need to process it, you
might need to just use the wild card on the whole bucket and then process
the file based on the name or the data inside. May be a custom python
function to archive the file after you process it as well.

On Mon, May 21, 2018 at 2:59 PM purna pradeep <purna2pradeep@xxxxxxxxx>
wrote:

> + Joe
>
>
>
> On Mon, May 21, 2018 at 2:56 PM purna pradeep <purna2pradeep@xxxxxxxxx>
> wrote:
>
> > I do know only to some extent , I mean If you see my sample s3 locations
> >
> > s3a://mybucket/20180425_111447_data1/_SUCCESS
> >
> > s3a://mybucket/20180424_111241_data1/_SUCCESS
> >
> >
> >
> > The only values which are static in above location are
> >
> > s3a://mybucket/
> >
> > data1/_SUCCESS
> >
> > Now I want to configure tolerance for _SUCCESS file as latest or 1 day
> > older based on this configuration it should pick the right time stamp
> > folder which has _SUCCESS file
> >
> > On Mon, May 21, 2018 at 2:35 PM Joe Napolitano <
> joe.napolitano@xxxxxxxxxx>
> > wrote:
> >
> >> Purna, with regards to "this path is not completely static," can you
> >> clarify what you mean?
> >>
> >> Do you mean that you don't know the actual key name beforehand? E.g.
> >> pertaining to "111447", "111241", and "111035" in your example?
> >>
> >> On Mon, May 21, 2018 at 2:23 PM, Brian Greene <
> >> brian@xxxxxxxxxxxxxxxxxxxxxxxxx> wrote:
> >>
> >> > I suggest it’ll work for your needs.
> >> >
> >> > Sent from a device with less than stellar autocorrect
> >> >
> >> > > On May 21, 2018, at 10:16 AM, purna pradeep <
> purna2pradeep@xxxxxxxxx>
> >> > wrote:
> >> > >
> >> > > Hi ,
> >> > >
> >> > > I’m trying to evaluate airflow to see if it suits my needs.
> >> > >
> >> > > Basically i can have below steps in a DAG
> >> > >
> >> > >
> >> > >
> >> > > 1)Look for a file arrival on given s3 location (this path is not
> >> > completely
> >> > > static) (i can use S3Keysensor in this step)
> >> > >
> >> > >  i should be able to specify to look either for latest folder or
> >> 24hrs or
> >> > > n number of days older folder which has _SUCCESS file as mentioned
> >> below
> >> > >
> >> > >  sample file location(s):
> >> > >
> >> > >  s3a://mybucket/20180425_111447_data1/_SUCCESS
> >> > >
> >> > >
> >
> >
> > s3a://mybucket/20180424_111241_data1/_SUCCESS
> >> > >
> >> > >  s3a://mybucket/20180424_111035_data1/_SUCCESS
> >> > >
> >> > >
> >> > >
> >> > > 2)invoke a simple restapi using HttpSimpleOperator once the above
> >> > > dependency is met ,i can set upstream for step2 as step1
> >> > >
> >> > >
> >> > >
> >> > > Does S3keysensor supports step1 out of the box?
> >> > >
> >> > > Also in some cases i may to have a DAG without start date & end date
> >> it
> >> > > just needs to be triggered once file is available in a given s3
> >> location
> >> > >
> >> > >
> >> > >
> >> > > *Please suggest !*
> >> >
> >>
> >
>


( ! ) Warning: include(msgfooter.php): failed to open stream: No such file or directory in /var/www/git/apache-airflow-development/msg03422.html on line 191
Call Stack
#TimeMemoryFunctionLocation
10.0011358472{main}( ).../msg03422.html:0

( ! ) Warning: include(): Failed opening 'msgfooter.php' for inclusion (include_path='.:/var/www/git') in /var/www/git/apache-airflow-development/msg03422.html on line 191
Call Stack
#TimeMemoryFunctionLocation
10.0011358472{main}( ).../msg03422.html:0