Re: Kafka Indexing Service - Decoupling segments from consumer tasks
My feeling is that it is going to be challenging to add a layer of
indirection to the Kafka indexing stuff while maintaining its
exactly-onceness. The exactly-onceness is based around tracking the
specific Kafka offsets that are read by each task, and is tightly linked to
which partitions are assigned to which task. I think what you describe
would be doable, but it would add a lot of complexity to a code base that
is already pretty complex. If I were in your position I'd try
repartitioning the original Kafka topic into a second Kafka topic using a
good key. It doubles the traffic at Kafka but at least it's simple! It
follows a general rule of making Kafka do as much of the work as possible.
What do you think?
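
To make "a good key" a bit more concrete, here is a rough sketch of the kind of key I have in mind. It is only an illustration: the dimension names, the one-minute granularity and the helper names are all made up, and SHA-256 is a stand-in for whatever hash your producer's partitioner actually uses (Kafka's default partitioner uses murmur2). The idea is just that rows which could roll up together get the same key and so land in the same partition of the second topic.

```python
import hashlib

# Hypothetical dimension names; the real datasource has 40-50 fields.
ROLLUP_DIMENSIONS = ("country", "device", "campaign")
# Assumed roll-up granularity of one minute, in milliseconds.
QUERY_GRANULARITY_MS = 60_000


def rollup_key(event: dict) -> bytes:
    """Build a key from the truncated timestamp and the ingested dimensions.

    Fields that are not ingested (e.g. the original partition key) are
    deliberately left out, so they cannot scatter otherwise identical rows.
    """
    ts = event["timestamp_ms"]
    bucket = ts - ts % QUERY_GRANULARITY_MS
    parts = [str(bucket)] + [str(event.get(d, "")) for d in ROLLUP_DIMENSIONS]
    return "\x1f".join(parts).encode("utf-8")


def partition_for(key: bytes, num_partitions: int) -> int:
    """Map a key to a partition with a stable hash (stand-in for murmur2)."""
    digest = hashlib.sha256(key).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions


def repartition(events, num_partitions: int) -> dict:
    """Group events by the partition a producer keyed this way would pick."""
    out = {p: [] for p in range(num_partitions)}
    for event in events:
        out[partition_for(rollup_key(event), num_partitions)].append(event)
    return out
```

In a real pipeline the same key would simply be passed as the record key when producing to the second topic, and the number of partitions there would be chosen to match the number of segments you want per interval.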
On Thu, May 3, 2018 at 1:53 PM, Dylan Wylie <dylanwylie@xxxxxxxxx> wrote:
> Hey Gian,
> Thanks for your response!
> Automatic compaction looks great and it'll definitely help out with general
> efficiency. I'm also excited for the second PR, we've some use cases it'll
> be helpful for.
> Even with those, I still think there may be value in something along the
> lines of my suggestion.
> To present a real-life example, we have a Kafka topic which emits ~200k
> messages/second. Each of these messages has around 300 fields (in a nested
> Avro structure), and the topic is partitioned on some key which is not
> ingested into Druid. We produce a datasource from that message with around
> 40-50 fields.
> Using the Kafka Indexing Service, we measured that a single indexing task
> extracting the fields for that datasource can consume 7k messages/second,
> meaning that in order to keep up we have to run around 30 indexing tasks.
> This results in around 30 segments at around 300 MB each for each hour of
> data, compared to 6 segments at around 650 MB each when batch ingested, so
> the cluster's holding up to 3x as much data as it might if the data were
> ingested into a smaller number of segments.
> (I realise that Tranquility & Batch Ingestion partition data by a hash of
> all the fields, which the KIS can't, so it will always produce less optimal
> segments unless the Kafka topic is partitioned using a good key.)
> Profiling the indexing peon shows that the largest chunk of time is being
> spent deserialising the large Avro message. So we think that if we could
> split the current ingestion process into the two job types as described, we
> could scale one up to handle consuming and parsing the messages while the
> other manages appending the rows to a (smaller) set of segments.
> From some initial playing around we've noticed that
> https://github.com/druid-io/druid/pull/5261 introduced a stand-alone
> realtime task using the newer appenderator API that could potentially be
> built on top of.
> It might be simpler for us to introduce a stream processor which parses the
> messages, extracts the parts that are needed and emits only those fields to
> a topic partitioned on something that aids roll-up.
> However, separating parsing data from indexing it feels like it might be
> more generally useful and would avoid the extra work in maintaining a stream
> processor. (And it seems like a fun way to get hacking with the Druid
> Sorry for the long email, thoughts or comments appreciated!
> Best regards,
> On 3 May 2018 at 02:32, Gian Merlino <gian@xxxxxxxxxx> wrote:
> > Hey Dylan,
> > Great to hear that your experience has generally been positive!
> > What do you think about using compaction for this? (The feature added in
> > https://github.com/druid-io/druid/pull/5102.) The idea with compaction is
> > that it would enable a background process that goes through freshly
> > inserted segments and re-partitions them optimally.
> > For creating multiple datasources out of one topic, there is a PR wending
> > its way through review right now that is relevant:
> > https://github.com/druid-io/druid/pull/5556.
> > On Wed, May 2, 2018 at 12:46 PM, Dylan Wylie <dylanwylie@xxxxxxxxx> wrote:
> > > Hey there,
> > >
> > > With the recent improvements to the Kafka Indexing Service we've been
> > > migrating over from Tranquility and have had a very positive experience.
> > >
> > > However, one of the downsides to using the KIS is that the number of
> > > segments generated for each period can't be smaller than the number of
> > > tasks required to consume the queue. So if you have a use case
> > > ingesting from a topic with a high rate of large messages but your spec
> > > only extracts a small proportion of fields, you may be forced to run a
> > > large number of tasks that generate very small segments.
> > >
> > > This email is to check in for people's thoughts on separating consuming
> > > and parsing messages from indexing and segment management, in a similar
> > > fashion to how Tranquility operates.
> > >
> > > Potentially, we could have the supervisor spawn two types of task that
> > > can be configured independently: a consumer and an appender. The
> > > consumer would parse the message based on the spec and then pass the
> > > results to the appropriate appender task, which builds the segment.
> > > Another advantage of this approach is that it would allow creating
> > > multiple datasources from a single consumer group rather than ingesting
> > > the same topic multiple times.
> > >
> > > I'm quite new to the codebase so all thoughts and comments are welcome!
> > >
> > > Best regards,
> > > Dylan
> > >