Periodically crawling an API in Go

Problem description

I want to periodically (every 10 seconds) and asynchronously crawl/call a specific URL (e.g. http://dummy.com/{address}) for a list of addresses.
Based on the result received from each URL, an event needs to be published.
The crawler needs to be started in a goroutine, and every API call has to run in its own goroutine.
Another goroutine will be started that listens for the events.
The crawler is initialized with a list of addresses, but it needs to expose methods so that new addresses to crawl can be added, and existing ones removed, at any time.

Please see my solution below; it has a race condition.
This happens because the observables field of the crawler struct is not thread-safe for concurrent access.
I know the "don't communicate by sharing memory; share memory by communicating" rule, but I haven't figured out how I would apply it here, i.e. what I would use instead of a slice for the observables field, and how I would add or remove watched addresses if channels were used.

How can the solution below be modified to fix the race condition?

package crawler

import (
    "fmt"
    log "github.com/sirupsen/logrus"
    "io/ioutil"
    "net/http"
    "strconv"
    "time"
)

type Service interface {
    Start()
    Stop()
    AddObservable(observable Observable)
    RemoveObservable(observable Observable)
    GetEventChannel() chan event
}

type event struct {
    EventType int
    Result    Result
}

type Result struct {
    resp []byte
}

type Observable struct {
    AccountType int
    Address     string
}

type crawler struct {
    explorerApiUrl string
    interval       *time.Ticker
    errChan        chan error
    quitChan       chan int
    eventChan      chan event
    observables    []Observable
    errorHandler   func(err error)
}

func NewService(
    observables []Observable,
    errorHandler func(err error),
) Service {

    interval := time.NewTicker(10 * time.Second)

    return &crawler{
        explorerApiUrl: "http://dummy.com",
        interval:       interval,
        errChan:        make(chan error),
        quitChan:       make(chan int),
        eventChan:      make(chan event),
        observables:    observables,
        errorHandler:   errorHandler,
    }
}

func (u *crawler) Start() {
    log.Debug("start observe")
    for {
        select {
        case <-u.interval.C:
            log.Debug("observe interval")
            u.observeAll(u.observables)
        case err := <-u.errChan:
            u.errorHandler(err)
        case <-u.quitChan:
            log.Debug("stop observe")
            u.interval.Stop()
            time.Sleep(time.Second)
            close(u.eventChan)
            return
        }
    }
}

func (u *crawler) Stop() {
    u.quitChan <- 1
}

func (u *crawler) AddObservable(observable Observable) {
    u.observables = append(u.observables, observable)
}

func (u *crawler) RemoveObservable(observable Observable) {
    newObservableList := make([]Observable, 0)
    for _, o := range u.observables {
        if o.Address != observable.Address {
            newObservableList = append(newObservableList, o)
        }
    }
    u.observables = newObservableList
}

func (u *crawler) GetEventChannel() chan event {
    return u.eventChan
}

func (u *crawler) observeAll(observables []Observable) {
    for _, a := range observables {
        go u.observe(a)
    }
}

func (u *crawler) observe(observe Observable) {

    resp, err := http.Get(
        fmt.Sprintf("%v/%v", u.explorerApiUrl, observe.Address),
    )
    if err != nil {
        log.Error(err)
        return // without this, resp.Body below would be a nil dereference
    }
    defer resp.Body.Close()
    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        log.Error(err)
    }
    e := event{
        EventType: 0,
        Result: Result{
            resp: body,
        },
    }

    u.eventChan <- e
}

//// TEST ////

func TestCrawler(t *testing.T) {
    observables := make([]Observable, 0)
    for i := 0; i < 100; i++ {
        observable := Observable{
            AccountType: 1,
            Address:     strconv.Itoa(i),
        }
        observables = append(observables, observable)
    }

    crawlSvc := NewService(observables, nil)

    go crawlSvc.Start()

    go removeObservableAfterTimeout(crawlSvc)

    go addObservableAfterTimeout(crawlSvc)

    go stopCrawlerAfterTimeout(crawlSvc)

    for event := range crawlSvc.GetEventChannel() {
        t.Log(event)
    }
}

func stopCrawlerAfterTimeout(crawler Service) {
    time.Sleep(7 * time.Second)
    crawler.Stop()
}

func removeObservableAfterTimeout(crawler Service) {
    time.Sleep(2 * time.Second)
    crawler.RemoveObservable(Observable{
        AccountType: 0,
        Address:     "2",
    })
}

func addObservableAfterTimeout(crawler Service) {
    time.Sleep(5 * time.Second)
    crawler.AddObservable(Observable{
        AccountType: 0,
        Address:     "101",
    })
}
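
Running this test with the race detector (go test -race) should report concurrent reads and writes of the observables slice: the ticker branch in Start iterates over it while AddObservable and RemoveObservable mutate it from other goroutines.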

Solution

The simplest thing to do here, without changing the solution too much, is to introduce an RWMutex into the crawler struct and use it to lock the critical sections of the code that work with the slice. See the following changes:

type crawler struct {
    explorerAPIURL string
    interval       *time.Ticker
    errChan        chan error
    quitChan       chan int
    eventChan      chan event
    observables    []Observable
    errorHandler   func(err error)
    mtx            sync.RWMutex
}

...

func (u *crawler) AddObservable(observable Observable) {
    u.mtx.Lock()
    u.observables = append(u.observables, observable)
    u.mtx.Unlock()
}

func (u *crawler) RemoveObservable(observable Observable) {
    u.mtx.Lock()
    newObservableList := make([]Observable, 0)
    for _, o := range u.observables {
        if o.Address != observable.Address {
            newObservableList = append(newObservableList, o)
        }
    }
    u.observables = newObservableList
    u.mtx.Unlock()
}
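
Note that the RWMutex only helps if the read side takes the lock too: the ticker branch in Start still iterates over u.observables without holding it. A minimal way to close that gap is to read the slice through a helper that copies it under the read lock (the snapshotObservables name below is just an illustration, not part of the original code):

func (u *crawler) snapshotObservables() []Observable {
    // Copy the slice under the read lock so the ticker branch never iterates
    // over a slice that AddObservable/RemoveObservable is mutating.
    u.mtx.RLock()
    defer u.mtx.RUnlock()
    cp := make([]Observable, len(u.observables))
    copy(cp, u.observables)
    return cp
}

The case <-u.interval.C branch would then call u.observeAll(u.snapshotObservables()) instead of touching the field directly.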

However, while this fixes the race condition, I can't guarantee that you won't eventually run into leak-like problems, for example when you try to remove an observable whose request is still being executed.

My suggestion would be to either defer the slice operations (add or remove) until all of the in-flight executions have finished, or to introduce a check that cancels the execution of a removed observable.
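
To illustrate the cancellation idea only (this sketch is not part of the original answer; trackedObservable and observeWithCancel are hypothetical names, and it assumes "context" is imported), each observable could carry a cancel function that RemoveObservable invokes, so an in-flight request is aborted instead of lingering:

// Hypothetical sketch: pair each observable with a cancel function so that
// removing it can also abort a request that is still running.
type trackedObservable struct {
    Observable
    cancel context.CancelFunc
}

func (u *crawler) observeWithCancel(ctx context.Context, o Observable) error {
    // Build the request with the per-observable context; cancelling that
    // context makes the HTTP client return early with an error.
    req, err := http.NewRequestWithContext(
        ctx, http.MethodGet,
        fmt.Sprintf("%v/%v", u.explorerAPIURL, o.Address), nil,
    )
    if err != nil {
        return err
    }
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return err
    }
    u.eventChan <- event{EventType: 0, Result: Result{resp: body}}
    return nil
}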

One solution is to introduce additional channels for the slice operations and to handle those operations inside the Start function.

type crawler struct {
    explorerAPIURL string
    interval       *time.Ticker
    errChan        chan error
    quitChan       chan int
    addChan        chan Observable
    removeChan     chan Observable
    eventChan      chan event
    observables    []Observable
    errorHandler   func(err error)
    mtx            sync.RWMutex
}

// NewService --
func NewService(
    observables []Observable,
    errorHandler func(err error),
) Service {

    interval := time.NewTicker(10 * time.Second)

    return &crawler{
        explorerAPIURL: "http://dummy.com",
        interval:       interval,
        errChan:        make(chan error),
        quitChan:       make(chan int),
        addChan:        make(chan Observable),
        removeChan:     make(chan Observable),
        eventChan:      make(chan event),
        observables:    observables,
        errorHandler:   errorHandler,
    }
}

func (u *crawler) Start() {
    log.Debug("start observe")
    for {
        select {
        case <-u.interval.C:
            log.Debug("observe interval")
            u.observeAll(u.observables)
        case err := <-u.errChan:
            u.errorHandler(err)
        case <-u.quitChan:
            log.Debug("stop observe")
            u.interval.Stop()
            time.Sleep(time.Second)
            close(u.eventChan)
            return
        case o := <-u.addChan:
            u.observables = append(u.observables, o)
        case o := <-u.removeChan:
            newObservableList := make([]Observable, 0)
            for _, observable := range u.observables {
                if o.Address != observable.Address {
                    newObservableList = append(newObservableList, observable)
                }
            }
            u.observables = newObservableList
        }
    }
}

...

func (u *crawler) AddObservable(observable Observable) {
    u.addChan <- observable
}

func (u *crawler) RemoveObservable(observable Observable) {
    u.removeChan <- observable
}
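
With this approach AddObservable and RemoveObservable only send on a channel, so they block until the select loop in Start receives the message; they should only be called while Start is running (and before Stop), otherwise the send would block forever.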

...

//EDIT - I've added the modified versions of these functions as well.
func (u *crawler) observeAll(observables []Observable) {
    g, _ := errgroup.WithContext(context.Background())
    for _, a := range observables {
        a := a // pin the loop variable for the closure (needed before Go 1.22)
        g.Go(func() error {
            return u.observe(a)
        })
    }

    if err := g.Wait(); err != nil {
        log.Error(err)
    }
}

func (u *crawler) observe(observe Observable) error {

    resp, err := http.Get(
        fmt.Sprintf("%v/%v", u.explorerAPIURL, observe.Address),
    )
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return err
    }
    e := event{
        EventType: 0,
        Result: Result{
            resp: body,
        },
    }

    u.eventChan <- e
    return nil
}
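
The modified observeAll assumes "context" and "golang.org/x/sync/errgroup" are added to the imports. Because g.Wait() blocks, the ticker branch in Start now waits for the whole batch of requests to finish before handling the next add, remove, or stop message, which is essentially the "defer the slice operations until all executions complete" option mentioned above.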
