
Re: Kafka Indexing Service - Decoupling segments from consumer tasks


Hey Gian,

Thanks for the feedback and taking the time to read through the proposal.

Yeah, your suggestion is much less complex and I think it makes more sense!

Cheers,
Dylan

On 8 May 2018 at 22:06, Gian Merlino <gian@xxxxxxxxxx> wrote:

> Hi Dylan,
>
> My feeling is that it is going to be challenging to add a layer of
> indirection to the Kafka indexing stuff while maintaining its
> exactly-onceness. The exactly-onceness is based around tracking the
> specific Kafka offsets that are read by each task, and is tightly linked to
> which partitions are assigned to which task. I think what you describe
> would be doable, but it would add a lot of complexity to a code base that
> is already pretty complex. If I were in your position I'd try
> repartitioning the original Kafka topic into a second Kafka topic using a
> good key. It doubles the traffic at Kafka but at least it's simple! It
> follows a general rule of making Kafka do as much of the work as possible.
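
For concreteness, a minimal sketch of the repartitioning Gian describes,
written against the Kafka Streams API. The topic names, String serdes, and
the extractPartitionKey() helper are illustrative assumptions, not anything
from the thread:

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Produced;

    public class RekeyForDruid
    {
      public static void main(String[] args)
      {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "rekey-for-druid");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();

        // Re-key each record on a dimension that aids roll-up and write it
        // to a second topic. Kafka's default partitioner hashes the new key,
        // so rows that should roll up together land in the same partition,
        // and therefore in the same indexing task and the same segments.
        builder
            .stream("events", Consumed.with(Serdes.String(), Serdes.String()))
            .selectKey((oldKey, value) -> extractPartitionKey(value))
            .to("events-rekeyed", Produced.with(Serdes.String(), Serdes.String()));

        new KafkaStreams(builder.build(), props).start();
      }

      // Hypothetical helper: derive a roll-up-friendly key from the message.
      private static String extractPartitionKey(String value)
      {
        return value; // placeholder; a real job would parse out a dimension
      }
    }

The Kafka Indexing Service would then be pointed at "events-rekeyed" instead
of the original topic.
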
>
> What do you think?
>
> On Thu, May 3, 2018 at 1:53 PM, Dylan Wylie <dylanwylie@xxxxxxxxx> wrote:
>
> > Hey Gian,
> >
> > Thanks for your response!
> >
> > Automatic compaction looks great and it'll definitely help out with
> > general efficiency. I'm also excited for the second PR, we've some use
> > cases it'll be helpful for.
> >
> > Even with those I still think there may be value in something along the
> > lines of my suggestion.
> >
> > To present a real-life example, we have a Kafka topic which emits ~200k
> > messages/second; each of these messages has around 300 fields (in a
> > nested Avro structure) and it's partitioned on some key which is not
> > ingested into Druid. We produce a datasource from those messages with
> > around 40-50 fields.
> >
> > Using the Kafka Indexing Service, we measured that a single indexing task
> > extracting the fields for that datasource can consume 7k messages/second,
> > meaning that to keep up we have to run around 30 indexing tasks. This
> > results in around 30 segments of around 300MB each per hour of data,
> > compared to 6 segments of around 650MB each when batch ingested, so the
> > cluster is holding up to 3x as much data as it would if the data were
> > ingested into a smaller number of segments.
> > (I realise that Tranquility and batch ingestion partition data by a hash
> > of all the fields, which the KIS can't, so it will always produce less
> > optimal segments unless the Kafka topic is partitioned using a good key.)
> >
> > Profiling the indexing peon shows that the largest chunk of time is being
> > spent deserialising the large Avro messages. So we think that if we could
> > split the current ingestion process into the two job types as described,
> > we could scale one up to handle consuming and parsing the messages while
> > the other manages appending the rows to a (smaller) set of segments.
> >
> > From some initial playing around, we've noticed that
> > https://github.com/druid-io/druid/pull/5261 introduced a stand-alone
> > realtime task using the newer appenderator API, which we could
> > potentially build on top of.
> >
> > It might be simpler for us to introduce a stream processor which parses
> > the messages, extracts the parts that are needed, and emits only those
> > fields to a topic partitioned on something that aids roll-up. However,
> > separating parsing data from indexing it feels like it might be more
> > generally useful and would avoid the extra work of maintaining a stream
> > processor. (And it seems like a fun way to get hacking on the Druid
> > codebase!)
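
A sketch of that stream-processor option, again with Kafka Streams, this time
reading Avro via Confluent's GenericAvroSerde. The topic names, field names,
registry URL, and the choice of JSON output are illustrative assumptions:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Produced;

    public class ProjectAndRekey
    {
      // Hypothetical: the subset of the ~300 fields Druid actually ingests.
      private static final List<String> KEPT =
          Arrays.asList("timestamp", "country", "adId");
      private static final ObjectMapper JSON = new ObjectMapper();

      public static void main(String[] args)
      {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "project-and-rekey");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        GenericAvroSerde avroSerde = new GenericAvroSerde();
        avroSerde.configure(
            Collections.singletonMap("schema.registry.url", "http://localhost:8081"),
            false // value serde, not key serde
        );

        StreamsBuilder builder = new StreamsBuilder();
        builder
            .stream("wide-events", Consumed.with(Serdes.String(), avroSerde))
            // Deserialise the wide Avro record once, keep only the fields
            // Druid needs, and emit them as a small JSON message keyed on a
            // dimension that aids roll-up.
            .map((key, record) -> KeyValue.pair(rollupKey(record), project(record)))
            .to("narrow-events", Produced.with(Serdes.String(), Serdes.String()));

        new KafkaStreams(builder.build(), props).start();
      }

      private static String rollupKey(GenericRecord record)
      {
        return String.valueOf(record.get("country")); // hypothetical key choice
      }

      private static String project(GenericRecord record)
      {
        Map<String, Object> out = new HashMap<>();
        for (String field : KEPT) {
          Object v = record.get(field);
          out.put(field, v instanceof CharSequence ? v.toString() : v); // Avro Utf8 -> String
        }
        try {
          return JSON.writeValueAsString(out);
        }
        catch (Exception e) {
          throw new RuntimeException(e);
        }
      }
    }

Downstream, far fewer indexing tasks should keep up with "narrow-events",
since the expensive 300-field deserialisation happens once, outside the peons.
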
> >
> > Sorry for the long email; thoughts or comments appreciated!
> >
> > Best regards,
> > Dylan
> >
> >
> >
> > On 3 May 2018 at 02:32, Gian Merlino <gian@xxxxxxxxxx> wrote:
> >
> > > Hey Dylan,
> > >
> > > Great to hear that your experience has generally been positive!
> > >
> > > What do you think about using compaction for this? (The feature added
> > > in https://github.com/druid-io/druid/pull/5102.) The idea with
> > > compaction was that it would enable a background process that goes
> > > through freshly inserted segments and re-partitions them optimally.
> > >
> > > For creating multiple datasources out of one topic, there is a PR
> > > wending its way through review right now that is relevant:
> > > https://github.com/druid-io/druid/pull/5556.
> > >
> > > On Wed, May 2, 2018 at 12:46 PM, Dylan Wylie <dylanwylie@xxxxxxxxx> wrote:
> > >
> > > > Hey there,
> > > >
> > > > With the recent improvements to the Kafka Indexing Service we've been
> > > > migrating over from Tranquility and have had a very positive
> > > > experience.
> > > >
> > > > However, one of the downsides to using the KIS is that the number of
> > > > segments generated for each period can't be smaller than the number
> > > > of tasks required to consume the topic. So if you have a use case
> > > > involving ingesting from a topic with a high rate of large messages
> > > > but your spec only extracts a small proportion of the fields, you may
> > > > be forced to run a large number of tasks that generate very small
> > > > segments.
> > > >
> > > > This email is to check in on people's thoughts about separating
> > > > consuming and parsing messages from indexing and segment management,
> > > > in a similar fashion to how Tranquility operates.
> > > >
> > > > Potentially, we could have the supervisor spawn two types of task
> > > > that can be configured independently: a consumer and an appender. The
> > > > consumer would parse the messages based on the spec and then pass the
> > > > results to the appropriate appender task, which builds the segments.
> > > > Another advantage to this approach is that it would allow creating
> > > > multiple datasources from a single consumer group rather than
> > > > ingesting the same topic multiple times.
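
To make the proposal concrete, a purely hypothetical sketch of that split;
none of these types exist in Druid, and the offset bookkeeping is exactly the
piece Gian flags above as hard to keep exactly-once:

    import java.util.List;
    import java.util.Map;

    /** Parses raw Kafka messages; scaled out to match topic volume. */
    interface ConsumerTask
    {
      /** Parse a batch of raw messages and hand the rows to an appender. */
      void consume(List<byte[]> rawMessages, AppenderTask appender);

      /** Offsets read so far per Kafka partition, for exactly-once bookkeeping. */
      Map<Integer, Long> currentOffsets();
    }

    /** Receives rows from many consumers and builds a small set of segments. */
    interface AppenderTask
    {
      void append(List<Row> rows);

      /** Publish finished segments together with the offsets they cover. */
      void publish(Map<Integer, Long> upToOffsets);
    }

    /** Minimal stand-in for a parsed row: a timestamp plus column values. */
    class Row
    {
      final long timestamp;
      final Map<String, Object> columns;

      Row(long timestamp, Map<String, Object> columns)
      {
        this.timestamp = timestamp;
        this.columns = columns;
      }
    }

The hand-off in consume() is where the exactly-once concern bites: once rows
can fan out from any consumer to any appender, offsets and segments are no
longer tracked by the same task.
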
> > > >
> > > > I'm quite new to the codebase, so all thoughts and comments are
> > > > welcome!
> > > >
> > > > Best regards,
> > > > Dylan
> > > >
> > >
> >
>

