

Re: Creating dynamic pool from task


Chris, the tasks are independent of each other, so they can run concurrently. I have to limit the concurrency, though, so they don't starve the cluster. As the cluster is created dynamically by a task, sharing a pool with other DAGs or with other runs of the same DAG is not preferable.

I imagined something like this:

                                       .--> [compute_1]  --.
                                      /---> [compute_2]  ---\
                                     /           ...         \
[create_cluster] --> [create_pool_x6]                         [delete_pool] --> [delete_cluster]
                                     \           ...         /
                                      \---> [compute_19] ---/
                                       `--> [compute_20] --'
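
For concreteness, here is roughly how I imagine wiring that up. This is an untested sketch against the Airflow 1.10-style API; `create_cluster_fn`, `create_pool_fn`, `delete_pool_fn`, `delete_cluster_fn`, `compute_fn`, `dag`, and `POOL_NAME` are all hypothetical placeholders (and `POOL_NAME` is exactly the part that needs to be unique per run):

```python
# Untested sketch - all callables, `dag`, and POOL_NAME are placeholders.
from airflow.operators.python_operator import PythonOperator

create_cluster = PythonOperator(
    task_id='create_cluster', python_callable=create_cluster_fn, dag=dag)
create_pool = PythonOperator(
    task_id='create_pool_x6', python_callable=create_pool_fn, dag=dag)
delete_pool = PythonOperator(
    task_id='delete_pool', python_callable=delete_pool_fn, dag=dag,
    trigger_rule='all_done')  # tear down even if a compute task fails
delete_cluster = PythonOperator(
    task_id='delete_cluster', python_callable=delete_cluster_fn, dag=dag,
    trigger_rule='all_done')

create_cluster >> create_pool
for i in range(1, 21):
    compute = PythonOperator(
        task_id='compute_{}'.format(i), python_callable=compute_fn,
        pool=POOL_NAME,  # the pool created with 6 slots
        dag=dag)
    create_pool >> compute >> delete_pool
delete_pool >> delete_cluster
```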
Thanks,
David
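
P.S. On the unique per-run pool name (from my earlier message below): I derive it from the DagRun's id. A small helper along these lines is what I mean; the function name and the exact normalization are just illustrative:

```python
def per_run_pool_name(dag_id, run_id, prefix="cluster_pool"):
    """Build a pool name unique to one DagRun.

    run_ids can contain characters such as ':', '-' and '+' (from the
    execution date), so replace anything non-alphanumeric with '_'.
    """
    safe_run_id = "".join(c if c.isalnum() else "_" for c in run_id)
    return "{}__{}__{}".format(prefix, dag_id, safe_run_id)

# e.g. per_run_pool_name("my_dag", "scheduled__2018-09-20T10:00:00+00:00")
# -> "cluster_pool__my_dag__scheduled__2018_09_20T10_00_00_00_00"
```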

> On Sep 21, 2018, at 7:23 PM, Chris Palmer <chris@xxxxxxxxxxxx> wrote:
> 
> What would cause multiple computation tasks to run on the cluster at the
> same time? Are you worried about concurrent DagRuns? Does setting dag
> concurrency and/or task concurrency appropriately solve your problem?
> 
> Chris
> 
> On Thu, Sep 20, 2018 at 8:31 PM David Szakallas <dszakallas@xxxxxxxxxxxxxx>
> wrote:
> 
>> What I am doing is very similar. However, I am including the DagRun's id in
>> the pool name to make it unique, as I need to make sure every run gets its
>> own pool. I am getting that from the context object, which is only
>> available within execute methods or templates. How do you make sure each
>> run has its own pool?
>> 
>> 
>> Thanks,
>> 
>> Dávid Szakállas
>> Software Engineer | Whitepages Data Services
>> 
>> ________________________________
>> From: Taylor Edmiston <tedmiston@xxxxxxxxx>
>> Sent: Thursday, September 20, 2018 6:17:05 PM
>> To: dev@xxxxxxxxxxxxxxxxxxxxxxxxxxxx
>> Subject: Re: Creating dynamic pool from task
>> 
>> I've done something similar.  I have a task at the front of the DAG that
>> ensures the connection pool exists and creates the pool if it doesn't.
>> I've pasted my code below.  This runs in a for loop that creates one DAG
>> per iteration each with its own pool.  Then I pass the pool name into the
>> sensors.
>> 
>> Does this work for your use case?
>> 
>> --
>> 
>> import logging
>>
>> from airflow.models import Pool
>> from airflow.operators.python_operator import PythonOperator
>> from airflow.utils.db import provide_session
>>
>> logger = logging.getLogger(__name__)
>>
>>
>> @provide_session
>> def ensure_redshift_pool(name, slots, session=None):
>>     pool_query = (
>>         session.query(Pool)
>>         .filter(Pool.pool == name)
>>     )
>>     if pool_query.one_or_none() is None:
>>         logger.info(f'redshift pool "{name}" does not exist - creating it')
>>         session.add(Pool(pool=name, slots=slots))
>>         session.commit()
>>         logger.info(f'created redshift pool "{name}"')
>>     else:
>>         logger.info(f'redshift pool "{name}" already exists')
>>
>>
>> redshift_pool = PythonOperator(
>>     task_id='redshift_pool',
>>     dag=dag,
>>     python_callable=ensure_redshift_pool,
>>     op_kwargs={
>>         'name': workflow.pool,
>>         'slots': REDSHIFT_POOL_SLOTS,
>>     },
>>     ...
>> )
>> --
>> 
>> *Taylor Edmiston*
>> Blog <https://blog.tedmiston.com/> | LinkedIn
>> <https://www.linkedin.com/in/tedmiston/> | Stack Overflow
>> <https://stackoverflow.com/users/149428/taylor-edmiston> | Developer Story
>> <https://stackoverflow.com/story/taylor>
>> 
>> 
>> 
>> On Thu, Sep 20, 2018 at 10:08 AM David Szakallas <
>> dszakallas@xxxxxxxxxxxxxx>
>> wrote:
>> 
>>> Hi all,
>>> 
>>> I have a DAG that creates a cluster, starts computation tasks, and tears
>>> down the cluster after they complete. I want to limit concurrency for the
>>> computation tasks carried out on this cluster to a fixed number. So
>>> logically, I need a pool that is exclusive to the cluster created by a
>>> task. I don't want interference with other DAGs or with different runs of
>>> the same DAG.
>>>
>>> I thought I could solve this problem by creating a pool dynamically from a
>>> task after the cluster is created and deleting it once the computation
>>> tasks are finished. I thought I could template the pool parameter of the
>>> computation tasks to make them use this dynamically created pool.
>>>
>>> However, this way the computation tasks are never triggered. So I think
>>> the pool parameter is saved in the task instance before being templated.
>>> I would like to hear your thoughts on how to achieve the desired behavior.
>>> 
>>> Thanks,
>>> 
>>> Dávid Szakállas
>>> Software Engineer | Whitepages Data Services
>>> 