【Go】使用压缩文件优化io (一)

原文连接:https://blog.thinkeridea.com/201906/go/compress_file_io_optimization1.html

最近遇到一个日志备份 io 过高的问题,业务日志每十分钟备份一次,本来是用 Python 写一个根据规则扫描备份日志问题不大,但是随着业务越来越多,单机上的日志文件越来越大,文件数量也越来越多,导致每每备份的瞬间 io 阻塞严重, CPU 和 load 异常的高,好在备份速度很快,对业务影响不是很大,这个问题会随着业务增长,越来越明显,这段时间抽空对备份方式做了优化,效果十分显著,整理篇文章记录一下。

背景说明

服务器配置:4 核 8G; 磁盘:500G

每十分钟需要上传:18 个文件,高峰时期约 10 G 左右

业务日志为了保证可靠性,会先写入磁盘文件,每10分钟切分日志文件,然后在下十分钟第一分时备份日志到 OSS,数据分析服务会从在备份完成后拉取日志进行分析,日志备份需要高效快速,在最短的时间内备份完,一般备份均能在几十秒内完成。

备份的速度和效率并不是问题,足够的快,但是在备份时 io 阻塞严重导致的 CPU 和 load 异常,成为业务服务的瓶颈,在高峰期业务服务仅消耗一半的系统资源,但是备份时 CPU 经常 100%,且 iowait 可以达到 70 多,空闲资源非常少,这样随着业务扩展,日志备份虽然时间很短,却成为了系统的瓶颈。

后文中会详细描述优化前后的方案,并用 go 编写测试,使用一台 2 核4G的服务器进行测试,测试数据集大小为:

  • 文件数:336
  • 原始文件:96G
  • 压缩文件:24G
  • 压缩方案:lzo
  • Goroutine 数量:4

优化前

优化前日志备份流程:

  • 根据备份规则扫描需要备份的文件
  • 使用 lzop 命令压缩日志
  • 上传压缩后的日志到 OSS

下面是代码实现,这里不再包含备份文件规则,仅演示压缩上传逻辑部分,程序接受文件列表,并对文件列表压缩上传至 OSS 中。

.../pkg/aliyun_oss 是我自己封装的基于阿里云 OSS 操作的包,这个路径是错误的,仅做演示,想运行下面的代码,OSS 交互这部分需要自己实现。

package main

import (
	"bytes"
	"fmt"
	"os"
	"os/exec"
	"path/filepath"
	"sync"
	"time"

	".../pkg/aliyun_oss"
)

func main() {
	var oss *aliyun_oss.AliyunOSS
	files := os.Args[1:]
	if len(files) < 1 {
		fmt.Println("请输入要上传的文件")
		os.Exit(1)
	}

	fmt.Printf("待备份文件数量:%d\n",len(files))

	startTime := time.Now()
	defer func(startTime time.Time) {
		fmt.Printf("共耗时:%s\n",time.Now().Sub(startTime).String())
	}(startTime)

	var wg sync.WaitGroup
	n := 4
	c := make(chan string)

	// 压缩日志
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			for file := range c {
				cmd := exec.Command("lzop",file)
				cmd.Stderr = &bytes.Buffer{}
				err := cmd.Run()
				if err != nil {
					panic(cmd.Stderr.(*bytes.Buffer).String())
				}
			}
		}()
	}

	for _,file := range files {
		c <- file
	}

	close(c)
	wg.Wait()
	fmt.Printf("压缩耗时:%s\n",time.Now().Sub(startTime).String())

	// 上传压缩日志
	startTime = time.Now()
	c = make(chan string)
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			for file := range c {
				name := filepath.Base(file)
				err := oss.PutObjectFromFile("tmp/"+name+".lzo",file+".lzo")
				if err != nil {
					panic(err)
				}
			}
		}()
	}

	for _,file := range files {
		c <- file
	}

	close(c)
	wg.Wait()
	fmt.Printf("上传耗时:%s\n",time.Now().Sub(startTime).String())
}

程序运行时输出:

待备份文件数量:336
压缩耗时:19m44.125314226s
上传耗时:6m14.929371103s
共耗时:25m59.118002969s

从运行结果中可以看出压缩文件耗时很久,实际通过 iostat 命令分析也发现,压缩时资源消耗比较高,下面是 iostat -m -x 5 10000 命令采集各个阶段数据。

  • 程序运行前
avg-cpu:  %user   %nice %system %iowait  %steal   %idle
           2.35    0.00    2.86    0.00    0.00   94.79

