git.net

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: Kafka Indexing Service - Decoupling segments from consumer tasks


Hey Gian,

Thanks for your response!

Automatic compaction looks great and it'll definitely help out with general
efficiency. I'm also excited for the second PR, we've some use cases it'll
be helpful for.

Even with those I still think there may be value in something along of the
lines of my suggestion.

To present a real life example, we have a Kafka topic which emits ~200k
messages/second, each of these messages has around 300 fields (in a nested
avro structure) and its partitioned on some key which is not ingested into
Druid. We produce a datasource from that message with around 40-50 fields.

Using the Kafka Indexing Service, we measured that a single indexing task
extracting the fields for that datasource can consume 7k messages/second.
Meaning in order to keep up we have to run around 30 indexing tasks. This
results in around 30 segments at around 300mb each for each hour of data
compared to 6 segments at around 650mb each when batch ingested, so the
cluster's holding up to 3x as much data as it might do if the data was
ingested into a smaller number of segments.
(I realise that Tranquility & Batch Ingestion partitions data by a hash of
all the fields, which the KIS can't so will always have less optimal
segments unless the Kafka topic is partitioned using a good key)

Profiling the indexing peon shows that the largest chunk of time is being
spent deserialising the large avro message. So we think that if we could
split the current ingestion process into the two job-types as described we
could scale one up to handle consuming and parsing the message and another
could manage appending the rows to a (smaller) set of segments.

>From some initial playing around we've noticed that
https://github.com/druid-io/druid/pull/5261 introduced a stand-alone
realtime task using the newer appenderator API that could potentially be
built on top of.

It might be simpler for us to introduce a stream processor which parses and
extracts the parts of the messages that are needed and emit only those
fields to be used in a topic partitioned on something that aids roll-up.
However separating parsing data from indexing it feels like it might be
more generally useful and avoid the extra work in maintaining a stream
processor. (And it seems like a fun way to get hacking with the Druid
codebase!).

Sorry for the long email, thoughts or comments appreciated!

Best regards,
Dylan



On 3 May 2018 at 02:32, Gian Merlino <gian@xxxxxxxxxx> wrote:

> Hey Dylan,
>
> Great to hear that your experience has generally been positive!
>
> What do you think about using compaction for this? (The feature added in
> https://github.com/druid-io/druid/pull/5102.) The idea with compaction was
> that it would enable a background process that goes through freshly
> inserted segments and re-partitions them optimally.
>
> For creating multiple datasources out of one topic, there is a PR wending
> its way through review right now that is relevant: https://github.com/
> druid-io/druid/pull/5556.
>
> On Wed, May 2, 2018 at 12:46 PM, Dylan Wylie <dylanwylie@xxxxxxxxx> wrote:
>
> > Hey there,
> >
> > With the recent improvements to the Kafka Indexing Service we've been
> > migrating over from Tranquility and have had a very positive experience.
> >
> > However one of the downsides to using the KIS, is that the number of
> > segments generated for each period can't be smaller than the number of
> > tasks required to consume the queue. So if you have a use case involving
> > ingesting from a topic with a high rate of large messages but your spec
> > only extracts a small proportion of fields you may be forced to run a
> large
> > number of tasks that generate very small segments.
> >
> > This email is to check in for peoples thoughts on separating consuming
> and
> > parsing messages from indexing and segment management, in a similar
> fashion
> > to how Tranquility operates.
> >
> > Potentially - we could have the supervisor spawn two types of task that
> can
> > be configured independently, a consumer and an appender. The consumer
> would
> > parse the message based on the spec and then pass the results to the
> > appropriate appender task which builds the segment. Another advantage to
> > this approach is that it would allow creating multiple datasources from a
> > single consumer group rather than ingesting the same topic multiple
> times.
> >
> > I'm quite new to the codebase so all thoughts and comments are welcome!
> >
> > Best regards,
> > Dylan
> >
>


( ! ) Warning: include(msgfooter.php): failed to open stream: No such file or directory in /var/www/git/apache-druid-developers/msg00147.html on line 173
Call Stack
#TimeMemoryFunctionLocation
10.0008368696{main}( ).../msg00147.html:0

( ! ) Warning: include(): Failed opening 'msgfooter.php' for inclusion (include_path='.:/var/www/git') in /var/www/git/apache-druid-developers/msg00147.html on line 173
Call Stack
#TimeMemoryFunctionLocation
10.0008368696{main}( ).../msg00147.html:0