在程序启动时候间隔几秒或直接启动两个线程,
// 线程池管理
package org.marker.mq;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 线程池管理
* @Date 2012-12-29
* */
public class ThreadPoolManager {
//线程实例变量
private static volatile ThreadPoolManager poolManager;
// 线程池维护线程的最少数量
private final static int CORE_POOL_SIZE = 4;
// 线程池维护线程的最大数量
private final static int MAX_POOL_SIZE = 15;
// 线程池维护线程所允许的空闲时间
private final static int KEEP_ALIVE_TIME = 0;
// 线程池所使用的缓冲队列大小
private final static int WORK_QUEUE_SIZE = 15;
// 消息缓冲队列
private Queue<String> msgQueue = new LinkedList<String>();
// 访问消息缓存的调度线程
final Runnable accessBufferThread = new Runnable() {
public void run() {
if (hasMoreAcquire()) {//如果有请求内容,则创建一个新的AccessDBThread,并添加到线程池中
String msg = (String) msgQueue.poll();
Runnable task = new AccessDBThread(msg);
threadPool.execute(task);
}
}
};
//异常处理消息
final RejectedExecutionHandler handler = new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(((AccessDBThread) r).msg + "消息放入队列中重新等待执行");
msgQueue.offer(((AccessDBThread) r).msg);
}
};
//管理数据库访问的线程池
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
new ArrayBlockingQueue(WORK_QUEUE_SIZE), this.handler);
//调度线程池(参数含义查看API)
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
final ScheduledFuture taskHandler = scheduler.scheduleAtFixedRate( accessBufferThread, 10, 1, TimeUnit.MILLISECONDS);
//私有构造方法
private ThreadPoolManager() { }
/**
* 创建线程池实例
* @return ThreadPoolManager
*/
public static ThreadPoolManager getInstance() {
if(poolManager == null) poolManager = new ThreadPoolManager();
return poolManager;
}
//判断是否有更多的
private boolean hasMoreAcquire() {
return !msgQueue.isEmpty();
}
/**
* 推送消息到池里
* */
public boolean put(String msg) {
if(msgQueue.size() < 10){
return msgQueue.offer(msg);
}else{
return false;
}
}
}
// 排队对象
package org.marker.mq;
/**
* 数据库操作
* @author marker
* */
public class AccessDBThread implements Runnable {
public String msg;
public AccessDBThread(String msg) {
this.msg = msg;
}
public void run() {
// 向数据库中添加Msg变量值
System.out.println("Added the message: " + msg + " into the Database");
}
}
// 测试代码
package org.marker.mq;
public class TestDriver {
ThreadPoolManager pool = ThreadPoolManager.getInstance();
public void sendMsg(String msg) {
if(!pool.put(msg + "记录一条日志 ")){
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.sendMsg(msg);
}
}
public static void main(String[] args) {
for (int i = 0; i < 10000; i++) {
new TestDriver().sendMsg(Integer.toString(i));
}
}
}
在这个并发编程实现中,并没有说理解处理数据库插入,而是交给线程池有时间有优先级的排队处理。