Device:         rrqm/s   wrqm/s     r/s     w/s    rkB/s    wkB/s avgrq-sz avgqu-sz   await r_await w_await  svctm  %util
vda               0.00     0.00    0.00    0.00     0.00     0.00     0.00     0.00    0.00    0.00    0.00   0.00   0.00
vdb               0.00     0.60    0.00    0.60     0.00     4.80    16.00     0.00    0.67    0.00    0.67   0.67   0.04
  • 压缩日志时
avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          10.84    0.00    6.85   80.88    0.00    1.43

Device:         rrqm/s   wrqm/s     r/s     w/s    rkB/s    wkB/s avgrq-sz avgqu-sz   await r_await w_await  svctm  %util
vda               0.00     0.00    0.60    0.00     2.40     0.00     8.00     0.00    0.67    0.67    0.00   0.67   0.04
vdb              14.80  5113.80 1087.60   60.60 78123.20 20697.60   172.13   123.17  106.45  106.26  109.87   0.87 100.00

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          10.06    0.00    7.19   79.06    0.00    3.70

Device:         rrqm/s   wrqm/s     r/s     w/s    rkB/s    wkB/s avgrq-sz avgqu-sz   await r_await w_await  svctm  %util
vda               0.00     0.00    1.60    0.00   103.20     0.00   129.00     0.01    3.62    3.62    0.00   0.50   0.08
vdb              14.20  4981.20  992.80   52.60 79682.40 20135.20   190.97   120.34  112.19  110.60  142.17   0.96 100.00
  • 上传日志时
avg-cpu:  %user   %nice %system %iowait  %steal   %idle
           6.98    0.00    7.81    7.71    0.00   77.50

Device:         rrqm/s   wrqm/s     r/s     w/s    rkB/s    wkB/s avgrq-sz avgqu-sz   await r_await w_await  svctm  %util
vda               0.00     0.00   13.40    0.00   242.40     0.00    36.18     0.02    1.63    1.63    0.00   0.19   0.26
vdb               0.40     2.40  269.60    1.20 67184.80    14.40   496.30     4.58   15.70   15.77    0.33   1.39  37.74

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
           7.06    0.00    8.00    4.57    0.00   80.37

Device:         rrqm/s   wrqm/s     r/s     w/s    rkB/s    wkB/s avgrq-sz avgqu-sz   await r_await w_await  svctm  %util
vda               0.00     0.00    0.60    0.00    75.20     0.00   250.67     0.00    2.67    2.67    0.00   2.00   0.12
vdb               0.20     0.00  344.80    0.00 65398.40     0.00   379.34     5.66   16.42   16.42    0.00   1.27  43.66

iostat 的结果中发现,压缩时程序 r_awaitw_await 都到了一百多,且 iowait 高达 80.88%,几乎耗尽了所有的 CPU,上传时 iowait 是可以接受的,因为只是单纯的读取压缩文件,且压缩文件也很小。

分析问题

上述结果中发现程序主要运行消耗在压缩日志,那优化也着重日志压缩的逻辑上。

压缩时日志会先压缩成 lzo 文件,然后再上传 lzo 文件到阿里云 OSS 上,这中间发生了几个过程:

  • 读取原始日志文件
  • 压缩数据
  • 写入 lzo 文件
  • 读取 lzo 文件
  • http 发送读取的内容

压缩时 r_awaitw_await 都很高,主要发生在读取原始日志文件,写入 lzo 文件, 怎么优化呢?

先想一下原始需求,读取原始文件 -> 上传数据。但是直接上传原始文件,文件比较大,网络传输慢,而且存储费用也比较高,怎么办呢?

这个时候我们期望可以上传的是压缩文件,所以就有了优化前的逻辑,这里面产生了一个中间过程,即使用 lzop 命令压缩文件,而且产生了一个中间文件 lzo 文件。

读取原始文件和上传数据是必须的,那么可以优化的就是压缩的流程了,所以 r_await 是没有办法优化的,那么只能优化 w_awaitw_await 是怎么产生的呢,恰恰是写入lzo 时产生的,可以不要 lzo 文件吗?这个文件有什么作用?

如果我们压缩文件数据流,在 读取原始文件 -> 上传数据 流程中对上传的数据流进行实时压缩,把压缩的内容给上传了,实现边读边压缩,对数据流进行处理,像是一个中间件,这样就不用写 lzo 文件了,那么 w_await 就被完全优化没了。

