前言
阿里云消息服务(Message Service,原MQS)是阿里云商用的消息中间件服务。与传统的消息中间件不同,消息服务一开始就是基于阿里云自主研发的飞天分布式系统来设计和实现,具有大规模,高可靠、高并发访问和超强消息堆积能力的特点。消息服务API采用HTTP RESTful标准,接入方便,跨网络能力强;已全面接入资源访问控制服务(RAM)、专有网络(VPC),支持各种安全访问控制;接入云监控,提供完善的监控及报警机制。消息服务提供丰富的SDK、解决方案、最佳实践和7x24小时的技术支持,帮助应用开发者在应用组件之间自由地传递数据和构建松耦合、分布式、高可用系统。
这里不多介绍,点击查看更多>>>。
下面要说的才是重点,利用java sdk 提供的消息服发送以及接收消息的并发访问。
测试目的
并发情况下消息服(MNS)的QPS
测试工具
aliyun-sdk-mns-1.1.7.jar 阿里云提供的操作消息服的SDK
测试过程-代码
消息服提供三种endpoint 访问,由于我们应用服务器也是部署在阿里云ECS上的,因此选择私有网络(内网)进行访问。
代码如下:
package com.study.mq.aliyunmns;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.SystemUtils;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.http.ClientConfiguration;
import com.aliyun.mns.model.Message;
/**
-
-
@author wangkai
-
@2016年11月22日 上午11:27:14
-
@desc:阿里云消息服,队列消息发送以及消费的并发测试
*/
public class MnsQueueApp {
private static Logger LOG = Logger.getLogger(MnsQueueApp.class.getName());
private static MNSClient client = null;
private static AtomicLong totalCount = new AtomicLong(0);
private static String endpoint = null;
private static String accessId = null;
private static String accessKey = null;
private static String queueName = "articlepricinglog";
private static int threadNum = 100;
private static int totalSeconds = 180;
private static String log4jConfPath = "./log4j.properties";
static{
PropertyConfigurator.configureAndWatch(log4jConfPath);
}
/**
- 解析配置文件
- @return
*/
@SuppressWarnings("unused")
protected static boolean parseConf() {
String confFilePath = SystemUtils.getUserDir() + SystemUtils.FILE_SEPARATOR + "mns.properties";
BufferedInputStream bis = null;
try {
bis = new BufferedInputStream(new FileInputStream(confFilePath));
if (bis == null) {
LOG.info("ConfFile not opened: " + confFilePath);
return false;
}
} catch (FileNotFoundException e) {
LOG.error("ConfFile not found: " + confFilePath,e);
return false;
}
// load file
Properties properties = new Properties();
try {
properties.load(bis);
} catch(IOException e) {
LOG.error("Load ConfFile Failed: " + e.getMessage());
return false;
} finally {
try {
bis.close();
} catch (Exception e) {
// do nothing
}
}
// init the member parameters
endpoint = properties.getProperty("Endpoint");
LOG.info("Endpoint: " + endpoint);
accessId = properties.getProperty("AccessId");
LOG.info("AccessId: " + accessId);
accessKey = properties.getProperty("AccessKey");
queueName = properties.getProperty("QueueName",queueName);
LOG.info("QueueName: " + queueName);
threadNum = Integer.parseInt(properties.getProperty("ThreadNum",String.valueOf(threadNum)));
LOG.info("ThreadNum: " + threadNum);
totalSeconds = Integer.parseInt(properties.getProperty("TotalSeconds",String.valueOf(totalSeconds)));
LOG.info("TotalSeconds: " + totalSeconds);
return true;
}
/**
-
程序入口
-
@param args
*/
public static void main(String[] args) {
if (!parseConf()) {
return;
}
ClientConfiguration clientConfiguration = new ClientConfiguration();
clientConfiguration.setMaxConnections(threadNum);
clientConfiguration.setMaxConnectionsPerRoute(threadNum);
CloudAccount cloudAccount = new CloudAccount(accessId,accessKey,endpoint,clientConfiguration);
client = cloudAccount.getMNSClient();
/CloudQueue queue = client.getQueueRef(queueName);
queue.delete();
QueueMeta meta = new QueueMeta();
meta.setQueueName(queueName);
client.createQueue(meta);/
// 1. Now check the SendMessage
ArrayList threads = new ArrayList();
for (int i = 0; i < threadNum; ++i){
Thread thread = new Thread(new Runnable() {
public void run() {
try {
CloudQueue queue = client.getQueueRef(queueName);
Message message = new Message();
message.setMessageBody("Test");
long count = 0;
long startTime = System.currentTimeMillis();
LOG.info(startTime);
long endTime = startTime + totalSeconds * 1000;
while (true) {
for (int i = 0; i < 50; ++i) {
queue.putMessage(message);
}
count += 50;
// 在指定时间内执行完
if (System.currentTimeMillis() >= endTime) {
break;
}
}
// LOG.info(System.currentTimeMillis());
LOG.info("Thread" + Thread.currentThread().getName() + ": " + String.valueOf(count));
totalCount.addAndGet(count);
} catch (Exception e) {
e.printStackTrace();
}
}
},String.valueOf(i));
thread.start();
threads.add(thread);
}
for (int i = 0; i < threadNum; ++i) {
try {
threads.get(i).join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
LOG.info("SendMessage requests: " + totalCount.get() +" time: "+ totalSeconds);
LOG.info("SendMessage QPS: " + totalCount.get() / totalSeconds);
// 2. Now is the ReceiveMessage
threads.clear();
totalCount.set(0);
//totalSeconds = totalSeconds / 3; // To ensure that messages in queue are enough for receiving
for (int i = 0; i < threadNum; ++i){
Thread thread = new Thread(new Runnable() {
public void run() {
try {
CloudQueue queue = client.getQueueRef(queueName);
long count = 0;
long endTime = System.currentTimeMillis() + totalSeconds * 1000;
while (true) {
for (int i = 0; i < 50; ++i) {
Message message = queue.popMessage();
//删掉消息
if (message != null)
queue.deleteMessage(message.getReceiptHandle());
}
count += 50;
if (System.currentTimeMillis() >= endTime) {
break;
}
}
LOG.info("Thread" + Thread.currentThread().getName() + ": " + String.valueOf(count));
totalCount.addAndGet(count);
} catch (Exception e) {
e.printStackTrace();
}
}
},String.valueOf(i));
thread.start();
threads.add(thread);
}
// keep looping util the end of world
for (int i = 0; i < threadNum; ++i) {
try {
threads.get(i).join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
LOG.info("ReceiveMessage requests: " + totalCount.get() +" time: "+ totalSeconds);
LOG.info("ReceiveMessage QPS: " + totalCount.get() / totalSeconds);
return;
}
}
config配置文件
# 用户可指定并发度、运行时间
# 使用发送总请求数除以运行时间得到QPS
# 私网地址
Endpoint=http://xxxxxxxxx.mns.cn-hangzhou-internal.aliyuncs.com/
# 公网地址
#Endpoint=http://1xxxxx/
AccessId=xxxxx
AccessKey=xxxxxx
# 测试队列名
QueueName=articlepricinglog
#是发送/消费的线程数,MNS具备强大的高并发扩展性能
ThreadNum=100
#是测试Case运行的时间 单位(秒)
TotalSeconds=10
打包部署到类生产环境,执行 java -jar aliyunmns.jar
测试结果
发送端
2016-12-01 09:50:53 [ main:91193 ] - [ INFO ] SendMessage requests: 5000 time: 10
2016-12-01 09:50:53 [ main:91193 ] - [ INFO ] SendMessage QPS: 500
接收端
2016-12-01 09:53:22 [ main:240468 ] - [ INFO ] ReceiveMessage requests: 5000 time: 10
2016-12-01 09:53:22 [ main:240468 ] - [ INFO ] ReceiveMessage QPS: 500
// 后面用nodejs 利用其异步区测试消息服
// 改进一下
改进版本
package com.study.mq.aliyunmns;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URL;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import org.apache.commons.lang3.SystemUtils;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.http.ClientConfiguration;
import com.aliyun.mns.model.Message;
/**
-
-
@author wangkai
-
@2016年11月22日 上午11:27:14
-
@desc:阿里云消息服(MNS),队列消息发送以及消费的并发测试
-
https://www.aliyun.com/product/mns?spm=5176.8142029.388261.80.fNnCkg
*/
public class MnsQueueAppV2 {
private static Logger LOG = Logger.getLogger(MnsQueueAppV2.class.getName());
private static MNSClient client = null;
// private static AtomicLong totalCount = new AtomicLong(0);
private static String endpoint = null;
private static String accessId = null;
private static String accessKey = null;
private static String queueName = "articlepricinglog";
private static int threadNum = 100;
private static int clientNum = 10000;
// private static int totalSeconds = 180;
private static String log4jConfPath = "./log4j.properties";
static {
PropertyConfigurator.configureAndWatch(log4jConfPath);
}
/**
- 解析配置文件
-
- @return
*/
@SuppressWarnings("unused")
protected static boolean parseConf() {
// URL resource =
// MnsQueueAppV2.class.getClassLoader().getResource("name.properties");
String confFilePath = SystemUtils.getUserDir()
- SystemUtils.FILE_SEParaTOR
- "src/main/resources/mns.properties";
URL resource = MnsQueueAppV2.class.getResource("/mns.properties");
URL resource2 = MnsQueueAppV2.class.getClassLoader().getResource("mns.properties");//二者等价
BufferedInputStream bis = null;
try {
bis = new BufferedInputStream(new FileInputStream(confFilePath));
if (bis == null) {
LOG.info("ConfFile not opened: " + confFilePath);
return false;
}
} catch (FileNotFoundException e) {
LOG.error("ConfFile not found: " + confFilePath,e);
return false;
}
// load file
Properties properties = new Properties();
try {
properties.load(bis);
} catch (IOException e) {
LOG.error("Load ConfFile Failed: " + e.getMessage());
return false;
} finally {
try {
bis.close();
} catch (Exception e) {
// do nothing
}
}
// init the member parameters
endpoint = properties.getProperty("Endpoint");
LOG.info("Endpoint: " + endpoint);
accessId = properties.getProperty("AccessId");
LOG.info("AccessId: " + accessId);
accessKey = properties.getProperty("AccessKey");
queueName = properties.getProperty("QueueName",String.valueOf(threadNum)));
LOG.info("ThreadNum: 线程数" + threadNum);
clientNum = Integer.parseInt(properties.getProperty("ClientNum",String.valueOf(clientNum)));
LOG.info("ClientNum: 并发数" + clientNum);
// totalSeconds =
// Integer.parseInt(properties.getProperty("TotalSeconds",// String.valueOf(totalSeconds)));
// LOG.info("TotalSeconds: " + totalSeconds);
return true;
}
/**
-
程序入口
-
-
@param args
-
@throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
// 准备工作
if (!parseConf()) {
return;
}
ClientConfiguration clientConfiguration = new ClientConfiguration();
clientConfiguration.setMaxConnections(threadNum);
clientConfiguration.setMaxConnectionsPerRoute(threadNum);
CloudAccount cloudAccount = new CloudAccount(accessId,clientConfiguration);
client = cloudAccount.getMNSClient();
LOG.info("发送消息");
// 线程池
ExecutorService exec = Executors.newFixedThreadPool(500);
/**
-
-
acquire(),然后再获取该许可。每个 release()
-
添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数
-
,并采取相应的行动。拿到信号量的线程可以进入代码,否则就等待。通过acquire()和release()获取和释放访问许可。
*/
final Semaphore semp = new Semaphore(threadNum);
long startTime = System.currentTimeMillis(); // 开启时间
// 开始
for (int index = 0; index < clientNum; index++) {
// final int NO = index;
Runnable task = new Runnable() {
public void run() {
try {
semp.acquire();
try {
// 获取queue
CloudQueue queue = client.getQueueRef(queueName);
// 组装消息
Message message = new Message();
message.setMessageBody("Test");
// 发送消息
queue.putMessage(message);
} catch (Exception e) {
e.printStackTrace();
}
semp.release();
} catch (Exception e) {
e.printStackTrace();
}
}
};
exec.submit(task);
}
long endTime = System.currentTimeMillis(); // 开启时间
exec.shutdown();
LOG.info(clientNum + " 的并发发送消息总耗时:>>>" + (endTime - startTime) + " ms");
LOG.info(clientNum + " 的并发发送消息 QPS为:>>>" + (clientNum * 1000)
/ (endTime - startTime) + " q/s");
LOG.info("接收消息");
Thread.sleep(3000);
ExecutorService exec2 = Executors.newFixedThreadPool(500);
final Semaphore semp2 = new Semaphore(threadNum);
long startTime2 = System.currentTimeMillis(); // 开启时间
for (int index = 0; index < clientNum; index++) {
// final int NO = index;
Runnable task = new Runnable() {
public void run() {
try {
semp2.acquire();
try {
// 获取queue
CloudQueue queue = client.getQueueRef(queueName);
// 获取消息
Message message = queue.popMessage();
// 删掉消息
if (message != null)
queue.deleteMessage(message.getReceiptHandle());
} catch (Exception e) {
e.printStackTrace();
}
semp2.release();
} catch (Exception e) {
e.printStackTrace();
}
}
};
exec2.submit(task);
}
long endTime2 = System.currentTimeMillis(); // 开启时间
exec2.shutdown();
// 忽略线程切换的耗时 精确的做法?
LOG.info(clientNum + " 的并发接收消息总耗时:>>>" + (endTime2 - startTime2)
- " ms");
LOG.info(clientNum + " 的并发接收消息 QPS为:>>>" + (clientNum * 1000)
/ (endTime2 - startTime2) + " q/s");
}
}
改进后内网测试结果
2016-12-05 22:49:15 [ main:13 ] - [ INFO ] QueueName: tms-message
2016-12-05 22:49:15 [ main:14 ] - [ INFO ] ThreadNum: 线程数10
2016-12-05 22:49:15 [ main:14 ] - [ INFO ] ClientNum: 并发数1000
2016-12-05 22:49:16 [ main:601 ] - [ INFO ] 发送消息
2016-12-05 22:49:16 [ main:749 ] - [ INFO ] 1000 的并发发送消息总耗时:>>>145 ms
2016-12-05 22:49:16 [ main:749 ] - [ INFO ] 1000 的并发发送消息 QPS为:>>>6896 q/s
2016-12-05 22:49:16 [ main:749 ] - [ INFO ] 接收消息
2016-12-05 22:49:19 [ main:3870 ] - [ INFO ] 1000 的并发接收消息总耗时:>>>120 ms
2016-12-05 22:49:19 [ main:3873 ] - [ INFO ] 1000 的并发接收消息 QPS为:>>>8333 q/s