本文共 13027 字,大约阅读时间需要 43 分钟。
这篇文章主要是用来讲解ES线程池(EsExecutors)的实现,然后象征性的和JDK的Executors实现进行了简单的对比,看了这篇文章以后要对Executors和ThreadPoolExecutor的使用更有信心才对。
public class ThreadPool extends AbstractComponent implements Scheduler, Closeable { final int availableProcessors = EsExecutors.numberOfProcessors(settings); final int halfProcMaxAt5 = halfNumberOfProcessorsMaxFive(availableProcessors); final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors); final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512); builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30))); builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200)); builders.put(Names.BULK, new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 200)); builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000)); builders.put(Names.SEARCH, new AutoQueueAdjustingExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000, 1000, 1000, 2000)); builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5))); builders.put(Names.LISTENER, new FixedExecutorBuilder(settings, Names.LISTENER, halfProcMaxAt10, -1)); builders.put(Names.FLUSH, new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5))); builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); builders.put(Names.FETCH_SHARD_STARTED, new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5))); builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1)); builders.put(Names.FETCH_SHARD_STORE, new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));}
说明:
elasticsearch线程池的线程按照源码的实现来看分为FIXED和SCALING两大类,FIXED的意思是固定线程的数量(core thread个数 = max thread个数),SCALING的意思是动态调整线程数量(core thread个数 != max thread个数)。
说明:大小固定设置的threadpool,它有一个queue来存放pending的请求,其中pool的大小默认是core*5,queue_size默认是-1(即是无限制)。
说明:拥有可变大小的pool,其值可在1和设置值之间。
public class Executors { public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue (), threadFactory); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue ()); } public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue (), threadFactory); }}
说明:
public class EsExecutors { public static final SettingPROCESSORS_SETTING = Setting.intSetting("processors", Runtime.getRuntime().availableProcessors(), 1, Property.NodeScope); public static int numberOfProcessors(final Settings settings) { return PROCESSORS_SETTING.get(settings); } public static EsThreadPoolExecutor newScaling(String name, int min, int max, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) { ExecutorScalingQueue queue = new ExecutorScalingQueue<>(); EsThreadPoolExecutor executor = new EsThreadPoolExecutor(name, min, max, keepAliveTime, unit, queue, threadFactory, new ForceQueuePolicy(), contextHolder); queue.executor = executor; return executor; } public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapacity, ThreadFactory threadFactory, ThreadContext contextHolder) { BlockingQueue queue; if (queueCapacity < 0) { queue = ConcurrentCollections.newBlockingQueue(); } else { queue = new SizeBlockingQueue<>(ConcurrentCollections. newBlockingQueue(), queueCapacity); } return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, new EsAbortPolicy(), contextHolder); } public static EsThreadPoolExecutor newAutoQueueFixed(String name, int size, int initialQueueCapacity, int minQueueSize, int maxQueueSize, int frameSize, TimeValue targetedResponseTime, ThreadFactory threadFactory, ThreadContext contextHolder) { if (initialQueueCapacity <= 0) { throw new IllegalArgumentException("initial queue capacity for [" + name + "] executor must be positive, got: " + initialQueueCapacity); } ResizableBlockingQueue queue = new ResizableBlockingQueue<>(ConcurrentCollections. newBlockingQueue(), initialQueueCapacity); return new QueueResizingEsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, queue, minQueueSize, maxQueueSize, TimedRunnable::new, frameSize, targetedResponseTime, threadFactory, new EsAbortPolicy(), contextHolder); }}
说明:
public class EsThreadPoolExecutor extends ThreadPoolExecutor { EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue, ThreadFactory threadFactory, ThreadContext contextHolder) { this(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new EsAbortPolicy(), contextHolder); } EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, XRejectedExecutionHandler handler, ThreadContext contextHolder) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); this.name = name; this.contextHolder = contextHolder; } @Override public void execute(final Runnable command) { doExecute(wrapRunnable(command)); } protected void doExecute(final Runnable command) { try { super.execute(command); } catch (EsRejectedExecutionException ex) { if (command instanceof AbstractRunnable) { try { ((AbstractRunnable) command).onRejection(ex); } finally { ((AbstractRunnable) command).onAfter(); } } else { throw ex; } } } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); assert assertDefaultContext(r); }}
说明:
public class EsExecutors { public static String threadName(Settings settings, String ... names) { String namePrefix = Arrays .stream(names) .filter(name -> name != null) .collect(Collectors.joining(".", "[", "]")); return threadName(settings, namePrefix); } public static ThreadFactory daemonThreadFactory(String namePrefix) { return new EsThreadFactory(namePrefix); } static class EsThreadFactory implements ThreadFactory { final ThreadGroup group; final AtomicInteger threadNumber = new AtomicInteger(1); final String namePrefix; EsThreadFactory(String namePrefix) { this.namePrefix = namePrefix; SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); } @Override public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + "[T#" + threadNumber.getAndIncrement() + "]", 0); t.setDaemon(true); return t; } } private EsExecutors() { }}
说明:
public interface XRejectedExecutionHandler extends RejectedExecutionHandler { long rejected();}public class EsAbortPolicy implements XRejectedExecutionHandler { private final CounterMetric rejected = new CounterMetric(); @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (r instanceof AbstractRunnable) { if (((AbstractRunnable) r).isForceExecution()) { BlockingQueuequeue = executor.getQueue(); if (!(queue instanceof SizeBlockingQueue)) { throw new IllegalStateException("forced execution, but expected a size queue"); } try { ((SizeBlockingQueue) queue).forcePut(r); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IllegalStateException("forced execution, but got interrupted", e); } return; } } rejected.inc(); throw new EsRejectedExecutionException("rejected execution of " + r + " on " + executor, executor.isShutdown()); } public long rejected() { return rejected.count(); }}static class ForceQueuePolicy implements XRejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { executor.getQueue().put(r); } catch (InterruptedException e) { throw new EsRejectedExecutionException(e); } } @Override public long rejected() { return 0; } }
说明:
转载地址:http://blizo.baihongyu.com/