博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Elasticsearch 6.1.0线程池介绍
阅读量:6458 次
发布时间:2019-06-23

本文共 13027 字,大约阅读时间需要 43 分钟。

开篇

 这篇文章主要是用来讲解ES线程池(EsExecutors)的实现,然后象征性的和JDK的Executors实现进行了简单的对比,看了这篇文章以后要对Executors和ThreadPoolExecutor的使用更有信心才对。

elasticsearch线程池配置

public class ThreadPool extends AbstractComponent implements Scheduler, Closeable {  final int availableProcessors = EsExecutors.numberOfProcessors(settings);  final int halfProcMaxAt5 = halfNumberOfProcessorsMaxFive(availableProcessors);  final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors);  final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);  builders.put(Names.GENERIC,   new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));  builders.put(Names.INDEX,   new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200));  builders.put(Names.BULK,   new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 200));   builders.put(Names.GET,   new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000));  builders.put(Names.SEARCH,   new AutoQueueAdjustingExecutorBuilder(settings,                        Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000, 1000, 1000, 2000));  builders.put(Names.MANAGEMENT,   new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5)));  builders.put(Names.LISTENER,   new FixedExecutorBuilder(settings, Names.LISTENER, halfProcMaxAt10, -1));  builders.put(Names.FLUSH,   new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));  builders.put(Names.REFRESH,   new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)));  builders.put(Names.WARMER,   new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));  builders.put(Names.SNAPSHOT,   new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));  builders.put(Names.FETCH_SHARD_STARTED,   new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));  builders.put(Names.FORCE_MERGE,   new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1));  builders.put(Names.FETCH_SHARD_STORE,   new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));}

说明:

  • elasticsearch线程池根据作用的不同主要分为两大类 ScalingExecutor和FixedExecutor。
  • ScalingExecutor表示线程池中的线程数是动态可变的。
  • FixedExecutor表示线程池中的线程池是不可变的。

elasticsearch线程池分类

elasticsearch线程池的线程按照源码的实现来看分为FIXED和SCALING两大类FIXED的意思是固定线程的数量(core thread个数 = max thread个数),SCALING的意思是动态调整线程数量(core thread个数 != max thread个数)。

FIXED

说明:大小固定设置的threadpool,它有一个queue来存放pending的请求,其中pool的大小默认是core*5,queue_size默认是-1(即是无限制)。

  • LISTENER:用作client的操作,默认大小halfProcMaxAt10,queue_size=-1无限制;
  • GET:用作get操作,默认大小availableProcessors,queue_size为1000;
  • INDEX:用作index或delete操作,默认大小availableProcessors,queue_size为200;
  • BULK:用作bulk操作,默认大小为availableProcessors,queue_size为200;
  • SEARCH:用作count或是search操作,默认大小((availableProcessors * 3) / 2) + 1;queue_size为1000;
  • SUGGEST:用作suggest操作,默认大小availableProcessors,queue_size为1000;
  • PERCOLATE:用作percolate,默认大小为availableProcessors,queue_size为1000;
  • FORCE_MERGE:用作force_merge操作(2.1之前叫做optimize),默认大小为1;

SCALING

说明:拥有可变大小的pool,其值可在1和设置值之间。

  • GENERIC:通用的操作,比如node的discovery,默认大小genericThreadPoolMax,默认keep alive时间是30sec;
  • MANAGEMENT:用作ES的管理,比如集群的管理;默认大小5,keep alive时间为5min;
  • FLUSH:用作flush操作,默认大小为halfProcMaxAt5,keep alive时间为5min;
  • REFRESH:用作refresh操作,默认大小为halfProcMaxAt10,keep alive时间为5min;
  • WARMER:用作index warm-up操作,默认大小为halfProcMaxAt5,keep alive时间为5min;
  • SNAPSHOT:用作snapshot操作,默认大小为halfProcMaxAt5,keep alive时间为5min;
  • FETCH_SHARD_STARTED:用作fetch shard开始操作,默认大小availableProcessors * 2,keep alive时间为5min;
  • FETCH_SHARD_STORE:用作fetch shard存储操作,默认大小availableProcessors * 2,keep alive时间为5min;

