git.net

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: A Naive Multi-Scheduler Architecture Experiment of Airflow


Thanks Devjyoti for your reply.

To elaborate based on your inputs: 

- *When to add one more shard*:
We have designed some metrics, like "how long the scheduler instance takes to parse & schedule all DAGs (in the subdir it’s taking care of)". When the metric is higher than a given threshold for long enough time, we may want to add one more shard. 

- *Easy Solution to Balance Shard Load*:
Exactly the same as you’re pointing out, we create initial set of shards by randomly distribute our DAGs into each subdir. Similar to building a mathematical model, there are some assumptions we have to make for convenience, like “complexity of DAGs are roughly equal”.
As for new DAGs: we developed an application creating DAGs based on metadata, and the application would check the # of files in each subdir and always put the new DAG into the subdir with the least # of DAGs.


XD

> On 2 Nov 2018, at 12:47 AM, Devjyoti Patra <devjyotip@xxxxxxxxxx> wrote:
> 
>>> 1. “Shard by # of files may not yield same load”: fully agree with you.
> This concern was also raised by other co-workers in my team. But given this
> is a preliminary trial, we didn’t consider this yet.
> 
> One issue here is that when do you decide to add one more shard? I think if
> you monitor the time it takes to parse each source file and log it; you can
> use this to find the outliers when your scheduling SLA is breached and move
> the outliers to a new shard. Creating the initial set of shard by randomly
> putting an equal number of files in each subdir seems like the easiest way
> to approach this problem.
> 
> On Thu, Nov 1, 2018 at 7:11 PM Deng Xiaodong <xd.deng.r@xxxxxxxxx> wrote:
> 
>> Thanks Kelvin and Max for your inputs!
>> 
>> To Kelvin’s questions:
>> 1. “Shard by # of files may not yield same load”: fully agree with you.
>> This concern was also raised by other co-workers in my team. But given this
>> is a preliminary trial, we didn’t consider this yet.
>> 2. We haven’t started to look into how we can dynamically allocate
>> scheduler resource yet. But I think this preliminary trial would be a good
>> starting point.
>> 3. DB: look forward to your PR on this!
>> 4. “Why do you need to shard the scheduler while the scheduler can scale
>> up pretty high”
>> There are a few reasons:
>> 4.1 we have strict SLA on scheduling. We expect one scheduling loop takes
>> < 3 minutes no matter how many DAGs we have
>> 4.2 we’re containerising the deployment, while our infrastructure team
>> added the restriction that for each pod we can only use up to 2 cores
>> (blocked us from scaling vertically).
>> 4.3 even though this naive architecture doesn’t provide HA, actually it
>> partially addresses the availability concern (if one scheduler out of 5
>> fails, at least 80% DAGs can still be scheduled properly).
>> 
>> To Max’s questions:
>> 1. I haven’t tested pools or queues features with this architecture. So
>> can’t give a very firm answer on this.
>> 2. In the load tests I have done, I haven’t observed such “misfires” yet
>> (I’m running a customised version based on 1.10.0 BTW)
>> 3. This is a very valid point. I haven’t checked the implementation of DAG
>> prioritisation in detail yet. For the scenario in our team, we don’t
>> prioritise DAGs, so we didn’t take this into consideration. On the other
>> hand, this naive architecture didn’t change anything in Airflow. It simply
>> makes use of the “--subdir” argument of scheduler command. If we want to
>> have a more serious multi-scheduler setting-up natively supported by
>> Airflow, I believe for sure we need to make significant changes to the code
>> to ensure all features, like cross DAG prioritisation, are supported.
>> 
>> 
>> Kindly let me know your thoughts. Thanks!
>> 
>> XD
>> 
>> 
>>> On 1 Nov 2018, at 4:25 AM, Maxime Beauchemin <maximebeauchemin@xxxxxxxxx>
>> wrote:
>>> 
>>> A few related thoughts:
>>> * there may be hiccups around concurrency (pools, queues), though the
>> worker should double-checks that the constraints are still met when firing
>> the task, so in theory this should be ok
>>> * there may be more "misfires" meaning the task gets sent to the worker,
>> but by the time it starts the conditions aren't met anymore because of a
>> race condition with one of the other schedulers. Here I'm assuming recent
>> versions of Airflow will simply eventually re-fire the misfires and heal
>>> * cross DAG prioritization can't really take place anymore as there's
>> not a shared "ready-to-run" list of task instances that can be sorted by
>> priority_weight. Whichever scheduler instance fires first is likely to get
>> the open slots first.
>>> 
>>> Max
>>> 
>>> 
>>> On Wed, Oct 31, 2018 at 1:00 PM Kevin Yang <yrqls21@xxxxxxxxx <mailto:
>> yrqls21@xxxxxxxxx>> wrote:
>>> Finally we start to talk about this seriously? Yeah! :D
>>> 
>>> For your approach, a few thoughts:
>>> 
>>>   1. Shard by # of files may not yield same load--even very different
>> load
>>>   since we may have some framework DAG file producing 500 DAG and take
>>>   forever to parse.
>>>   2. I think Alex Guziel <https://github.com/saguziel <
>> https://github.com/saguziel>> had previously
>>>   talked about using apache helix to shard the scheduler. I haven't
>> look a
>>>   lot into it but may be something you're interested in. I personally
>> like
>>>   that idea because we don't need to reinvent the wheel about a lot
>> stuff(
>>>   less code to maintain also ;) ).
>>>   3. About the DB part, I should be contributing back some changes that
>>>   can dramatically drop the DB CPU usage. Afterwards I think we should
>> have
>>>   plenty of headroom( assuming the traffic is ~4000 DAG files and ~40k
>>>   concurrency running task instances) so we should probly be fine here.
>>> 
>>> Also I'm kinda curious about your setup and want to understand why do you
>>> need to shard the scheduler, since the scheduler can now scale up pretty
>>> high actually.
>>> 
>>> Thank you for initiate the discussion, I think it can turn out to be a
>> very
>>> valuable and critical discussion--many people have been
>> thinking/discussing
>>> about this and I can't wait to hear the ideas :D
>>> 
>>> Cheers,
>>> Kevin Y
>>> 
>>> On Wed, Oct 31, 2018 at 7:38 AM Deng Xiaodong <xd.deng.r@xxxxxxxxx
>> <mailto:xd.deng.r@xxxxxxxxx>> wrote:
>>> 
>>>> Hi Folks,
>>>> 
>>>> Previously I initiated a discussion about the best practice of Airflow
>>>> setting-up, and it was agreed by a few folks that scheduler may become
>> one
>>>> of the bottleneck component (we can only run one scheduler instance,
>> can
>>>> only scale vertically rather than horizontally, etc.). Especially when
>> we
>>>> have thousands of DAGs, the scheduling latency may be high.
>>>> 
>>>> In our team, we have experimented a naive multiple-scheduler
>> architecture.
>>>> Would like to share here, and also seek inputs from you.
>>>> 
>>>> **1. Background**
>>>> - Inside DAG_Folder, we can have sub-folders.
>>>> - When we initiate scheduler instance, we can specify “--subdir” for
>> it,
>>>> which will specify the specific directory that the scheduler is going
>> to
>>>> “scan” (https://airflow.apache.org/cli.html#scheduler <
>> https://airflow.apache.org/cli.html#scheduler>).
>>>> 
>>>> **2. Our Naive Idea**
>>>> Say we have 2,000 DAGs. If we run one single scheduler instance, one
>>>> scheduling loop will traverse all 2K DAGs.
>>>> 
>>>> Our idea is:
>>>> Step-1: Create multiple sub-directories, say five, under DAG_Folder
>>>> (subdir1, subdir2, …, subdir5)
>>>> Step-2: Distribute the DAGs evenly into these sub-directories (400
>> DAGs in
>>>> each)
>>>> Step-3: then we can start scheduler instance on 5 different machines,
>>>> using command `airflow scheduler --subdir subdir<i>` on machine <i>.
>>>> 
>>>> Hence eventually, each scheduler only needs to take care of 400 DAGs.
>>>> 
>>>> **3. Test & Results**
>>>> - We have done a testing using 2,000 DAGs (3 tasks in each DAG).
>>>> - DAGs are stored using network attached storage (the same drive
>> mounted
>>>> to all nodes), so we don’t concern about the DAG_Folder
>> synchronization.
>>>> - No conflict observed (each DAG file will only be parsed & scheduled
>> by
>>>> one scheduler instance).
>>>> - The scheduling speed improves almost linearly. Demonstrated that we
>> can
>>>> scale scheduler horizontally.
>>>> 
>>>> **4. Highlight**
>>>> - This naive idea doesn’t address scheduler availability.
>>>> - As Kelvin Yang shared earlier in another thread, the database may be
>>>> another bottleneck when the load is high. But this is not considered
>> here
>>>> yet.
>>>> 
>>>> 
>>>> Kindly share your thoughts on this naive idea. Thanks.
>>>> 
>>>> 
>>>> 
>>>> Best regards,
>>>> XD
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>> 
>>