每次想多线程处理一个大的结果集的时候 都需要写一大堆代码,自己写了个工具类 方便使用
package com.guige.fss.common.util;import com.guige.fss.common.exception.BusinessException;import io.swagger.models.auth.In;import lombok.extern.slf4j.Slf4j;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.util.CollectionUtils;import java.util.ArrayList;import java.util.List;import java.util.concurrent.Callable;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;/** * Created by admin on 2018/6/5. * @author 宋安伟 */public class ThreadUtil { //创建定长线程池,初始化线程 private static Logger log = LoggerFactory.getLogger(ThreadUtil.class); /** * 对List进行多线程处理(限制 对List只读 如果想修改List 可以处理完毕后把要修改或删除的List返回 多线程执行完后再修改或删除) * @param list 要处理的List * @param threadSize 用几个线程处理 * @param threadLoadback 处理的回调(具体业务员) * @param每个回调的返回结果 * @param List 的泛型 * @return */ public static List executorsTasks(final List list,final int threadSize,final ThreadLoadback threadLoadback){ // 开始时间 long start = System.currentTimeMillis(); // 总数据条数 int dataSize = list.size(); // 线程数 int threadNum = dataSize / threadSize + 1; // 定义标记,过滤threadNum为整数 boolean special = dataSize % threadSize == 0; // 创建一个线程池 ExecutorService exec = Executors.newFixedThreadPool(threadNum); // 定义一个任务集合 List > tasks = new ArrayList >(); Callable task = null; List cutList = null; for (int i = 0; i < threadNum; i++) { if (i == threadNum - 1) { if (special) { break; } cutList = list.subList(threadSize * i, dataSize); } else { cutList = list.subList(threadSize * i, threadSize * (i + 1)); } // System.out.println("第" + (i + 1) + "组:" + cutList.toString()); final List listStr = cutList; task = new Callable () { @Override public T call() throws Exception { // System.out.println(Thread.currentThread().getName() + "线程:" + listStr); return (T) threadLoadback.load(listStr); // return } }; // 这里提交的任务容器列表和返回的Future列表存在顺序对应的关系 tasks.add(task); } List > resultsFuture = null; try { log.debug("线程任务执行开始:任务数"+tasks.size()); resultsFuture = exec.invokeAll(tasks); List results = new ArrayList<>(); for (Future future : resultsFuture) { T result=future.get(); if(result!=null) { results.add(result); } } return results; } catch (Exception e) { e.printStackTrace(); throw new BusinessException(e.getMessage()); }finally { // 关闭线程池 exec.shutdown(); log.debug("线程任务执行结束"); log.debug("执行任务消耗了 :" + (System.currentTimeMillis() - start) + "毫秒"); } } interface ThreadLoadback { T load(List list) throws Exception; } public static void main(String[] args) { List list = new ArrayList<>(); for(int i=0;i<1000;i++){ list.add("i="+i); } List
> resultList= ThreadUtil.executorsTasks(list, 10, new ThreadLoadback
, String>() { @Override public List load(List list) throws Exception { List result= new ArrayList<>(); for(String str:list){ str= str.replaceAll("i=",""); result.add(Integer.parseInt(str)); System.out.println(Thread.currentThread().getName()+"休息1秒"); Thread.sleep(1000L); } return result; } }); if(!CollectionUtils.isEmpty(resultList)){ List integers = new ArrayList<>(); resultList.stream().forEach(items -> { if (!CollectionUtils.isEmpty(resultList)) { items.stream().forEach(item -> { integers.add(item); }); } } ); integers.stream().forEach(item->System.out.println(item)); } }}