clienthold(clientHold下一阶段)

目录

  • 隔离概念
  • 常见的隔离策略
  • 基于Servlet 3实现请求隔离
  • 基于DeferredResult实现Http长轮询监听配置和微信扫码登录
  • 隔离术相关拓展

推荐:阅读下自定义配置解析类:WebAsyncBeanDefinitionRegistryPostProcessor,希望能有所收获。

隔离概念

将系统或资源相互分开,发生故障时,限定传播范围和影响范围,以防拖垮整个系统/服务。

常见的隔离策略

线程隔离:主要指线程池的隔离,主要分为异步请求异步处理异步请求:根据业务进行请求分类,交给不同的线程池处理。当一个业务请求出现问题时,不会影响到其它线程池,从而保证了其他服务可用性。异步处理:对一些不重要的业务流程(消息通知,日志记录),可以用线程池异步处理从主业务流程中剥离出来;对于多策略业务流程处理(微信支付,支付宝支付),可以用线程池隔离各个策略处理,提高可用性。

服务隔离:实际工作中,会有多个需求同时进行,在联调和测试时,会部署多个版本的服务,通过分组版本号来隔离不同需求的服务。

机房隔离:系统为了应对高可用的要求,会进行多机房部署,当一个机房出问题后,通过把流量切到另一个机房,来保证系统的可用性。

读写隔离:通过主从模式将读和写集群隔离,例如:数据库的读写分离,redis主从同步。

热点隔离:对于读热点,通过多级缓存来处理;对于写热点,一般通过缓存+队列模式削峰处理。

基于Servlet 3实现请求隔离

我们将从如下几点了解Servlet3异步化:为什么实现异步化需要Servlet3?请求异步化的好处?哪些容器支持Servlet3?如何使用Servlet3处理请求异步化?

为什么实现异步化需要Servlet3?Servlet3开始支持请求异步化,可以将请求解析和业务处理进行分离。

Tomcat处理Http请求流程分为:

1.接收请求并解析成HttpServletRequest,

2.交给Servlet进行业务处理,

3.通过HttpServletResponse写出响应。

Servlet2请求处理流程:servlet线程负责请求解析,业务处理和响应数据生成整个过程,一直处于阻塞状态。

架构篇-一分钟掌握隔离术

Servlet2请求处理流程

Servlet3异步请求处理流程:tomcat线程只负责请求解析,把业务处理和响应数据生成交给业务线程处理,servlet线程归还到容器,来处理其他请求,提高了web容器的并发能力。

架构篇-一分钟掌握隔离术

Servlet3异步请求处理流程

异步化的好处

  • 请求解析和业务处理分离,提高Tomcat容器并发处理能力。
  • 根据业务重要性进行分级,定义不同业务线程池来处理,业务之间相互隔离,如图所示:
架构篇-一分钟掌握隔离术

业务线程池隔离

支持Servlet3的容器

Tomcat 7,JBOSS 6.0.0 ,Weblogic 12c,WebSphere 8.0 ,Jetty 8.0,Glassfish 3.0,Resin 4.0.4及以上

具体代码:

controller层:

@GetMapping(value = "/async/servlet")
    public void asyncServlet(HttpServletRequest request, HttpServletResponse response){
        System.out.println("外部线程1:" + Thread.currentThread().getName());
        WebAsyncTaskExecutor.submit(request, ()->{
            System.out.println("内部线程:" + Thread.currentThread().getName());
            return "执行成功";
        });
        System.out.println("外部线程2:" + Thread.currentThread().getName());
    }

异步任务执行器:

@Slf4j
public class WebAsyncTaskExecutor {

    private static long DefaultTimeOut = 60 * 1000L;

    private DeferredResult Default = new DeferredResult();

    public static <R> DeferredResult<R> submit(Supplier<R> supplier) {
        return submit(supplier,defaultDeferredResult());
    }

    public static <R> DeferredResult<R> submit(Supplier<R> supplier, DeferredResult<R> deferredResult) {

        ThreadPoolExecutor threadPoolExecutor = WebAsyncPoolManager.get();
        if (threadPoolExecutor == null) {
            R result = supplier.get();
            deferredResult.setResult(result);
            return deferredResult;
        }

        threadPoolExecutor.execute(()->{
            R result = supplier.get();
            deferredResult.setResult(result);
        });
        return deferredResult;
    }

    public static <R> void submit(HttpServletRequest request, Supplier<R> supplier) {
        submit(request,supplier,null);
    }

