1.服务端返回Chunk
声明:本节(Chunk 收发)的示例基于 Netty 3,后文的文件上传示例基于 Netty 4;两者语法大同小异,重点是学习其中的思想。
1.RestServer.java
/**
 * Boots a Netty 3 HTTP server on port 9877 and blocks the calling thread until the
 * bound server channel is closed.
 *
 * <p>The pipeline decodes requests, aggregates chunked request bodies (up to 10 MiB),
 * encodes responses and finally hands the request to {@link ServerHandler}.
 * All Netty resources and both executor services are released in the {@code finally}
 * block, whether start-up succeeded or not.
 */
@Override
public void run() {
    ExecutorService boss = null;
    ExecutorService worker = null;
    NioServerSocketChannelFactory nioServerSocketChannelFactory = null;
    ServerBootstrap bootstrap = null;
    try {
        boss = Executors.newCachedThreadPool();
        worker = Executors.newCachedThreadPool();
        nioServerSocketChannelFactory = new NioServerSocketChannelFactory(boss, worker);
        bootstrap = new ServerBootstrap(nioServerSocketChannelFactory);
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                return Channels.pipeline(
                        new HttpRequestDecoder(),
                        new HttpChunkAggregator(1024 * 1024 * 10),
                        //new HttpContentDecompressor(),
                        new HttpResponseEncoder(),
                        //new HttpContentCompressor(),
                        new ServerHandler()
                );
            }
        });
        // Options prefixed with "child." apply to accepted connections.
        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("child.keepAlive", true);
        Channel bind = bootstrap.bind(new InetSocketAddress(9877));
        // Park this thread for as long as the server channel stays open.
        bind.getCloseFuture().awaitUninterruptibly();
    } catch (Exception e) {
        // Pass the exception itself so the stack trace is not lost.
        LOGGER.error(e.getMessage(), e);
    } finally {
        LOGGER.info("bootstrap release");
        if (bootstrap != null) {
            bootstrap.releaseExternalResources();
        }
        LOGGER.info("channel factory release");
        if (nioServerSocketChannelFactory != null) {
            nioServerSocketChannelFactory.releaseExternalResources();
        }
        LOGGER.info("worker release");
        if (worker != null) {
            worker.shutdown();
        }
        LOGGER.info("boss release");
        if (boss != null) {
            boss.shutdown();
        }
    }
}
笔者一开始被 ChunkedWriteHandler 搞糊涂了,一直以为返回 Chunk 必须使用 ChunkedWriteHandler。直到看到下面这一段示例才明白:它的作用是把 ChunkedFile 这类大块数据拆分后逐段写出,与 HTTP 的 chunked 传输编码没有关系!
// Usage sketch ("..." stands for code omitted by the article):
// a ChunkedWriteHandler in the pipeline lets the application write a ChunkedFile.
ChannelPipeline p = ...;
p.addLast("streamer", new ChunkedWriteHandler());
p.addLast("handler", new MyHandler());
Channel ch = ...;
ch.write(new ChunkedFile(new File("video.mkv")));
真正的Chunk是需要自己组装的
1.2 ServerHandler
/**
 * Netty 3 upstream handler that prints each incoming HTTP request and answers it with a
 * hand-assembled chunked response: one response header, six identical data chunks and a
 * final empty chunk that terminates the body.
 */
public class ServerHandler extends SimpleChannelUpstreamHandler {

    /** Number of data chunks written before the terminating empty chunk. */
    private static final int DATA_CHUNK_COUNT = 6;

    /**
     * Handles one decoded message.
     *
     * @param ctx handler context used to reach the channel
     * @param e   event carrying either a {@code DefaultHttpRequest} or a {@code DefaultHttpChunk}
     * @throws Exception propagated to {@link #exceptionCaught}
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        Object message = e.getMessage();
        System.out.println(message);
        if (message instanceof DefaultHttpRequest) {
            DefaultHttpRequest request = (DefaultHttpRequest) message;
            System.out.println(request.getUri());

            System.out.println("read content ");
            // Decode only the readable bytes; array() would ignore the reader index and
            // fails for buffers that are not backed by a byte array.
            System.out.println(request.getContent().toString(CharsetUtil.UTF_8));

            /* Key part: announce a chunked body, then write the chunks ourselves. */
            DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
            response.setChunked(true);
            response.setHeader(HttpHeaders.Names.TRANSFER_ENCODING, HttpHeaders.Values.CHUNKED);
            response.setHeader(HttpHeaders.Names.CONTENT_TYPE, "text/plain;charset=utf-8");
            Channels.write(ctx.getChannel(), response);

