问题描述
我希望针对一组地址,异步地(每 10 秒一次)定期抓取/调用特定的 URL(例如 http://dummy.com/{address})。
需要根据从该 URL 得到的结果发布事件。
爬虫(crawler)需要在一个 goroutine 中启动,并且每次 API 调用都必须在单独的 goroutine 中进行。
另外还会启动一个 goroutine 来监听这些事件。
爬虫用一个地址列表进行初始化,但还需要提供公开的方法,以便随时添加新的待抓取地址或删除已有地址。
请看下面的代码:我的实现存在竞态条件(race condition)。
出现这个问题是因为 crawler 结构体的 observables 字段不是线程安全的,无法被多个 goroutine 并发访问。
我知道 Go 的原则“不要通过共享内存来通信,而应通过通信来共享内存”,但没想明白如何用通道代替切片来保存 observables 字段,以及在使用通道的情况下,又该如何添加或删除其他被“监视”的地址。
应该如何修改下面的实现来消除竞态条件?
package crawler
import (
"fmt"
"io/ioutil"
"net/http"
"strconv"
"sync"
"time"

log "github.com/sirupsen/logrus"
)
type Service interface {
Start()
Stop()
AddObservable(observable Observable)
RemoveObservable(observable Observable)
GetEventChannel() chan event
}
// event is a single notification published by the crawler on its event channel.
type event struct {
	// EventType classifies the event. observe always sets 0; the meaning of
	// other values is not shown here (TODO confirm).
	EventType int
	// Result carries the payload fetched for one address.
	Result Result
}
// Result holds the raw HTTP response body fetched for an address.
type Result struct {
	resp []byte // body as returned by ioutil.ReadAll in observe
}
// Observable is one address the crawler polls.
type Observable struct {
	// AccountType is carried along but not read by the crawler code shown here.
	AccountType int
	// Address is appended to the explorer API base URL. It is also the key
	// RemoveObservable matches on.
	Address string
}
// crawler is the Service implementation returned by NewService.
//
// NOTE(review): observables is read by Start and written by AddObservable /
// RemoveObservable without any synchronization. Calling those methods from
// another goroutine while Start is running is a data race.
type crawler struct {
	explorerApiUrl string          // base URL; each request goes to "<base>/<address>"
	interval       *time.Ticker    // fires once per crawl round
	errChan        chan error      // values received here are passed to errorHandler by Start (nothing sends on it in the code shown)
	quitChan       chan int        // Stop signals Start through this channel
	eventChan      chan event      // observe publishes results here; Start closes it on exit
	observables    []Observable    // addresses crawled on each tick (unsynchronized, see note)
	errorHandler   func(err error) // called by Start for every error received on errChan
}
// NewService builds a crawler that polls the given observables every 10
// seconds. The ticker starts immediately, and every channel is unbuffered.
func NewService(observables []Observable, errorHandler func(err error)) Service {
	ticker := time.NewTicker(10 * time.Second)

	svc := new(crawler)
	svc.explorerApiUrl = "http://dummy.com"
	svc.interval = ticker
	svc.errChan = make(chan error)
	svc.quitChan = make(chan int)
	svc.eventChan = make(chan event)
	svc.observables = observables
	svc.errorHandler = errorHandler
	return svc
}
// Start runs the crawl loop and does not return until Stop is called.
//
// On every tick it fetches each observable in its own goroutine. Errors
// received on errChan are passed to errorHandler. When Stop is called, Start
// stops the ticker and waits for every in-flight fetch to finish publishing.
// Only then does it close eventChan.
//
// The previous version slept one second and then closed eventChan. A fetch
// still running at that point would panic with "send on closed channel".
//
// NOTE(review): u.observables is still read here without synchronization,
// while AddObservable/RemoveObservable may write it from other goroutines.
// That data race needs a mutex or channel-based ownership to fix.
// NOTE(review): the wait assumes the eventChan consumer keeps reading until
// the channel is closed, and that each HTTP request eventually returns.
func (u *crawler) Start() {
	log.Debug("start observe")
	var inFlight sync.WaitGroup
	for {
		select {
		case <-u.interval.C:
			log.Debug("observe interval")
			for _, o := range u.observables {
				inFlight.Add(1)
				// o is passed as an argument so each goroutine has its own copy.
				go func(o Observable) {
					defer inFlight.Done()
					u.observe(o)
				}(o)
			}
		case err := <-u.errChan:
			u.errorHandler(err)
		case <-u.quitChan:
			log.Debug("stop observe")
			u.interval.Stop()
			// Only the sender side may close; wait until no fetch can still send.
			inFlight.Wait()
			close(u.eventChan)
			return
		}
	}
}
// Stop signals a running Start loop to shut down. With the unbuffered
// quitChan created by NewService, it blocks until Start receives the signal.
func (u *crawler) Stop() {
	const quitSignal = 1
	u.quitChan <- quitSignal
}
// AddObservable appends observable to the crawl list.
//
// NOTE(review): this is not synchronized with Start, which reads the same
// slice. Calling it while Start is running is a data race.
func (u *crawler) AddObservable(observable Observable) {
	updated := append(u.observables, observable)
	u.observables = updated
}
// RemoveObservable drops every entry whose Address equals observable.Address;
// AccountType is ignored. It builds a fresh slice instead of filtering in
// place.
//
// NOTE(review): this is not synchronized with Start. Calling it concurrently
// is a data race.
func (u *crawler) RemoveObservable(observable Observable) {
	kept := make([]Observable, 0)
	for i := range u.observables {
		if u.observables[i].Address == observable.Address {
			continue
		}
		kept = append(kept, u.observables[i])
	}
	u.observables = kept
}
// GetEventChannel exposes the channel observe publishes to. Start closes it
// when it exits, so a consumer can range over it.
func (u *crawler) GetEventChannel() chan event {
	events := u.eventChan
	return events
}
// observeAll starts one goroutine per observable and returns immediately,
// without waiting for any of them to finish.
func (u *crawler) observeAll(observables []Observable) {
	for i := range observables {
		// Arguments of a go statement are evaluated here, so each goroutine
		// gets its own copy of the element.
		go u.observe(observables[i])
	}
}
// observe fetches "<explorerApiUrl>/<Address>" and publishes the response
// body on eventChan as an event with EventType 0.
//
// If the request or the body read fails, the error is logged and nothing is
// published. Previously, a failed http.Get left resp nil, so the deferred
// resp.Body.Close() panicked and took down the process. A failed read also
// used to publish a partial body.
//
// NOTE(review): http.Get uses http.DefaultClient, which has no timeout, and
// non-2xx responses are still published. Confirm both are intended.
func (u *crawler) observe(observe Observable) {
	resp, err := http.Get(fmt.Sprintf("%v/%v", u.explorerApiUrl, observe.Address))
	if err != nil {
		log.Error(err)
		return
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Error(err)
		return
	}

	u.eventChan <- event{
		EventType: 0,
		Result:    Result{resp: body},
	}
}
//// TEST ////
// TestCrawler runs the following scenario:
//   - seed the crawler with addresses "0".."99";
//   - remove "2" after 2s;
//   - add "101" after 5s;
//   - stop after 7s.
//
// It logs every event received until the event channel is closed.
//
// NOTE(review): NewService ticks every 10s, so stopping at 7s means no fetch
// ever happens during this test. Fetches would also hit a real network host.
func TestCrawler(t *testing.T) {
	seed := make([]Observable, 0)
	for n := 0; n < 100; n++ {
		seed = append(seed, Observable{
			AccountType: 1,
			Address:     strconv.Itoa(n),
		})
	}

	svc := NewService(seed, nil)
	go svc.Start()
	go removeObservableAfterTimeout(svc)
	go addObservableAfterTimeout(svc)
	go stopCrawlerAfterTimeout(svc)

	// ev rather than event, so the package-level type name is not shadowed.
	for ev := range svc.GetEventChannel() {
		t.Log(ev)
	}
}
// stopCrawlerAfterTimeout waits 7 seconds and then stops the crawler.
func stopCrawlerAfterTimeout(crawler Service) {
	const delay = 7 * time.Second
	time.Sleep(delay)
	crawler.Stop()
}
// removeObservableAfterTimeout waits 2 seconds and then removes address "2".
func removeObservableAfterTimeout(crawler Service) {
	const delay = 2 * time.Second
	time.Sleep(delay)
	target := Observable{Address: "2", AccountType: 0}
	crawler.RemoveObservable(target)
}
// addObservableAfterTimeout waits 5 seconds and then adds address "101".
func addObservableAfterTimeout(crawler Service) {
	const delay = 5 * time.Second
	time.Sleep(delay)
	extra := Observable{Address: "101", AccountType: 0}
	crawler.AddObservable(extra)
}
解决方法
在不大幅修改现有方案的前提下,最简单的做法是在 crawler 结构体中引入一个 sync.RWMutex,用它在操作切片时保护临界区。
注意:所有读取 observables 的地方(例如 Start 中把 u.observables 传给 observeAll 时)也必须持有读锁(RLock),否则竞态依然存在。请参阅以下更改:
// crawler (mutex variant): mtx guards observables.
//
// NOTE(review): only AddObservable/RemoveObservable below are shown taking
// the lock. Start is elided here. Unless Start also holds at least
// mtx.RLock while reading u.observables, the data race is not fully fixed.
type crawler struct {
	explorerAPIURL string
	interval       *time.Ticker
	errChan        chan error
	quitChan       chan int
	eventChan      chan event
	observables    []Observable // guarded by mtx
	errorHandler   func(err error)
	mtx            sync.RWMutex // protects observables
}
...
// AddObservable appends observable to the crawl list while holding the write
// lock on mtx.
func (u *crawler) AddObservable(observable Observable) {
	u.mtx.Lock()
	defer u.mtx.Unlock()
	u.observables = append(u.observables, observable)
}
// RemoveObservable drops every entry whose Address equals observable.Address.
// It holds the write lock on mtx while building and installing the new slice.
func (u *crawler) RemoveObservable(observable Observable) {
	u.mtx.Lock()
	defer u.mtx.Unlock()

	remaining := make([]Observable, 0)
	for i := range u.observables {
		if o := u.observables[i]; o.Address != observable.Address {
			remaining = append(remaining, o)
		}
	}
	u.observables = remaining
}
不过,虽然这样可以解决竞态问题,但我不能保证之后不会在某个时候遇到内存泄漏之类的问题,例如:试图从切片中删除一个仍在执行中的可观察对象(observable)时。
我的建议是:要么把切片操作(添加或删除)推迟到所有执行都完成之后再进行,要么引入检查机制来取消可观察对象的执行。
一种方案是为切片操作引入额外的通道,并在 Start 函数中统一处理这些操作。这样 observables 切片只会被 Start 所在的 goroutine 访问,不再需要加锁。
// crawler (channel variant): observables is owned by the Start goroutine.
// AddObservable/RemoveObservable send on addChan/removeChan, and Start
// applies the change, so the slice needs no lock.
//
// NOTE(review): mtx is not used by any of the channel-based methods shown.
// It looks like a leftover from the mutex variant; consider removing it.
type crawler struct {
	explorerAPIURL string
	interval       *time.Ticker
	errChan        chan error
	quitChan       chan int
	addChan        chan Observable // add requests, applied by Start
	removeChan     chan Observable // remove requests, applied by Start
	eventChan      chan event
	observables    []Observable // only touched by the Start goroutine
	errorHandler   func(err error)
	mtx            sync.RWMutex
}
// NewService builds the channel-based crawler, polling every 10 seconds. All
// channels are unbuffered, so AddObservable/RemoveObservable block until the
// Start loop picks the request up.
func NewService(observables []Observable, errorHandler func(err error)) Service {
	ticker := time.NewTicker(10 * time.Second)

	svc := new(crawler)
	svc.explorerAPIURL = "http://dummy.com"
	svc.interval = ticker
	svc.errChan = make(chan error)
	svc.quitChan = make(chan int)
	svc.addChan = make(chan Observable)
	svc.removeChan = make(chan Observable)
	svc.eventChan = make(chan event)
	svc.observables = observables
	svc.errorHandler = errorHandler
	return svc
}
// Start is the single owner of u.observables in this variant. Every mutation
// arrives over addChan/removeChan and is applied here, so only this goroutine
// reads or writes the slice.
//
// On each tick it crawls the current list through observeAll, and errors from
// errChan go to errorHandler. On Stop it halts the ticker, sleeps for one
// second and closes eventChan.
//
// NOTE(review): the one-second sleep is only a heuristic. It is unnecessary
// if observeAll waits for its fetches before returning, and insufficient if
// observeAll returns while goroutines may still send on eventChan.
func (u *crawler) Start() {
	log.Debug("start observe")
	for {
		select {
		case added := <-u.addChan:
			u.observables = append(u.observables, added)
		case removed := <-u.removeChan:
			survivors := make([]Observable, 0)
			for i := range u.observables {
				if u.observables[i].Address == removed.Address {
					continue
				}
				survivors = append(survivors, u.observables[i])
			}
			u.observables = survivors
		case err := <-u.errChan:
			u.errorHandler(err)
		case <-u.interval.C:
			log.Debug("observe interval")
			u.observeAll(u.observables)
		case <-u.quitChan:
			log.Debug("stop observe")
			u.interval.Stop()
			time.Sleep(time.Second)
			close(u.eventChan)
			return
		}
	}
}
...
// AddObservable hands observable to the Start loop, which appends it. With
// the unbuffered addChan created by NewService, this blocks until Start
// receives it, and blocks forever if Start is not running.
func (u *crawler) AddObservable(observable Observable) {
	requests := u.addChan
	requests <- observable
}
// RemoveObservable asks the Start loop to drop every entry with the same
// Address. With the unbuffered removeChan created by NewService, this blocks
// until Start receives it, and blocks forever if Start is not running.
func (u *crawler) RemoveObservable(observable Observable) {
	requests := u.removeChan
	requests <- observable
}
...
// EDIT: I've also added modified versions of observeAll and observe below. This observeAll waits for every fetch to finish before returning.
// observeAll fetches every observable concurrently and waits for all of them
// to finish before returning. If any fetch fails, the first error reported is
// logged.
//
// Each closure gets its own copy of the loop variable. Before Go 1.22 the
// closures would otherwise share the single range variable `a`, so they
// would race on it and could all fetch the same (usually the last) address.
//
// A zero-value errgroup.Group is used because the derived context from
// errgroup.WithContext was discarded and never consulted.
func (u *crawler) observeAll(observables []Observable) {
	var g errgroup.Group
	for _, a := range observables {
		a := a // per-iteration copy; required before Go 1.22
		g.Go(func() error {
			return u.observe(a)
		})
	}
	if err := g.Wait(); err != nil {
		log.Error(err)
	}
}
// observe fetches "<explorerAPIURL>/<Address>" and publishes the response
// body on eventChan as an event with EventType 0.
//
// It returns the request error unchanged; http.Get errors already name the
// URL. A body-read error is wrapped with the address, because observeAll logs
// only the first error, and without context that error cannot be traced to an
// address. Nothing is published on error.
//
// NOTE(review): http.Get uses http.DefaultClient, which has no timeout, and
// non-2xx responses are still published. Confirm both are intended.
func (u *crawler) observe(observe Observable) error {
	resp, err := http.Get(fmt.Sprintf("%v/%v", u.explorerAPIURL, observe.Address))
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("reading response body for address %q: %w", observe.Address, err)
	}

	u.eventChan <- event{
		EventType: 0,
		Result:    Result{resp: body},
	}
	return nil
}