    public static <R> void submit(HttpServletRequest request, Supplier<R> supplier, AsyncListener asyncListener) {

        AsyncContext asyncContext = request.startAsync();
        asyncContext.setTimeout(DefaultTimeOut);

        if (asyncListener != null) {
            asyncContext.addListener(asyncListener);
        }

        ThreadPoolExecutor threadPoolExecutor = WebAsyncPoolManager.get();
        if (threadPoolExecutor == null) {
            R result = supplier.get();
            wirte(asyncContext,result);
            return;
        }

        threadPoolExecutor.execute(()->{
            R result = supplier.get();
            wirte(asyncContext,result);
        });
    }

    private static DeferredResult defaultDeferredResult(){

        DeferredResult deferredResult = new DeferredResult(DefaultTimeOut);
        deferredResult.onTimeout(new Runnable() {
            @Override
            public void run() {
                deferredResult.setResult(BaseResponse.error(ErrorMessage.TIMEOUT_ERROR));
            }
        });

        return deferredResult;
    }

    private static void wirte(AsyncContext asyncContext, Object result) {

        HttpServletRequest req = (HttpServletRequest) asyncContext.getRequest();
        HttpServletResponse resp = (HttpServletResponse) asyncContext.getResponse();

        try {
            ResponseUtil.responseJson(req,resp,HttpServletResponse.SC_OK,result);
        } catch (Exception ex) {
            log.error("wirte error!",ex);
            resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
        } finally {
            asyncContext.complete();
        }
    }
}

自定义配置解析类

#yml文件的相关配置
spring:
  webasync:
    pools[0]:
      name: test
      corePoolSize: 5
      maxPoolSize: 5
      keepAliveTime: 30
      queueSize: 1024
    pools[1]:
      name: test1
      corePoolSize: 5
      maxPoolSize: 5
      keepAliveTime: 30
      queueSize: 1024

@Slf4j
public class WebAsyncBeanDefinitionRegistryPostProcessor implements BeanDefinitionRegistryPostProcessor, ApplicationContextAware {

    private ApplicationContext applicationContext;

    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
        log.info("=======WebAsyncBeanDefinitionRegistryPostProcessor start ======");
        Environment environment =  this.applicationContext.getEnvironment();
        BindResult<WebAsyncConfig> bindResult = Binder.get(environment).bind(WebAsyncConfig.PREFIX, Bindable.of(WebAsyncConfig.class));
        if (bindResult == null || !bindResult.isBound()) {
            log.info("WebAsyncBeanDefinitionRegistryPostProcessor bindResult is null");
            return;
        }

        WebAsyncConfig webAsyncConfig = bindResult.get();
        log.info("WebAsyncBeanDefinitionRegistryPostProcessor webAsyncConfig : {}",JSON.toJSONString(webAsyncConfig));

        if (webAsyncConfig == null || webAsyncConfig.getPools() == null || webAsyncConfig.getPools().size() <= 0) {
            log.error("WebAsyncBeanDefinitionRegistryPostProcessor webasync config error! current config:{}", JSON.toJSONString(webAsyncConfig));
            return;
        }

        for (int i = 0; i < webAsyncConfig.getPools().size() ; i++) {

            ThreadPoolConfig threadPoolConfig = webAsyncConfig.getPools().get(i);
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(threadPoolConfig.getCorePoolSize()
                    , threadPoolConfig.getMaxPoolSize()
                    , threadPoolConfig.getKeepAliveTime(), TimeUnit.SECONDS
                    , new ArrayBlockingQueue<Runnable>(threadPoolConfig.getQueueSize())
                    , new ThreadFactoryBuilder().setNameFormat("webasync-pool-"+ threadPoolConfig.getName() +"-%d").build());
            WebAsyncPoolManager.put(i,threadPoolExecutor);
        }
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {

    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }
}

基于DeferredResult实现Http长轮询拉取配置和微信扫码登录

Http长轮询监听配置的流程:

  • client端发送http请求到server端
  • server端hold住http请求连接,并设置超时
  • 超时之内,有变化立即发送client端,否则,直到超时断开连接,继续重复上面步骤

关键在于:server端怎么hold住http请求?变化后如何找到对应的请求?完成和超时后如何处理?

server端怎么hold住http请求? 用Map存放请求(DeferredResult),key:唯一id,val:DeferredResult

变化后如何找到对应的请求? 通过唯一id来找到对应的请求