            byte[] payload = "这个是Chunk 数据".getBytes(CharsetUtil.UTF_8);
            for (int i = 0; i < DATA_CHUNK_COUNT; i++) {
                // A fresh chunk per write, so no write shares buffer indices with another.
                HttpChunk chunk = new DefaultHttpChunk(ChannelBuffers.wrappedBuffer(payload));
                Channels.write(ctx.getChannel(), chunk);
            }

            // An empty chunk marks the end of the chunked body.
            ctx.getChannel().write(new DefaultHttpChunk(ChannelBuffers.EMPTY_BUFFER));
        } else if (message instanceof DefaultHttpChunk) {
            System.out.println(((DefaultHttpChunk) message).isLast());
        }
    }

    /**
     * Delegates to the default handling of {@code SimpleChannelUpstreamHandler}.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        super.exceptionCaught(ctx, e);
    }
}
需要注意的是chunk与gzip 压缩存在着冲突的关系,实际上,gzip压缩是针对每一个的Chunk请求进行压缩后发送到客户端,但是客户端是把整个Chunk整合之后,再使用gzip 进行解压缩,故而会导致乱码的问题。所以实际上是,如果使用Chunk 就别使用gzip 压缩
1.3客户端发送Chunk到服务端
/**
 * Netty 3 client that connects to the local server on port 9877 and POSTs a chunked
 * request to {@code /home}: one request header, three identical data chunks and a
 * final empty chunk. Blocks until the channel is closed, then releases all resources.
 *
 * @param args unused
 */
public static void main(String args[]) {
    ExecutorService boss = Executors.newCachedThreadPool();
    ExecutorService worker = Executors.newCachedThreadPool();
    NioClientSocketChannelFactory nioClientSocketChannelFactory = new NioClientSocketChannelFactory(boss, worker);
    ClientBootstrap bootstrap = new ClientBootstrap(nioClientSocketChannelFactory);
    try {
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                return Channels.pipeline(
                        new HttpResponseDecoder(),
                        new HttpContentDecompressor(),
                        // new HttpChunkAggregator(65536),
                        new HttpRequestEncoder(),
                        new RestClient()
                );
            }
        });
        // Connect to an explicit host: InetSocketAddress(port) alone denotes the wildcard
        // address, which is meant for binding and is not a portable connect target.
        ChannelFuture connect = bootstrap.connect(new InetSocketAddress("localhost", 9877));
        connect.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    // Nothing can be written on a channel that never connected.
                    System.out.println("connect failed: " + future.getCause());
                    return;
                }
                System.out.println("write");
                Channel channel = future.getChannel();
                DefaultHttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/home");
                httpRequest.setChunked(true);
                httpRequest.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, "gzip,deflate,br");
                httpRequest.setHeader(HttpHeaders.Names.CONTENT_TYPE, "text/plain;charset=utf-8");
                httpRequest.setHeader(HttpHeaders.Names.TRANSFER_ENCODING, HttpHeaders.Values.CHUNKED);
                Channels.write(channel, httpRequest);

                byte[] payload = "这个是Chunk 数据".getBytes(CharsetUtil.UTF_8);
                for (int i = 0; i < 3; i++) {
                    // A fresh chunk per write, so no write shares buffer indices with another.
                    HttpChunk chunk = new DefaultHttpChunk(ChannelBuffers.wrappedBuffer(payload));
                    Channels.write(channel, chunk);
                }
                // An empty chunk marks the end of the chunked body.
                Channels.write(channel, new DefaultHttpChunk(ChannelBuffers.EMPTY_BUFFER));
            }
        });
        ChannelFuture closeFuture = connect.getChannel().getCloseFuture();
        closeFuture.awaitUninterruptibly();
        System.out.println("close ");
    } finally {
        // Release in a finally block so the threads are freed on failure as well.
        bootstrap.releaseExternalResources();
        nioClientSocketChannelFactory.releaseExternalResources();
        worker.shutdown();
        boss.shutdown();
    }
}
后续发现 HttpContentDecompressor 其实应该放在 HttpChunkAggregator 之前:如果包是一个一个压缩的话,那么就得一个一个解压,而不应该先汇总后再整体解压。HttpContentCompressor 也移动到前面,主要是因为里面存在设置 HTTP 头的部分代码,所以一般建议不要放得太靠后,避免写入响应时因为缺少头而报错。
/**
 * Builds the revised server pipeline, with the decompressor and compressor placed
 * ahead of the chunk aggregator.
 *
 * @return a pipeline holding the six handlers in the order listed below
 */
