Unable to understand a data race

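Problem description

Motivation: I have a huge JSON file that I want to parse and process.

I'm sure there are libraries that can do this, but I want to do it myself to better understand Go's concurrency constructs.

So my goal is to read the file with a scanner and pump the data onto a `[]byte` chan like this: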


    // Not the actual code.
    for scanner.Scan() {
       input <- []byte(scanner.Text())
    }

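I want more than one goroutine to receive data from the input chan, unmarshal the JSON, report the result (whether the unmarshal succeeded), and feed a progress bar: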


    // not the actual code.
    for {
        bytes := <-input
        var data map[string]interface{}
        if err := json.Unmarshal(bytes, &data); err != nil {
            errorchan <- true
        } else {
            successchan <- true
        }
        progress <- int64(len(bytes))
    }

    // other goroutines handle errorchan, successchan and progress.

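Everything looks logical on paper, but when I put the code together (shown below) and ran it, I got a data race. I tried my best to understand how this race happens, but could not (I had already removed some other data races that appeared in an earlier version of the code).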

    workers 0xc0000c2000
     Completed 0.000000==================
    WARNING: DATA RACE
    Read at 0x00c0000c2048 by goroutine 8:
      mongo_import/race-d.readFile()
          /Users/admin/Documents/goProject/src/mongo_import/race-d/main.go:197 +0x6ff
      mongo_import/race-d.TestReadJson()
          /Users/admin/Documents/goProject/src/mongo_import/race-d/main_test.go:8 +0x47
      testing.tRunner()
          /usr/local/Cellar/go/1.13.7/libexec/src/testing/testing.go:909 +0x199

    Previous write at 0x00c0000c2048 by goroutine 12:
      mongo_import/race-d.(*Worker).trackSuccess()
          /Users/admin/Documents/goProject/src/mongo_import/race-d/main.go:103 +0xc0

    Goroutine 8 (running) created at:
      testing.(*T).Run()
          /usr/local/Cellar/go/1.13.7/libexec/src/testing/testing.go:960 +0x651
      testing.runTests.func1()
          /usr/local/Cellar/go/1.13.7/libexec/src/testing/testing.go:1202 +0xa6
      testing.tRunner()
          /usr/local/Cellar/go/1.13.7/libexec/src/testing/testing.go:909 +0x199
      testing.runTests()
          /usr/local/Cellar/go/1.13.7/libexec/src/testing/testing.go:1200 +0x521
      testing.(*M).Run()
          /usr/local/Cellar/go/1.13.7/libexec/src/testing/testing.go:1117 +0x2ff
      main.main()
          _testmain.go:44 +0x223

    Goroutine 12 (running) created at:
      mongo_import/race-d.(*Worker).Start()
          /Users/admin/Documents/goProject/src/mongo_import/race-d/main.go:72 +0x15f
    ==================
    --- FAIL: TestReadJson (1.18s)
        testing.go:853: race detected during execution of test
    FAIL
    FAIL    mongo_import/race-d 1.192s
    FAIL

  • A data race reported inside the testing package is new to me.

  • But I cannot understand why this part causes a data race (it makes no sense to me):

      Previous write at 0x00c0000c2048 by goroutine 12:
    mongo_import/race-d.(*Worker).trackSuccess()
        /Users/admin/Documents/goProject/src/mongo_import/race-d/main.go:103 +0xc0
    
     Goroutine 12 (running) created at:
    mongo_import/race-d.(*Worker).Start()
        /Users/admin/Documents/goProject/src/mongo_import/race-d/main.go:72 +0x15f  
    

Code:

Here is what the code looks like:


    package main
    
    import (
        "bufio"
        "encoding/binary"
        "encoding/json"
        "fmt"
        "log"
        "os"
        "sync"
        "time"
    )
    
    // thread that does that job of unmarshal
    type Thread struct {
        w *Worker
    }
    
    // Run the individual thread and process the bytes
    // read from the worker.input chan
    func (thread Thread) Run() {
        for {
            bytes,ok := <-thread.w.input
            if !ok {
                return
            }
    
            var data map[string]interface{}
            if err := json.Unmarshal(bytes,&data); err != nil {
                thread.w.errorChan <- true
            } else {
                thread.w.successChan <- true
            }
    
            thread.w.progress <- int64(binary.Size(bytes))
            // do other thing
            // like insert in db etc.
        }
    }
    
    // worker that
    type Worker struct {
        errmutex      sync.Mutex
        succmutex     sync.Mutex
        progmutex     sync.Mutex
        wg            sync.WaitGroup
        done          bool
        workers       int
        fileSize      int64
        completedByte int64
        errorCount    int
        successCount  int
        input         chan []byte
        progress      chan int64
        errorChan     chan bool
        successChan   chan bool
    }
    
    // NewWorker
    func NewWorker(count int) *Worker {
        return &Worker{workers: count}
    }
    
    // start the worker
    func (w *Worker) Start() {
        fmt.Printf("workers %p\n",w)
        w.wg.Add(1)
        go w.display()
        w.wg.Add(1)
        go w.trackProgress()
        w.wg.Add(1)
        go w.trackSuccess()
        w.wg.Add(1)
        go w.trackError()
        w.wg.Add(1)
        go w.Spawn()
        w.wg.Wait()
    }
    
    // add the error count
    func (w *Worker) trackError() {
        w.wg.Done()
        for {
            _,ok := <-w.errorChan
            if !ok {
                return
            }
            w.errmutex.Lock()
            w.errorCount = w.errorCount + 1
            w.errmutex.Unlock()
        }
    }
    
    // add the success count
    func (w *Worker) trackSuccess() {
        defer w.wg.Done()
        for {
            _,ok := <-w.successChan
            if !ok {
                return
            }
            w.succmutex.Lock()
            w.successCount += 1
            w.succmutex.Unlock()
        }
    }
    
    // spawn individual thread to process the bytes
    func (w *Worker) Spawn() {
        defer w.wg.Done()
        defer w.clean()
        var wg sync.WaitGroup
        for i := 0; i < w.workers; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                Thread{w: w}.Run()
            }()
        }
    
        wg.Wait()
    
    }
    
    // close the other open chan
    func (w *Worker) clean() {
        close(w.errorChan)
        close(w.successChan)
        close(w.progress)
    }
    
    // close the input chan
    func (w *Worker) Done() {
        close(w.input)
    }
    
    // sum the total byte we have processed
    func (w *Worker) trackProgress() {
        defer w.wg.Done()
        for {
            read,ok := <-w.progress
            if !ok {
                w.done = true
                return
            }
            w.progmutex.Lock()
            w.completedByte += read
            w.progmutex.Unlock()
    
        }
    }
    
    // display the progress bar
    func (w *Worker) display() {
        defer w.wg.Done()
        for !w.done {
            w.progmutex.Lock()
            percentage := (float64(w.completedByte) / float64(w.fileSize)) * 100
            w.progmutex.Unlock()
            fmt.Printf("\r Completed %f",percentage)
            time.Sleep(5 * time.Second)
        }
    }
    
    func readFile(path string) map[string]int {
        handler,err := os.Open(path)
        if err != nil {
            log.Fatal(err)
        }
        defer handler.Close()
        worker := &Worker{workers: 2}
        worker.input = make(chan []byte,2)
        worker.progress = make(chan int64,1)
        worker.errorChan = make(chan bool,1)
        worker.successChan = make(chan bool,1)
    
        if fi,err := handler.Stat(); err != nil {
            log.Fatal(err)
        } else {
            worker.fileSize = fi.Size()
        }
    
        scanner := bufio.NewScanner(handler)
        go worker.Start()
        for scanner.Scan() {
            worker.input <- []byte(scanner.Text())
        }
    
        worker.Done()
        if err := scanner.Err(); err != nil {
            log.Fatal(err)
            return nil
        }
    
        return map[string]int{
            "error":   worker.errorCount,
            "success": worker.successCount,
        }
    }
    
    func main() {
        readFile("dump.json")
    }

And the test code:


    package main // main_test.go
    
    import (
        "testing"
    )
    
    func TestReadJson(t *testing.T) {
        data := readFile("dump2.json")
        if data == nil {
            t.Error("we got a nil data")
        }
    }

Here is the sample dump2.json data:

    {"name": "tutorialspoint10"}
    {"name":"tutorialspoint2","age": 15}
    {"name":"tutorialspoint3","age": 25}
    {"name":"tutorialspoint4","age": 28}
    {"name":"tutorialspoint5","age": 40}
    {"name": "tutorialspoint6"}
    {"name":"tutorialspoint8","age": 7}
    {"name":"tutorialspoint4","age": 55}
    {"name":"tutorialspoint1","age":4}
    {"name":"tutorialspoint2"}

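Finally, I know that code posted here should be minimal. I did my best to keep it minimal (it is extracted from the original project), but I am not sure how to reduce it any further.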

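Solution

You need a read lock at line main.go:197:

    "success": worker.successCount,

As the race report says, readFile reads worker.successCount there while another goroutine (trackSuccess, main.go:103) is writing to it. The write holds succmutex but the read does not, so the two accesses are unsynchronized. /Users/admin/Documents/goProject/src/mongo_import/race-d/main.go:197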
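Looking at the posted code, readFile also starts the worker with `go worker.Start()` and then reads the counters without waiting for it, so the tracking goroutines may still be running at that point. Below is a minimal sketch of the same idea with that wait made explicit. It is not your code: `countJSONLines` and its parameters are hypothetical names, and it folds the separate tracking goroutines into the workers to stay short.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// countJSONLines is a hypothetical, reduced version of the pipeline in the
// question: workers unmarshal lines from a channel and count the outcomes.
func countJSONLines(lines []string, workers int) (success, failed int) {
	input := make(chan []byte, workers)
	var (
		mu sync.Mutex     // guards success and failed
		wg sync.WaitGroup // tracks the worker goroutines
	)

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for b := range input { // loop ends when input is closed
				var data map[string]interface{}
				err := json.Unmarshal(b, &data)
				mu.Lock()
				if err != nil {
					failed++
				} else {
					success++
				}
				mu.Unlock()
			}
		}()
	}

	for _, l := range lines {
		input <- []byte(l)
	}
	close(input)

	// Wait for every worker before reading the counters. After Wait returns,
	// no goroutine is writing to them any more, so the read below is safe.
	wg.Wait()
	return success, failed
}

func main() {
	lines := []string{`{"name":"a"}`, `{"name":"b","age":15}`, `not json`}
	s, f := countJSONLines(lines, 2)
	fmt.Println(s, f) // prints "2 1"
}
```

The point of the sketch is the ordering: the counters are only read after `wg.Wait()`, so the result is both race-free and complete. A lock around the read alone would silence the race detector, but the counts could still be read before all lines have been processed.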

A short explanation:

https://dev.to/wagslane/golang-mutexes-what-is-rwmutex-for-57a0
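As a rough illustration of what a read lock looks like, here is a small sketch using `sync.RWMutex`. The `counter` type and `countWithRWMutex` function are made-up names for this example and stand in for the `successCount` field and `succmutex` of the Worker; treat it as one possible shape, not the only one.

```go
package main

import (
	"fmt"
	"sync"
)

// counter is a hypothetical stand-in for Worker.successCount plus its mutex.
type counter struct {
	mu sync.RWMutex
	n  int
}

// Inc takes the write lock, like trackSuccess does in the question.
func (c *counter) Inc() {
	c.mu.Lock()
	c.n++
	c.mu.Unlock()
}

// Value takes the read lock, so it never overlaps with a write in Inc.
func (c *counter) Value() int {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.n
}

// countWithRWMutex runs `workers` goroutines that each increment the counter
// `perWorker` times and read it concurrently, then returns the final value.
func countWithRWMutex(workers, perWorker int) int {
	var c counter
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < perWorker; j++ {
				c.Inc()
				_ = c.Value() // concurrent read, protected by RLock
			}
		}()
	}
	wg.Wait()
	return c.Value()
}

func main() {
	fmt.Println(countWithRWMutex(4, 1000)) // prints 4000
}
```

Every access to the field goes through `Inc` or `Value`, so there is no way to read it without holding the lock, which is the mistake at main.go:197.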

In this case, using atomic operations may be a better fit. https://gobyexample.com/atomic-counters
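A sketch of the atomic variant, assuming the counter is changed to an `int64` (the question declares `successCount` as `int`, so that is a change on my part). `countWithAtomic` is a hypothetical name; `atomic.AddInt64` and `atomic.LoadInt64` are from the standard `sync/atomic` package and are available in Go 1.13.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countWithAtomic runs `workers` goroutines that each add 1 to a shared
// counter `perWorker` times, using atomic operations instead of a mutex.
func countWithAtomic(workers, perWorker int) int64 {
	var success int64 // only accessed through sync/atomic
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < perWorker; j++ {
				atomic.AddInt64(&success, 1) // atomic write
			}
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&success) // atomic read
}

func main() {
	fmt.Println(countWithAtomic(4, 1000)) // prints 4000
}
```

For a plain counter this removes the need for succmutex and errmutex entirely, as long as every read and write of the field goes through `sync/atomic`.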