完成和超时后如何处理?需要将DeferredResult从Map中移除

具体代码:

定义的Map容器:

 /**
     *  map容器
     */
private static final ConcurrentHashMap<String, CopyOnWriteArrayList<DeferredResult>> deferredResultMultimaps =
            new ConcurrentHashMap<>();

请求设置超时时间、完成事件和超时事件,并添加到Map中

//设置超时时间
final DeferredResult<ResponseEntity<String>> result = new DeferredResult<>(DefaultDeferredResultProcessor.CONNECTION_TIMEOUT
                                                                    ,defaultDeferredResultProcessor.getNotModifiedResponse());
//当配置有变动时,直接响应客户端
        if(modifyConfigResult != null && HttpStatus.OK.equals(modifyConfigResult.getStatusCode())
                && StringUtils.hasLength(modifyConfigResult.getBody())){
            result.setResult(modifyConfigResult);
        }
        else{
            //设置长轮询超时回调方法,记录超时请求参数
            result.onTimeout(new Runnable() {
                @Override
                public void run() {
                    for (int i=0; i< configKeyList.size();i++){
                        ConfigInfoVO configKey = configKeyList.get(i);
                        String key = makeConfigKey(configKey.getDataId(),configKey.getGroup());
                        defaultDeferredResultProcessor.removeDeferredResult(key,result);
                    }
                }
            });
            //设置长轮询完成回调方法,从deferredResults集合中剔除相应的异步返回对象
            result.onCompletion(new Runnable() {
                @Override
                public void run() {
                    for (int i=0; i< configKeyList.size();i++){
                        ConfigInfoVO configKey = configKeyList.get(i);
                        String key = makeConfigKey(configKey.getDataId(),configKey.getGroup());
                        defaultDeferredResultProcessor.removeDeferredResult(key,result);
                    }
                }
            });

            for (int i=0; i< configKeyList.size();i++){
                ConfigInfoVO configKey = configKeyList.get(i);
                String key = makeConfigKey(configKey.getDataId(),configKey.getGroup());
              //添加到Map容器
                defaultDeferredResultProcessor.addDeferredResult(key,result);
            }

配置有变化,立即响应client端

public <T> void notifyDeferredResult(String key, final T result) {
        //根据key:dataId/group获取当前所有的异步处理对象集合
        CopyOnWriteArrayList<DeferredResult> currentDeferredResultList = deferredResultMultimaps.get(key);

        if (null == currentDeferredResultList || currentDeferredResultList.size() <= 0){
            LoggerUtil.info(getClass(),"notifyDeferredResult have not DeferredResults key:{0},result:{1}.",key,JSON.toJSONString(result));
            return;
        }

        final CopyOnWriteArrayList<DeferredResult> lstDeferredResult = Lists.newCopyOnWriteArrayList(currentDeferredResultList);

        //当待处理的异步返回对象超过一定批次后,会有线程池来异步处理
        if(lstDeferredResult.size() > CHANGE_CONFIG_BATCH_SIZE){
            changeConfigBatchExecutorService.submit(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < lstDeferredResult.size(); i++){
                        //当超过批次后,休息一段时间
                        if (i > 0 && i % CHANGE_CONFIG_BATCH_SIZE==0){
                            try {
                                TimeUnit.MILLISECONDS.sleep(CHANGE_CONFIG_BATCH_INTERVALTIME);
                            } catch (InterruptedException e) {
                                //ignore
                            }
                        }
                        lstDeferredResult.get(i).setResult(new ResponseEntity<T>(result, HttpStatus.OK));
                    }
                }
            });
        }

       for (int i = 0; i < lstDeferredResult.size(); i++){
            lstDeferredResult.get(i).setResult(new ResponseEntity<T>(result,HttpStatus.OK));
        }
    }

隔离术相关拓展

  • web多环境部署配置隔离
  • Nacos配置命名空间隔离
  • Docker系统资源隔离
  • 账户角色权限隔

相关阅读:

架构篇-一分钟掌握高可用架构

架构篇-一分钟掌握可伸缩架构

架构篇-一分钟掌握可扩展架构

架构篇-一分钟掌握性能优化小技巧

架构-一分钟掌握架构师职业规划

本文来自陌颜投稿,不代表胡巴网立场,如若转载,请注明出处:https://www.hu85.com/171373.html

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 xxxxx@qq.com 举报,一经查实,本站将立刻删除。