@Override
public ChannelPipeline getPipeline() throws Exception {
    // NOTE(review): HttpContentCompressor sits before HttpResponseEncoder here; confirm
    // that outgoing responses still reach the compressor as HTTP messages, not as bytes.
    ChannelPipeline pipeline = Channels.pipeline(
            new HttpRequestDecoder(),
            // Decompression/compression operate on messages produced by the request decoder.
            new HttpContentDecompressor(),
            new HttpContentCompressor(),
            // Aggregates request chunks; the limit is 10 MiB.
            new HttpChunkAggregator(1024 * 1024 * 10),
            new HttpResponseEncoder(),
            new ServerHandler()
    );
    return pipeline;
}
Netty 客户端实现HTTP Post文件上传
multipart/form-data
附件上传一般是需要借助HttpPostRequestEncoder
帮我们实现,特别需要注意的是HttpObjectAggregator,这一段是必须要的
/**
 * Netty 4 client that connects to the local server on port 9877 and uploads a
 * {@code multipart/form-data} POST to {@code /oldService.do} containing one text
 * attribute and one in-memory file. The body is streamed by handing the
 * {@link HttpPostRequestEncoder} to the {@link ChunkedWriteHandler} in the pipeline.
 * Blocks until the channel is closed.
 *
 * @throws Exception if connecting or waiting for the close future fails
 */
@Test
public void testClientSendMultipart() throws Exception {
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    try {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(bossGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast("httpe", new HttpClientCodec());
                        pipeline.addLast("chunk", new ChunkedWriteHandler());
                        // Important: HttpObjectAggregator is required here.
                        pipeline.addLast("aggregate", new HttpObjectAggregator(6553600));
                        pipeline.addLast(new SimpleChannelInboundHandler<HttpResponse>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, HttpResponse msg) throws Exception {
                                // Print whatever the aggregator delivers. Its full response is not
                                // a DefaultFullHttpResponse, so testing for that class hid the output.
                                System.out.println(msg);
                            }
                        });
                    }
                });
        // Connect to an explicit host: InetSocketAddress(port) alone denotes the wildcard
        // address, which is meant for binding and is not a portable connect target.
        ChannelFuture connect = bootstrap.connect(new InetSocketAddress("localhost", 9877)).sync();
        connect.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Channel channel = future.channel();
                DefaultFullHttpRequest req = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/oldService.do");
                HttpPostRequestEncoder encoder = new HttpPostRequestEncoder(req, true);
                encoder.addBodyAttribute("username", "blueboz");
                MemoryFileUpload body = new MemoryFileUpload("data", "test.txt", "text/plain", null, null, 36);
                body.setContent(Unpooled.copiedBuffer("this is the content of test.txt file", StandardCharsets.UTF_8));
                encoder.addBodyHttpData(body);
                // Send the request line and headers produced by finalizeRequest().
                channel.writeAndFlush(encoder.finalizeRequest());
                // Hand the encoder itself to the pipeline to stream the body.
                if (encoder.isChunked()) {
                    // No sync() here: this callback runs on the event loop, and blocking
                    // on a future from that thread is rejected by Netty.
                    channel.writeAndFlush(encoder);
                }
            }
        });
        connect.channel().closeFuture().sync();
    } finally {
        // Always stop the event loop threads, even when connect or sync throws.
        bossGroup.shutdownGracefully();
    }
}
以下是服务端接收到的报文,可以看到报文已经按照form表单的格式打印了出来,而且服务端也接收到了报文
POST /oldService.do HTTP/1.1
content-type: multipart/form-data; boundary=6a953d931dd100c1
Content-Length: 349
--6a953d931dd100c1
content-disposition: form-data; name="username"
content-length: 7
content-type: text/plain; charset=UTF-8
blueboz
--6a953d931dd100c1
content-disposition: form-data; name="data"; filename="test.txt"
content-length: 36
content-type: text/plain; charset=UTF-8
this is the content of test.txt file
--6a953d931dd100c1--
DefaultHttpRequest(chunked: false)
POST /oldService.do HTTP/1.1
content-type: multipart/form-data; boundary=4f778495a75b702b
Content-Length: 349
--4f778495a75b702b
content-disposition: form-data; name="username"
content-length: 7
content-type: text/plain; charset=UTF-8
blueboz
--4f778495a75b702b
content-disposition: form-data; name="data"; filename="test.txt"
content-length: 36
content-type: text/plain; charset=UTF-8
this is the content of test.txt file
--4f778495a75b702b--
// Fragment: `req` is the outgoing request created elsewhere; `true` selects multipart encoding.
HttpPostRequestEncoder encoder = new HttpPostRequestEncoder(req, true);
// Add an in-memory file part named "data" (file name "test.txt", declared size 4 bytes).
MemoryFileUpload body =
new MemoryFileUpload("data", "test.txt", "text/plain", null, null, 4);
body.setContent(Unpooled.copiedBuffer("test", StandardCharsets.UTF_8));
encoder.addBodyHttpData(body);
另外,也支持下面这种方式进行报文的发送
/**
 * Alternative way to send the multipart request: instead of handing the encoder to a
 * {@code ChunkedWriteHandler}, pull the body chunks out of the encoder and write them
 * one by one.
 *
 * @param future the completed connect future whose channel is written to
 * @throws Exception if the encoder fails to build the request or a chunk
 */