lzo 文件有什么作用?我想只有在上传失败之后可以节省一次文件压缩的消耗。上传失败的次数多吗?我用阿里云 OSS 好几年了,除了一次内网故障,再也没有遇到过上传失败的文件,我想是不需要这个文件的,而且生成 lzo 文件还需要占用磁盘空间,定时清理等等,增加了资源消耗和维护成本。

优化后

根据之前的分析看一下优化之后备份文件需要哪些过程:

  • 读取原始日志
  • 在内存中压缩数据流
  • http 发送压缩后的内容

这个流程节省了两个步骤,写入 lzo 文件和 读取 lzo 文件,不仅没有 w_await,就连 r_await 也得到了小幅度的优化。

优化方案确定了,可是怎么实现 lzo 对文件流进行压缩呢,去 Github 上找一下看看有没有 lzo 的压缩算法库,发现 github.com/cyberdelia/lzo ,虽然是引用 C 库实现的,但是经典的两个算法(lzo1x_1lzo1x_999)都提供了接口,貌似 Go 可以直接用了也就这一个库了。

发现这个库实现了 io.Readerio.Writer 接口,io.Reader 读取压缩文件流,输出解压缩数据,io.Writer 实现输入原始数据,并写入到输入的 io.Writer

想实现压缩数据流,看来需要使用 io.Writer 接口了,但是这个输入和输出都是 io.Writer,这可为难了,因为我们读取文件获得是 io.Reader,http 接口输入也是 io.Reader,貌似没有可以直接用的接口,没有办法实现了吗,不会我们自已封装一下,下面是封装的 lzo 数据流压缩方法:

package lzo

import (
	"bytes"
	"io"

	"github.com/cyberdelia/lzo"
)

type Reader struct {
	r    io.Reader
	rb   []byte
	buff *bytes.Buffer
	lzo  *lzo.Writer
	err  error
}

func NewReader(r io.Reader) *Reader {
	z := &Reader{
		r:    r,rb:   make([]byte,256*1024),buff: bytes.NewBuffer(make([]byte,256*1024)),}

	z.lzo,_ = lzo.NewWriterLevel(z.buff,lzo.BestSpeed)
	return z
}

func (z *Reader) compress() {
	if z.err != nil {
		return
	}

	var nr,nw int
	nr,z.err = z.r.Read(z.rb)
	if z.err == io.EOF {
		if err := z.lzo.Close(); err != nil {
			z.err = err
		}
	}

	if nr > 0 {
		nw,z.err = z.lzo.Write(z.rb[:nr])
		if z.err == nil && nr != nw {
			z.err = io.ErrShortWrite
		}
	}
}

func (z *Reader) Read(p []byte) (n int,err error) {
	if z.err != nil {
		return 0,z.err
	}

	if z.buff.Len() <= 0 {
		z.compress()
	}

	n,err = z.buff.Read(p)
	if err == io.EOF {
		err = nil
	} else if err != nil {
		z.err = err
	}

	return
}

func (z *Reader) Reset(r io.Reader) {
	z.r = r
	z.buff.Reset()
	z.err = nil
	z.lzo,lzo.BestSpeed)
}

这个库会固定消耗 512k 内存,并不是很大,我们需要创建一个读取 buf 和一个压缩缓冲 buf, 都是256k的大小,实际压缩缓冲的 buf 并不需要 256k,毕竟压缩后数据会比原始数据小,考虑空间并不是很大,直接分配 256k 避免运行时分配。

实现原理当 http 从输入的 io.Reader (实际就是我们上面封装的 lzo 库), 读取数据时,这个库检查压缩缓冲是否为空,为空的情况会从文件读取 256k 数据并压缩输入到压缩缓冲中,然后从压缩缓冲读取数据给 http 的 io.Reader,如果压缩缓冲区有数据就直接从压缩缓冲区读取压缩数据。

这并不是线程安全的,并且固定分配 512k 的缓冲,所以也提供了一个 Reset 方法,来复用这个对象,避免重复分配内存,但是需要保证一个 lzo 对象实例只能被一个 Goroutine 访问, 这可以使用 sync.Pool 来保证,下面的代码我用另一种方法来保证。

package main

import (
	"fmt"
	"os"
	"path/filepath"
	"sync"
	"time"

	".../pkg/aliyun_oss"
	".../pkg/lzo"
)