JDK的Executors

public class Executors {    public static ExecutorService newFixedThreadPool(int nThreads) {        return new ThreadPoolExecutor(nThreads, nThreads,                                      0L, TimeUnit.MILLISECONDS,                                      new LinkedBlockingQueue
()); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue
(), threadFactory); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue
()); } public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue
(), threadFactory); }}

说明:

  • Executors的newFixedThreadPool创建固定线程数量的线程池。
  • Executors的newCachedThreadPool创建可动态调整线程数量的线程池。
  • Executors创建的是ThreadPoolExecutor对象。

Elasticsearch的EsExecutors

public class EsExecutors {    public static final Setting
PROCESSORS_SETTING = Setting.intSetting("processors", Runtime.getRuntime().availableProcessors(), 1, Property.NodeScope); public static int numberOfProcessors(final Settings settings) { return PROCESSORS_SETTING.get(settings); } public static EsThreadPoolExecutor newScaling(String name, int min, int max, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) { ExecutorScalingQueue
queue = new ExecutorScalingQueue<>(); EsThreadPoolExecutor executor = new EsThreadPoolExecutor(name, min, max, keepAliveTime, unit, queue, threadFactory, new ForceQueuePolicy(), contextHolder); queue.executor = executor; return executor; } public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapacity, ThreadFactory threadFactory, ThreadContext contextHolder) { BlockingQueue
queue; if (queueCapacity < 0) { queue = ConcurrentCollections.newBlockingQueue(); } else { queue = new SizeBlockingQueue<>(ConcurrentCollections.
newBlockingQueue(), queueCapacity); } return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, new EsAbortPolicy(), contextHolder); } public static EsThreadPoolExecutor newAutoQueueFixed(String name, int size, int initialQueueCapacity, int minQueueSize, int maxQueueSize, int frameSize, TimeValue targetedResponseTime, ThreadFactory threadFactory, ThreadContext contextHolder) { if (initialQueueCapacity <= 0) { throw new IllegalArgumentException("initial queue capacity for [" + name + "] executor must be positive, got: " + initialQueueCapacity); } ResizableBlockingQueue
queue = new ResizableBlockingQueue<>(ConcurrentCollections.
newBlockingQueue(), initialQueueCapacity); return new QueueResizingEsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, queue, minQueueSize, maxQueueSize, TimedRunnable::new, frameSize, targetedResponseTime, threadFactory, new EsAbortPolicy(), contextHolder); }}

说明:

  • newScaling()方法创建可扩展线程数量的线程池,淘汰策略使用ForceQueuePolicy。
  • newFixed()方法创建创建固定线程数量的线程池,淘汰策略使用EsAbortPolicy。
  • newAutoQueueFixed()方法创建固定线程数量但是Queue队列数量可以动态调整的线程池,淘汰策略使用EsAbortPolicy。。
  • EsExecutors内部创建的是EsThreadPoolExecutor对象。
  • EsExecutors的实现借鉴了JDK的Executors接口,给我们提供了自定Executors的思路。

EsExecutors的EsThreadPoolExecutor

public class EsThreadPoolExecutor extends ThreadPoolExecutor {    EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,            BlockingQueue
workQueue, ThreadFactory threadFactory, ThreadContext contextHolder) { this(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new EsAbortPolicy(), contextHolder); } EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue
workQueue, ThreadFactory threadFactory, XRejectedExecutionHandler handler, ThreadContext contextHolder) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); this.name = name; this.contextHolder = contextHolder; } @Override public void execute(final Runnable command) { doExecute(wrapRunnable(command)); } protected void doExecute(final Runnable command) { try { super.execute(command); } catch (EsRejectedExecutionException ex) { if (command instanceof AbstractRunnable) { try { ((AbstractRunnable) command).onRejection(ex); } finally { ((AbstractRunnable) command).onAfter(); } } else { throw ex; } } } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); assert assertDefaultContext(r); }}