@Override
public void operationComplete(ChannelFuture future) throws Exception {
    Channel channel = future.channel();
    DefaultFullHttpRequest req = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/oldService.do");
    HttpPostRequestEncoder encoder = new HttpPostRequestEncoder(req, true);
    encoder.addBodyAttribute("username", "blueboz");
    MemoryFileUpload body = new MemoryFileUpload("data", "test.txt", "text/plain", null, null, 36);
    body.setContent(Unpooled.copiedBuffer("this is the content of test.txt file", StandardCharsets.UTF_8));
    encoder.addBodyHttpData(body);
    channel.writeAndFlush(encoder.finalizeRequest());
    // Only a chunked request has body chunks left to send; a non-chunked request
    // already carries its whole body in the message written above.
    if (encoder.isChunked()) {
        while (!encoder.isEndOfInput()) {
            // Write each chunk as produced, without re-wrapping it; the chunk that ends
            // the input is the encoder's own last-content marker, so none is added here.
            io.netty.handler.codec.http.HttpContent chunk = encoder.readChunk(PooledByteBufAllocator.DEFAULT);
            if (chunk != null) {
                channel.writeAndFlush(chunk);
            }
        }
    }
}
Netty Http服务端接收并处理上传附件
/**
 * Netty 4 HTTP server on port 11344 that aggregates each request, decodes its POST body,
 * prints every attribute and uploaded file, and answers with a short text response.
 * Blocks until the server channel is closed.
 *
 * @throws InterruptedException if interrupted while binding or waiting for close
 */
@Test
public void testMultipartServer() throws InterruptedException {
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast("dec", new HttpServerCodec());
                        socketChannel.pipeline().addLast("aggregator", new HttpObjectAggregator(655330000));
                        socketChannel.pipeline().addLast("buss", new SimpleChannelInboundHandler<FullHttpRequest>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest request) throws Exception {
                                // false = keep decoded data in memory.
                                HttpDataFactory factory = new DefaultHttpDataFactory(false);
                                HttpPostRequestDecoder requestDecoder = new HttpPostRequestDecoder(factory, request);
                                try {
                                    List<InterfaceHttpData> datas = requestDecoder.getBodyHttpDatas();
                                    for (InterfaceHttpData data : datas) {
                                        System.out.println(data.getName());
                                        if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.FileUpload) {
                                            FileUpload fup = (FileUpload) data;
                                            System.out.println(fup.content().toString(StandardCharsets.UTF_8));
                                        } else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
                                            Attribute att = (Attribute) data;
                                            System.out.println(att.getValue());
                                        }
                                    }
                                } finally {
                                    // Release the resources held by the decoder for this request.
                                    requestDecoder.destroy();
                                }
                                DefaultFullHttpResponse resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
                                resp.content().writeBytes("Hello world this is netty response string".getBytes(StandardCharsets.UTF_8));
                                resp.headers().set(HttpHeaderNames.CONTENT_LENGTH, resp.content().readableBytes());
                                channelHandlerContext.writeAndFlush(resp);
                            }

                            @Override
                            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                // Report the failure as a server error, not as 200 OK with the success body.
                                DefaultFullHttpResponse resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR);
                                resp.content().writeBytes("Internal server error".getBytes(StandardCharsets.UTF_8));
                                resp.headers().set(HttpHeaderNames.CONTENT_LENGTH, resp.content().readableBytes());
                                ctx.channel().writeAndFlush(resp);
                                ctx.channel().close();
                            }
                        });
                    }
                });
        ChannelFuture bind = serverBootstrap.bind(11344).sync();
        bind.channel().closeFuture().sync();
    } finally {
        // Always stop both event loop groups, even when bind or sync throws.
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}