func main() {
	var oss *aliyun_oss.AliyunOSS
	files := os.Args[1:]
	if len(files) < 1 {
		fmt.Println("请输入要上传的文件")
		os.Exit(1)
	}

	fmt.Printf("待备份文件数量:%d\n",len(files))

	startTime := time.Now()
	defer func() {
		fmt.Printf("共耗时:%s\n",time.Now().Sub(startTime).String())
	}()

	var wg sync.WaitGroup
	n := 4
	c := make(chan string)

	// 压缩日志
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			var compress *lzo.Reader

			for file := range c {
				r,err := os.Open(file)
				if err != nil {
					panic(err)
				}

				if compress == nil {
					compress = lzo.NewReader(r)
				} else {
					compress.Reset(r)
				}

				name := filepath.Base(file)
				err = oss.PutObject("tmp/"+name+"1.lzo",compress)
				r.Close()
				if err != nil {
					panic(err)
				}
			}
		}()
	}

	for _,file := range files {
		c <- file
	}

	close(c)
	wg.Wait()
}

程序为每个 Goroutine 分配一个固定的 compress ,当需要压缩文件的时候判断是创建还是重置,来达到复用的效果。

该程序运行输出:

待备份文件数量:336
共耗时 18m20.162441931s

实际耗时比优化前提升了 28%,实际通过 iostat 命令分析也发现,资源消耗也有了明显的改善,下面是 iostat -m -x 5 10000 命令采集各个阶段数据。

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          15.72    0.00    6.58   74.10    0.00    3.60

Device:         rrqm/s   wrqm/s     r/s     w/s    rkB/s    wkB/s avgrq-sz avgqu-sz   await r_await w_await  svctm  %util
vda               0.00     0.00    0.00    0.00     0.00     0.00     0.00     0.00    0.00    0.00    0.00   0.00   0.00
vdb               3.80     3.40 1374.20    1.20 86484.00    18.40   125.79   121.57   87.24   87.32    1.00   0.73 100.00

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          26.69    0.00    8.42   64.27    0.00    0.62

Device:         rrqm/s   wrqm/s     r/s     w/s    rkB/s    wkB/s avgrq-sz avgqu-sz   await r_await w_await  svctm  %util
vda               0.00     0.20  426.80    0.80  9084.80     4.00    42.51     2.69    6.29    6.30    1.00   0.63  26.92
vdb               1.80     0.00 1092.60    0.00 72306.40     0.00   132.36   122.06  108.45  108.45    0.00   0.92 100.02

通过 iostat 发现只有 r_awaitw_await 被完全优化,iowait 有明显的改善,运行时间更短了,效率更高了,对 io 产生影响的时间也更短了。

优化期间遇到的问题

首先对找到的 lzo 算法库进行测试,确保压缩和解压缩没有问题,并且和 lzop 命令兼容。

在这期间发现使用压缩的数据比 lzop 压缩数据大了很多,之后阅读了源码实现,并没有发现任何问题,尝试调整缓冲区大小,发现对生成的压缩文件大小有明显改善。

这个发现让我也很为难,究竟多大的缓冲区合适呢,只能去看 lzop 的实现了,发现 lzop 默认压缩块大小为 256k,实际 lzo 算法支持的最大块大小就是 256k,所以实现 lzo 算法包装是创建的是 256k 的缓冲区的,这个缓冲区的大小就是压缩块的大小,大家使用的时候建议不要调整了。

总结

这个方案上线之后,由原来需要近半分钟上传的,改善到大约只有十秒(Go 语言本身效率也有很大帮助),而且 load 有了明显的改善。

优化前每当运行日志备份,CPU 经常爆表,优化后备份时 CPU 增幅 20%,可以从容应对业务扩展问题了。

测试是在一台空闲的机器上进行的,实际生产服务器本身 w_await 会有 20 左右,如果使用固态硬盘,全双工模式,读和写是分离的,那么优化掉 w_await 对业务的帮助是非常大的,不会阻塞业务日志写通道了。

当然我们服务器是高速云盘(机械盘),由于机械盘物理特征只能是半双工,要么读、要么写,所以优化掉 w_await 确实效率会提升很多,但是依然会对业务服务写有影响。

转载:

本文作者: 戚银(thinkeridea

本文链接: https://blog.thinkeridea.com/201906/go/compress_file_io_optimization1.html

版权声明: 本博客所有文章除特别声明外,均采用 CC BY 4.0 CN协议 许可协议。转载请注明出处!

相关文章

Golang的文档和社区资源:为什么它可以帮助开发人员快速上手...
Golang:AI 开发者的实用工具
Golang的标准库:为什么它可以大幅度提高开发效率?
Golang的部署和运维:如何将应用程序部署到生产环境中?
高性能AI开发:Golang的优势所在
本篇文章和大家了解一下go语言开发优雅得关闭协程的方法。有...