之前写过一篇文章:flex+blazeds+java后台消息推送(简单示例) ,现在要写的是这个的升级版,改动还是挺多的,在上面的基础上增加了spring配置,还有界面的维护。后台基本上全变了。呵呵 。。。下面看实现过程
大体的思路是:服务器启动时,查询数据库获取为推送的消息,存储到内存中,定义了一个全局变量MESSAGE_LIST,集合类型。以后的每次维护,包括(增、删、改),都同时维护两份,及数据库和内存,保证两者的一致。当一条消息推送完成,即从MESSAGE_LIST中删除。
首先,添加spring的配置,在web-application-config.xml文件中添加
1. <flex:message-brokerid="_messagebroker">
2. <flex:message-service
3. default-channels="my-streaming-amf,my-polling-amf"/>
4. </flex:message-broker>
5. <flex:message-destination id="tick-data-Feed"channels="my-streaming-amf,my-polling-amf"allow-subtopics="true" subtopic-separator="."/>
复制代码
没添加之前时:
1. <flex:message-broker />
复制代码
主要是为了在后台获取message-broker,services-config.xml的配置还是一样,不变。
然后,后台java类:
1. /**
2. * 新增消息
4. *@return
5. *@throws ServiceException
6. *@throws sqlException
7. * @author:IT氧吧
8. *@createDate: 2011-5-17
9. *@modifiedBy:IT氧吧
10. *@modifiedDate:2011-5-17
11. */
12. public String savePushMessage(PushMessage paramMessage)
13. throwsServiceException,sqlException {
14.
15. paramMessage.setStartDate(DateUtil.parseDate(paramMessage.getStartDateCN(),DateUtil.YMDHMS_PATTERN));
16. paramMessage.setEndDate(DateUtil.parseDate(paramMessage.getEndDateCN(),DateUtil.YMDHMS_PATTERN));
17. Date d = systemService.getDateFromDataBase();
18. if(paramMessage.getStartDate().compareto(d)< 0 || paramMessage.getEndDate().compareto(d) < 0){
19. thrownew ServiceException("推送日期不能早于当前日期");
20. }
21. if(paramMessage.getStartDate().compareto(paramMessage.getEndDate()) > 0){
22. thrownew ServiceException("推送起始日期不能晚于推送结束日期");
23. }
24. /** 保存到数据库 */
25. Long messageId = systemService.savePushMessage(paramMessage);
26. /** 添加到集合中 */
27. GlobalNames.MESSAGE_LIST.add(paramMessage);
28.
29. /** 集合重新排序降序
30. Comparator<PushMessage> comparator = newComparator<PushMessage>(){
31. @Override
32. public intcompare(PushMessage o1,PushMessage o2) {
33. returno1.getStartDate().compareto(o2.getStartDate());
34. }
35. };
36. Collections.sort(GlobalNames.MESSAGE_LIST,comparator); */
37. /**
38. * 如果GlobalNames.MESSAGE_LIST没有消息,此时新增消息的时候就得推送,否则只是把消息添加到GlobalNames.MESSAGE_LIST并排序
39. * */
40. if(GlobalNames.MESSAGE_LIST.size() == 1){
41. PushMessageUtil.startPush(systemService);
42. }
43. return messageId.toString();
44. }
复制代码
下面看一下PushMessageUtil
1. public static void startPush(SystemService systemService){
2. if(GlobalNames.MESSAGE_LIST.size() > 0){
3. if(t ==null){
4. t = new Timer();
5. try {
6. dbDate =systemService.getDateFromDataBase();
7. } catch (ServiceException e) {
8. // TodoAuto-generated catch block
9. e.printstacktrace();
10. } catch (sqlException e) {
11. // TodoAuto-generated catch block
12. e.printstacktrace();
13. }
14. }
15. /**
16. * 下面一段代码在一种情况下会报错,即系统刚启动还没有任何人登录,
17. * 此时系统flex框架部分还未初始化,获取到msgbroker为空
18. * */
19. msgbroker = Messagebroker.getMessagebroker("_messagebroker");
20. clientID= UUIDUtils.createUUID();
21. t.scheduleAtFixedrate(new TimerTask() {
22. @Override
23. public void run() {
24. // TodoAuto-generated method stub
25. if(msgbroker ==null){
26. msgbroker = Messagebroker.getMessagebroker("_messagebroker");
27. }
28. PushMessage message;
29. dbDate.setMinutes(dbDate.getMinutes()+1);
30. /** 每隔1分钟轮询一次 */
31. /** 注意下面的循环不能用foreach,否则报java.util.ConcurrentModificationException*/
32. // for(PushMessagemessage : GlobalNames.MESSAGE_LIST){
33. for(inti=0;i<GlobalNames.MESSAGE_LIST.size();i++){
34. String dateStr = DateUtil.format(dbDate,DateUtil.YMDHMS_PATTERN);
35. message = GlobalNames.MESSAGE_LIST.get(i);
36. if((message.getStartDate().getMinutes()-dbDate.getMinutes())% message.getInterval() == 0){
37. pMessage = message;
38. start();
39. }
40.
41. //是否等于结束时间
42. if(message.getEndDateCN().substring(0,message.getEndDateCN().lastIndexOf(":")).equals(dateStr.substring(0,dateStr.lastIndexOf(":")))){
43. GlobalNames.MESSAGE_LIST.remove(message);
44. System.out.println("大小:"+GlobalNames.MESSAGE_LIST.size());
45. stopMessage = message;
46. stop();
47. if(GlobalNames.MESSAGE_LIST.size()== 0){
48. t.cancel();
49. t = null;
50. break;
51. }
52. }
53. }
54. }
55. },1000*60,1000*60);
56. }
57. }
58.
59. /**
60. * 开始推送
61. *@author: IT氧吧
62. *@createDate: 2011-5-20
63. *@modifiedBy: IT氧吧
64. *@modifiedDate:2011-5-20
65. */
66. private static void start() {
67. Asyncmessage msg = new Asyncmessage();
68. msg.setDestination("tick-data-Feed");
69. msg.setHeader("DSSubtopic","tick");
70. msg.setClientId(clientID);
71. msg.setMessageId(UUIDUtils.createUUID());
72. msg.setTimestamp(System.currentTimeMillis());
73. msg.setBody(pMessage);
74. if(msgbroker != null){
75. msgbroker.routeMessagetoService(msg,null);
76. System.out.println("start!!");
77. }
78. }
79.
80. /**
81. * 停止推送
82. *@author: IT氧吧
83. *@createDate: 2011-5-20
84. * @modifiedBy:IT氧吧
85. *@modifiedDate:2011-5-20
86. */
87. private static void stop() {
88. System.out.println(stopMessage.getContent()+" 已经推送完成");
89. }
复制代码
不解释,应该都能看懂,该有的注释也都有
前台还是一样,该添加的监听还是得添加,只是多了个,如果有多个消息同时推送时,已经推送过的消息将不会显示
1. private var msgList:ArrayCollection = newArrayCollection();
2. privatefunction messageHandler(event:MessageEvent):void
3. {
4. var pmsg:ServerPushMessageVO = event.message.bodyas ServerPushMessageVO;
5. /**
6. * 返回指示视图是否包含指定对象的信息。与 IViewCursor.findxxx 方法不同,
7. * 此搜索仅在找到完全与参数匹配的项目时才会成功。如果视图中包含已应用到该视图的滤镜,
8. * 则即使基本集合确实包含该项目,此方法也会返回 false。
9. * */
10. if(!msgList.contains(pmsg.id))
11. {
12. var showTipMessageG:ShowTipMessageG= new ShowTipMessageG();
13. msgList.addItem(pmsg.id);
14. showTipMessageG.serverPushMessageVO = pmsg;
15. PopUpEffect.show(showTipMessageG,this,true);
16. }
17. }
复制代码