说明:

  • EsThreadPoolExecutor继承自ThreadPoolExecutor对象,构造函数内部初始化ThreadPoolExecutor对象。
  • EsThreadPoolExecutor的核心的execute方法内部也是调用了ThreadPoolExecutor的execute方法。
  • EsThreadPoolExecutor重新了execute和afterExecute方法。
  • EsThreadPoolExecutor给我们提供了重写ThreadPoolExecutor的思路,值得学习。

EsExecutors的ThreadFactory

public class EsExecutors {    public static String threadName(Settings settings, String ... names) {        String namePrefix =                Arrays                        .stream(names)                        .filter(name -> name != null)                        .collect(Collectors.joining(".", "[", "]"));        return threadName(settings, namePrefix);    }    public static ThreadFactory daemonThreadFactory(String namePrefix) {        return new EsThreadFactory(namePrefix);    }    static class EsThreadFactory implements ThreadFactory {        final ThreadGroup group;        final AtomicInteger threadNumber = new AtomicInteger(1);        final String namePrefix;        EsThreadFactory(String namePrefix) {            this.namePrefix = namePrefix;            SecurityManager s = System.getSecurityManager();            group = (s != null) ? s.getThreadGroup() :                    Thread.currentThread().getThreadGroup();        }        @Override        public Thread newThread(Runnable r) {            Thread t = new Thread(group, r,                    namePrefix + "[T#" + threadNumber.getAndIncrement() + "]",                    0);            t.setDaemon(true);            return t;        }    }    private EsExecutors() {    }}

说明:

  • 1.EsExecutors给我们提供一种创建线程工厂的标准方法,实现ThreadFactory接口重新newThread()方法。
  • 2.通过AtomicInteger threadNumber = new AtomicInteger(1)变量生成线程自增的线程id。
  • 3.线程池的线程都有具体意义的线程名非常重要有利于排查问题,非常推荐使用。

EsExecutors的AbortPolicy

public interface XRejectedExecutionHandler extends RejectedExecutionHandler {    long rejected();}public class EsAbortPolicy implements XRejectedExecutionHandler {    private final CounterMetric rejected = new CounterMetric();    @Override    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {        if (r instanceof AbstractRunnable) {            if (((AbstractRunnable) r).isForceExecution()) {                BlockingQueue
queue = executor.getQueue(); if (!(queue instanceof SizeBlockingQueue)) { throw new IllegalStateException("forced execution, but expected a size queue"); } try { ((SizeBlockingQueue) queue).forcePut(r); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IllegalStateException("forced execution, but got interrupted", e); } return; } } rejected.inc(); throw new EsRejectedExecutionException("rejected execution of " + r + " on " + executor, executor.isShutdown()); } public long rejected() { return rejected.count(); }}static class ForceQueuePolicy implements XRejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { executor.getQueue().put(r); } catch (InterruptedException e) { throw new EsRejectedExecutionException(e); } } @Override public long rejected() { return 0; } }

说明:

  • EsAbortPolicy的过期策略提供了我们自定实现过期策略的案例。

参考文章

转载地址:http://blizo.baihongyu.com/

你可能感兴趣的文章
过滤DataTable表中的重复数据
查看>>
prepare for travel 旅行准备
查看>>
再次更新
查看>>
C# 获取编码
查看>>
mysql的数据类型int、bigint、smallint 和 tinyint取值范围
查看>>
利用网易获取所有股票数据
查看>>
移动铁通宽带上网设置教程
查看>>
Python算法(含源代码下载)
查看>>
利用Windows自带的Certutil查看文件MD5
查看>>
通过原生js添加div和css
查看>>
简单的导出表格和将表格下载到桌面上。
查看>>
《ArcGIS Engine+C#实例开发教程》第一讲桌面GIS应用程序框架的建立
查看>>
JAVA - 大数类详解
查看>>
查询指定名称的文件
查看>>
Python 嵌套列表解析
查看>>
[GXOI/GZOI2019]旧词——树链剖分+线段树
查看>>
anroid 广播
查看>>
AJAX POST&跨域 解决方案 - CORS
查看>>
关于最小生成树中的kruskal算法中判断两个点是否在同一个连通分量的方法总结...
查看>>
开篇,博客的申请理由
查看>>