`
strongant
  • 浏览: 65007 次
  • 性别: Icon_minigender_1
  • 来自: IT
社区版块
存档分类
最新评论

Ajax+Servlet3实现异步短连接消息推送

阅读更多

1. 实现原理

当主页加载完毕时候,一个ajax(jquery)请求到servlet端, servlet启动异步处理上下文AsyncContext,然后请求一直等待,直到上下文AsyncContext调用complete()方法,或者这个请求timeout,这个请求才会返回到UI

,这样一次连接就结束。请求返回到UI后,ajax马上又发送连接请求到Servlet,这样反反复复进行这个操作

 

2. 实现代码

Servlet端实现

 

//需要导入该类包名
import java.io.IOException;

import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

@WebServlet(urlPatterns = { "/servlet/asyn" }, asyncSupported = true)
public class AsynServlet extends HttpServlet{
	
	private static final long serialVersionUID = 1L;
	
	protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
		response.setCharacterEncoding("UTF-8");
		String timeoutStr = request.getParameter("timeout");
		long timeout;
		if (StringUtils.isNumeric(timeoutStr)) {
			timeout = Long.parseLong(timeoutStr);
		} else {
			// 设置10分钟
			timeout = 10 * 60 * 1000;
		}
		
		final HttpServletResponse finalResponse = response;
		final AsyncContext ac = request.startAsync(request, finalResponse);
		// 设置成长久链接
		ac.setTimeout(timeout);
		ac.addListener(new AsyncListener() {
			public void onComplete(AsyncEvent event) throws IOException {
				log.info("onComplete Event!");
				
				ServletService.getInstance().removeAsyncContext(ac);
			}

			public void onTimeout(AsyncEvent event) throws IOException {
				log.info("onTimeout Event!");
				
				ServletService.getInstance().removeAsyncContext(ac);
				ac.complete();
			}
			public void onError(AsyncEvent event) throws IOException {
				
				ServletService.getInstance().removeAsyncContext(ac);
				ac.complete();
			}

			public void onStartAsync(AsyncEvent event) throws IOException {
				log.info("onStartAsync Event!");
			}
		});

		ServletService.getInstance().addAsyncContext(ac);
	}
}
发消息业务方法
//需要导入该类包名

import java.io.IOException;
import java.io.PrintWriter;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

import javax.servlet.AsyncContext;
import javax.servlet.http.HttpServletRequest;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class ServletService {

	
	//异步Servlet上下文队列.
	private final Map<Integer, AsyncContext> ASYNC_CONTEXT_MAP = new ConcurrentHashMap<Integer, AsyncContext>();

	//消息队列.
	private final BlockingQueue<TextMessage> TEXT_MESSAGE_QUEUE = new LinkedBlockingQueue<TextMessage>();

	//单一实例.
	private static ServletService instance = new ServletService();

	//构造函数,创建发送消息的异步线程.
	private ServletService() {
		new Thread(this.notifierRunnable).start();//线程发发消息给多个用户
	}

	//单一实例.
	public static ServletService getInstance() {
		return instance;
	}

	/**
	 * 
	 * 注册异步Servlet上下文.
	 * 
	 * @param asyncContext
	 *            异步Servlet上下文.
	 */
	public void addAsyncContext(final AsyncContext asyncContext) {
		HttpServletRequest req = (HttpServletRequest) asyncContext.getRequest();
		User user = (User) req.getSession().getAttribute("loginuser");
		if (null!=user) {
			ASYNC_CONTEXT_MAP.put(user.getId(), asyncContext);
		}
	}

	/**
	 * 
	 * 删除异步Servlet上下文.
	 * 
	 * @param asyncContext
	 *            异步Servlet上下文.
	 */
	public void removeAsyncContext(final AsyncContext asyncContext) {

		HttpServletRequest req = (HttpServletRequest) asyncContext.getRequest();
		User user = (User) req.getSession().getAttribute("loginuser");
		if (null!=user) {
			ASYNC_CONTEXT_MAP.remove(user.getId());
		}

	}

	/**
	 * 
	 * 发送消息到异步线程,最终输出到http response 流 .
	 * 
	 * @param text 发送给客户端的消息.
	 * 
	 */
	public void putMessage(final int userId, final String text) {

		try {
			TextMessage tm = new TextMessage(userId, text);
			TEXT_MESSAGE_QUEUE.add(tm);

		} catch (Exception ex) {
			throw new RuntimeException(ex);
		}

	}
	
	public void putMessage(final TextMessage tm) {
		try {
			TEXT_MESSAGE_QUEUE.add(tm);
		} 
		catch (Exception ex) {
			throw new RuntimeException(ex);
		}
	}
	
	public boolean pushMessage(final TextMessage tm) {
		boolean result = false;
		AsyncContext ac = ASYNC_CONTEXT_MAP.get(tm.getUserId());
		try {
			if (null != ac) {
				write(ac, tm.getText());
				result = true;
			}
		} catch (Exception e) {
			ASYNC_CONTEXT_MAP.remove(tm.getUserId());
			log.info(e);
		}
		
		return result;
	}

	/**
	 * 
	 * 异步线程,当消息队列中被放入数据,将释放take方法的阻塞,将数据发送到http response流上.
	 * 该方法暂时没用,用于并发测试
	 */
	private Runnable notifierRunnable = new Runnable() {

		public void run() {

			boolean done = false;
			while (!done) {
				try {
					final TextMessage tm = TEXT_MESSAGE_QUEUE.take();//当消息队列没有数据时候,线程执行到这里就会被阻塞
					if (tm.getUserId()==0) {//发送给所有人
						for (Entry<Integer, AsyncContext> entry : ASYNC_CONTEXT_MAP.entrySet()) {
							try {
								write(entry.getValue(), tm.getText());
							} catch (IOException ex) {
								log.info(ex);
							}
						}
					}
					else {
						pushMessage(tm);
					}
					
					Thread.sleep(100);//暂停100ms,停止的这段时间让用户有足够时间连接到服务器
					
				} catch (InterruptedException iex) {
					done = true;
					log.info(iex);
				}
				
			}
		}
	};
	
	private void write(AsyncContext ac, String text) throws IOException {
		PrintWriter acWriter = ac.getResponse().getWriter();

		acWriter.write(text);

		acWriter.flush();
		
		acWriter.close();
		
		ac.complete();
		
	}

}
 发消息实体对象
public final class TextMessage {

	private final int userId;
	private final String text;
	
	public TextMessage(final int userId, final String text) {
		super();
		this.userId = userId;
		this.text = text;
	}
	public int getUserId() {
		return userId;
	}
	public String getText() {
		return text;
	}
}
 

3. 前端js实现(重点)

主页要引入下面脚本代码,当主页加载完毕后,就会发送ajax请求到servet/asyn.do,然后就建立连接等待servlet返回消息到主页,返回到主页调用showMessage,这个方法调用ext组件弹出消息内容

initServlet();
function initServlet() {
	$.ajax({
		url:'servlet/asyn',
		type:'get',
		dataType:'text',
		contentType: "text/plain; charset=UTF-8", 
		timeout:600000,
		success: function(msg){
			if (msg!=null && msg!='') {//这个返回值要改成json对象
				showMessage(msg);
			}
			callSelf();//把这个方法里面的代码放在这里,IE8会一直死循环的调用(其它浏览器不会),不知道什么原因
		}
	});
}
function callSelf() {
	initServlet();
}
function showMessage(data) {
	Ext.create('widget.uxNotification', {
		title: '消息提醒',
		position: 'br',
		cls: 'ux-notification-light',
		iconCls: 'ux-notification-icon-information',
		html: data,
		width: 200,
		height: 90,
		autoCloseDelay: 4000,
		slideBackDuration: 500,
		slideInAnimation: 'bounceOut',
		slideBackAnimation: 'easeIn'
	}).show();
}
//js获取项目根路径,如: localhost:8080/test
function getRootPath(){  
    //获取当前网址,如: http://localhost:8080/test/test.jsp
    var curWwwPath=window.document.location.href;  
    //获取主机地址之后的目录,如: test/test.jsp
    var pathName=window.document.location.pathname;  
    var pos=curWwwPath.indexOf(pathName);  
    //获取主机地址,如: http://localhost:8080
    var localhostPaht=curWwwPath.substring(0,pos);
    var wsPath = localhostPaht.replace('http://','');
    //获取带"/"的项目名,如:/test
    var projectName=pathName.substring(0,pathName.substr(1).indexOf('/')+1);  
    return(wsPath+projectName);  
}  

 

前端要引用ext右下角弹出消息组件:

Notification extension for Ext JS 4.0.2+

Version: 2.1.3

从这里下载

https://github.com/EirikLorentsen/Ext.ux.window.Notification

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics