Re: Creating dynamic pool from task


What I am doing is very similar. However, I am including the DagRun's id in the pool name to make it unique, since I need to make sure every run gets its own pool. I am getting the id from the context object, which is only available within execute methods or templates. How do you make sure each run has its own pool?
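
For concreteness, here is roughly what that looks like on my end (a sketch,
not my exact code; the callable name and pool-name scheme are illustrative,
and the import paths assume Airflow 1.x):

from airflow.models import Pool
from airflow.utils.db import provide_session

@provide_session
def create_cluster_pool(slots, session=None, **context):
    # context['dag_run'].run_id makes the pool name unique per run
    pool_name = 'cluster_pool_{}'.format(context['dag_run'].run_id)
    if not session.query(Pool).filter(Pool.pool == pool_name).one_or_none():
        session.add(Pool(pool=pool_name, slots=slots))
        session.commit()
    return pool_name

This runs in a PythonOperator with provide_context=True, which is what makes
the context (and thus the DagRun) available to the callable.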


Thanks,

Dávid Szakállas
Software Engineer | Whitepages Data Services

________________________________
From: Taylor Edmiston <tedmiston@xxxxxxxxx>
Sent: Thursday, September 20, 2018 6:17:05 PM
To: dev@xxxxxxxxxxxxxxxxxxxxxxxxxxxx
Subject: Re: Creating dynamic pool from task

I've done something similar.  I have a task at the front of the DAG that
ensures the pool exists and creates it if it doesn't.  I've pasted my code
below.  This runs in a for loop that creates one DAG per iteration, each
with its own pool, and then I pass the pool name into the sensors (a sketch
of that surrounding loop follows the code).

Does this work for your use case?

--

# import paths below assume Airflow 1.x
import logging

from airflow.models import Pool
from airflow.operators.python_operator import PythonOperator
from airflow.utils.db import provide_session

logger = logging.getLogger(__name__)

redshift_pool = PythonOperator(
    task_id='redshift_pool',
    dag=dag,
    python_callable=ensure_redshift_pool,
    op_kwargs={
        'name': workflow.pool,
        'slots': REDSHIFT_POOL_SLOTS,
    },
    ...
)

@provide_session
def ensure_redshift_pool(name, slots, session=None):
    """Idempotently create the named pool if it doesn't already exist."""
    pool_query = (
        session.query(Pool)
        .filter(Pool.pool == name)
    )
    pool_query_result = pool_query.one_or_none()
    if not pool_query_result:
        logger.info(f'redshift pool "{name}" does not exist - creating it')
        session.add(Pool(pool=name, slots=slots))
        session.commit()
        logger.info(f'created redshift pool "{name}"')
    else:
        logger.info(f'redshift pool "{name}" already exists')

--
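
For completeness, here is a sketch of the surrounding loop (WORKFLOWS, the
dag_id naming, and the SqlSensor details are placeholders, not my exact
code; import paths assume Airflow 1.10):

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.sql_sensor import SqlSensor

for workflow in WORKFLOWS:  # placeholder list of workflow configs
    dag = DAG(
        dag_id='redshift_{}'.format(workflow.pool),
        start_date=datetime(2018, 1, 1),
        schedule_interval='@daily',
    )

    redshift_pool = PythonOperator(
        task_id='redshift_pool',
        dag=dag,
        python_callable=ensure_redshift_pool,
        op_kwargs={'name': workflow.pool, 'slots': REDSHIFT_POOL_SLOTS},
    )

    wait_for_data = SqlSensor(
        task_id='wait_for_data',
        dag=dag,
        conn_id='redshift',  # placeholder connection id
        sql='SELECT 1',      # placeholder readiness check
        pool=workflow.pool,  # the per-DAG pool ensured above
    )

    redshift_pool >> wait_for_data
    globals()[dag.dag_id] = dag  # expose each DAG at module level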

*Taylor Edmiston*
Blog <https://blog.tedmiston.com/> | LinkedIn
<https://www.linkedin.com/in/tedmiston/> | Stack Overflow
<https://stackoverflow.com/users/149428/taylor-edmiston> | Developer Story
<https://stackoverflow.com/story/taylor>



On Thu, Sep 20, 2018 at 10:08 AM David Szakallas <dszakallas@xxxxxxxxxxxxxx>
wrote:

> Hi all,
>
> I have a DAG that creates a cluster, starts computation tasks, and, after
> they complete, tears down the cluster. I want to limit the concurrency of
> the computation tasks carried out on this cluster to a fixed number. So
> logically, I need a pool that is exclusive to the cluster created by a
> task. I don't want interference with other DAGs or with different runs of
> the same DAG.
>
> I thought I could solve this problem by creating a pool dynamically from a
> task after the cluster is created and deleting it once the computation
> tasks are finished. I thought I could template the pool parameter of the
> computation tasks to make them use this dynamically created pool.
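>
> For concreteness, roughly what I attempted (a sketch; the BashOperator and
> the naming are illustrative, not my exact code):
>
> from airflow.operators.bash_operator import BashOperator
>
> compute = BashOperator(
>     task_id='compute',
>     dag=dag,
>     bash_command='run_computation.sh',  # placeholder command
>     pool='cluster_pool_{{ dag_run.run_id }}',  # intended per-run pool
> )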
>
> However, this way the computation tasks are never triggered. So I think
> the pool parameter is saved in the task instance before being templated. I
> would like to hear your thoughts on how to achieve the desired behavior.
>
> Thanks,
>
> Dávid Szakállas
> Software Engineer | Whitepages Data Services
>