git.net

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GitHub] drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle


drcrallen commented on a change in pull request #5492: Native parallel batch indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201763889
 
 

 ##########
 File path: indexing-service/src/main/java/io/druid/indexing/common/IndexTaskClient.java
 ##########
 @@ -0,0 +1,381 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.indexer.TaskLocation;
+import io.druid.indexer.TaskStatus;
+import io.druid.java.util.common.IAE;
+import io.druid.java.util.common.IOE;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.http.client.HttpClient;
+import io.druid.java.util.http.client.Request;
+import io.druid.java.util.http.client.response.FullResponseHandler;
+import io.druid.java.util.http.client.response.FullResponseHolder;
+import io.druid.segment.realtime.firehose.ChatHandlerResource;
+import org.jboss.netty.channel.ChannelException;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Callable;
+
+/**
+ * Abstract class to communicate with index tasks via HTTP. This class provides interfaces to serialize/deserialize
+ * data and send an HTTP request.
+ */
+public abstract class IndexTaskClient implements AutoCloseable
+{
+  public static class NoTaskLocationException extends RuntimeException
+  {
+    public NoTaskLocationException(String message)
+    {
+      super(message);
+    }
+  }
+
+  public static class TaskNotRunnableException extends RuntimeException
+  {
+    public TaskNotRunnableException(String message)
+    {
+      super(message);
+    }
+  }
+
+  public static final int MAX_RETRY_WAIT_SECONDS = 10;
+
+  private static final EmittingLogger log = new EmittingLogger(IndexTaskClient.class);
+  private static final String BASE_PATH = "/druid/worker/v1/chat";
+  private static final int MIN_RETRY_WAIT_SECONDS = 2;
+  private static final int TASK_MISMATCH_RETRY_DELAY_SECONDS = 5;
+
+  private final HttpClient httpClient;
+  private final ObjectMapper objectMapper;
+  private final TaskInfoProvider taskInfoProvider;
+  private final Duration httpTimeout;
+  private final RetryPolicyFactory retryPolicyFactory;
+  private final ListeningExecutorService executorService;
+
+  public IndexTaskClient(
+      HttpClient httpClient,
+      ObjectMapper objectMapper,
+      TaskInfoProvider taskInfoProvider,
+      Duration httpTimeout,
+      String callerId,
+      int numThreads,
+      long numRetries
+  )
+  {
+    this.httpClient = httpClient;
+    this.objectMapper = objectMapper;
+    this.taskInfoProvider = taskInfoProvider;
+    this.httpTimeout = httpTimeout;
+    this.retryPolicyFactory = initializeRetryPolicyFactory(numRetries);
+    this.executorService = MoreExecutors.listeningDecorator(
+        Execs.multiThreaded(
+            numThreads,
+            StringUtils.format(
+                "IndexTaskClient-%s-%%d",
+                callerId
+            )
+        )
+    );
+  }
+
+  private static RetryPolicyFactory initializeRetryPolicyFactory(long numRetries)
+  {
+    // Retries [numRetries] times before giving up; this should be set long enough to handle any temporary
+    // unresponsiveness such as network issues, if a task is still in the process of starting up, or if the task is in
+    // the middle of persisting to disk and doesn't respond immediately.
+    return new RetryPolicyFactory(
+        new RetryPolicyConfig()
+            .setMinWait(Period.seconds(MIN_RETRY_WAIT_SECONDS))
+            .setMaxWait(Period.seconds(MAX_RETRY_WAIT_SECONDS))
+            .setMaxRetryCount(numRetries)
+    );
+  }
+
+  protected HttpClient getHttpClient()
+  {
+    return httpClient;
+  }
+
+  protected RetryPolicy newRetryPolicy()
+  {
+    return retryPolicyFactory.makeRetryPolicy();
+  }
+
+  protected <T> T deserialize(String content, TypeReference<T> typeReference) throws IOException
+  {
+    return objectMapper.readValue(content, typeReference);
+  }
+
+  protected <T> T deserialize(String content, Class<T> typeReference) throws IOException
+  {
+    return objectMapper.readValue(content, typeReference);
+  }
+
+  protected byte[] serialize(Object value) throws JsonProcessingException
+  {
+    return objectMapper.writeValueAsBytes(value);
+  }
+
+  protected <T> ListenableFuture<T> doAsync(Callable<T> callable)
 
 Review comment:
   Does it need to be a ListenableFuture or is CompletableFuture sufficient?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@xxxxxxxxxxxxxxxx


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@xxxxxxxxxxxxxxxx
For additional commands, e-mail: dev-help@xxxxxxxxxxxxxxxx