我需要异步、定期(每 10 秒)抓取/调用特定 URL(例如 http://dummy.com/{address} )以获取地址列表。
根据从 URL 接收到的结果,需要发布事件。
爬虫需要在一个 goroutine 中启动,并且每个 API 调用都需要在一个单独的 goroutine 中。
将启动另一个监听事件的 goroutine。
爬虫使用地址列表进行初始化,但它需要有公开的方法来添加将要被抓取的新地址或删除现有地址,在任何时间点。
请看下面的代码,我的解决方案存在竞态(race)问题。
出现这个问题是因为 crawler 结构体的 observables 字段对并发访问来说不是线程安全(thread-safe)的。
我知道“不要通过共享内存来通信,而要通过通信来共享内存”这条规则,但我没想明白如何用 channel 代替 slice 来管理 observables 字段,也不清楚如果使用 channel,“随时添加/删除要抓取的地址”该如何实现。
如何修改下面的解决方案来修复这个竞态条件?
package crawler
import (
"fmt"
log "github.com/sirupsen/logrus"
"io/ioutil"
"net/http"
"strconv"
"time"
)
// Service defines the public API of the periodic crawler: lifecycle
// control, runtime mutation of the watched address list, and access to
// the stream of crawl results.
type Service interface {
	Start()
	Stop()
	AddObservable(observable Observable)
	RemoveObservable(observable Observable)
	GetEventChannel() chan event
}
// event is published on the crawler's event channel after each fetch.
type event struct {
	EventType int
	Result    Result
}

// Result wraps the raw HTTP response body of a single crawl.
type Result struct {
	resp []byte
}

// Observable is one address to be crawled on every tick.
type Observable struct {
	AccountType int
	Address     string
}

// crawler is the Service implementation.
// NOTE(review): observables is read by the Start loop and mutated by
// AddObservable/RemoveObservable from other goroutines with no
// synchronization — this is exactly the data race the surrounding
// article discusses.
type crawler struct {
	explorerApiUrl string
	interval       *time.Ticker
	errChan        chan error
	quitChan       chan int
	eventChan      chan event
	observables    []Observable
	errorHandler   func(err error)
}
// NewService builds a crawler Service that polls the explorer API every
// 10 seconds for each of the given observables and reports errors
// through errorHandler.
func NewService(
	observables []Observable,
	errorHandler func(err error),
) Service {
	c := &crawler{
		explorerApiUrl: "http://dummy.com",
		interval:       time.NewTicker(10 * time.Second),
		errChan:        make(chan error),
		quitChan:       make(chan int),
		eventChan:      make(chan event),
		observables:    observables,
		errorHandler:   errorHandler,
	}
	return c
}
// Start runs the crawl loop: on every ticker interval it fans out one
// goroutine per observable, forwards errors to the configured handler,
// and shuts down when Stop is called. It blocks until Stop, so callers
// run it in its own goroutine.
func (u *crawler) Start() {
	log.Debug("start observe")
	for {
		select {
		case <-u.interval.C:
			log.Debug("observe interval")
			// NOTE(review): unsynchronized read of u.observables — races
			// with AddObservable/RemoveObservable on other goroutines.
			u.observeAll(u.observables)
		case err := <-u.errChan:
			// NOTE(review): errorHandler may be nil (the test passes nil);
			// this would panic if errChan ever received.
			u.errorHandler(err)
		case <-u.quitChan:
			log.Debug("stop observe")
			u.interval.Stop()
			// Heuristic grace period for in-flight observe goroutines.
			// NOTE(review): a straggler sending on eventChan after the
			// close below would panic — one second is not a guarantee.
			time.Sleep(time.Second)
			close(u.eventChan)
			return
		}
	}
}
// Stop signals the Start loop to shut down. The send blocks until the
// loop receives it, so Stop must only be called while Start is running.
func (u *crawler) Stop() {
	u.quitChan <- 1
}
// AddObservable registers a new address to be crawled from the next tick.
// NOTE(review): this append is unsynchronized and races with the Start
// loop's read of u.observables — the race the article is about.
func (u *crawler) AddObservable(observable Observable) {
	u.observables = append(u.observables, observable)
}
// RemoveObservable drops every observable whose Address matches the
// given one, by rebuilding the slice.
// NOTE(review): this write is unsynchronized and races with the Start
// loop's read of u.observables.
func (u *crawler) RemoveObservable(observable Observable) {
	newObservableList := make([]Observable, 0)
	for _, o := range u.observables {
		if o.Address != observable.Address {
			newObservableList = append(newObservableList, o)
		}
	}
	u.observables = newObservableList
}
// GetEventChannel exposes the channel on which crawl results are
// published; it is closed when the crawler stops.
func (u *crawler) GetEventChannel() chan event {
	return u.eventChan
}
// observeAll launches one goroutine per observable and returns without
// waiting for them. NOTE(review): there is no bound and no wait — slow
// endpoints can leave goroutines from a previous tick still running
// when the next tick fires.
func (u *crawler) observeAll(observables []Observable) {
	for _, a := range observables {
		go u.observe(a)
	}
}
// observe fetches a single address and publishes the raw response body
// as an event on eventChan.
// Fix: the original logged a request error and then fell through to
// resp.Body.Close() — on error resp is nil, so that dereference panics.
// Both error paths now return early.
func (u *crawler) observe(observe Observable) {
	resp, err := http.Get(
		fmt.Sprintf("%v/%v", u.explorerApiUrl, observe.Address),
	)
	if err != nil {
		log.Error(err)
		return // resp is nil here; touching resp.Body would panic
	}
	defer resp.Body.Close()
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Error(err)
		return
	}
	e := event{
		EventType: 0,
		Result: Result{
			resp: body,
		},
	}
	u.eventChan <- e
}
//// TEST ////
// TestCrawler exercises the crawler end to end: 100 seeded addresses,
// a concurrent remove after 2s, an add after 5s, and a stop after 7s,
// while draining the event channel until Stop closes it.
// NOTE(review): this function needs the "testing" import, which is not
// in the import block shown above — article excerpt artifact.
func TestCrawler(t *testing.T) {
	observables := make([]Observable, 0)
	for i := 0; i < 100; i++ {
		observable := Observable{
			AccountType: 1,
			Address:     strconv.Itoa(i),
		}
		observables = append(observables, observable)
	}
	// NOTE(review): errorHandler is nil — Start would panic if anything
	// were ever sent on errChan.
	crawlSvc := NewService(observables, nil)
	go crawlSvc.Start()
	go removeObservableAfterTimeout(crawlSvc)
	go addObservableAfterTimeout(crawlSvc)
	go stopCrawlerAfterTimeout(crawlSvc)
	// Loops until Start closes the event channel during shutdown.
	for event := range crawlSvc.GetEventChannel() {
		t.Log(event)
	}
}
// stopCrawlerAfterTimeout stops the crawler 7 seconds into the test.
func stopCrawlerAfterTimeout(crawler Service) {
	time.Sleep(7 * time.Second)
	crawler.Stop()
}
// removeObservableAfterTimeout removes address "2" after 2 seconds,
// concurrently with the running crawl loop (provoking the race).
func removeObservableAfterTimeout(crawler Service) {
	time.Sleep(2 * time.Second)
	crawler.RemoveObservable(Observable{
		AccountType: 0,
		Address:     "2",
	})
}
// addObservableAfterTimeout adds address "101" after 5 seconds,
// concurrently with the running crawl loop (provoking the race).
func addObservableAfterTimeout(crawler Service) {
	time.Sleep(5 * time.Second)
	crawler.AddObservable(Observable{
		AccountType: 0,
		Address:     "101",
	})
}
最佳答案
这里最简单的做法是给 crawler 结构体引入一个 RWMutex,无需过多修改原方案。它可以在操作 slice 时锁住代码的临界区。请参阅以下更改:
// crawler with a sync.RWMutex guarding the observables slice.
// NOTE(review): for complete protection the Start loop must also take
// mtx.RLock() while reading observables (around the observeAll call);
// locking only the writers still leaves a read/write race.
type crawler struct {
	explorerAPIURL string
	interval       *time.Ticker
	errChan        chan error
	quitChan       chan int
	eventChan      chan event
	observables    []Observable
	errorHandler   func(err error)
	mtx            sync.RWMutex // guards observables
}
...
// AddObservable appends a new address while holding the write lock.
func (u *crawler) AddObservable(observable Observable) {
	u.mtx.Lock()
	defer u.mtx.Unlock()

	u.observables = append(u.observables, observable)
}
// RemoveObservable drops every observable whose Address matches the
// given one, rebuilding the slice while holding the write lock.
func (u *crawler) RemoveObservable(observable Observable) {
	u.mtx.Lock()
	defer u.mtx.Unlock()

	kept := make([]Observable, 0)
	for _, candidate := range u.observables {
		if candidate.Address == observable.Address {
			continue
		}
		kept = append(kept, candidate)
	}
	u.observables = kept
}
不过,虽然这解决了竞态问题,但我不能保证你以后不会遇到其他问题——例如在尝试从 slice 中删除一个尚未执行完毕的 observable 时。我的建议是:要么在所有抓取执行完成之前暂停对 slice 的操作(添加或删除),要么在删除时引入检查,以取消该 observable 的执行。
另一种解决方案是为 slice 操作引入额外的 channel,并在 Start 函数中处理这些操作:

type crawler struct {
explorerAPIURL string
interval *time.Ticker
errChan chan error
quitChan chan int
addChan chan Observable
removeChan chan Observable
eventChan chan event
observables []Observable
errorHandler func(err error)
mtx sync.RWMutex
}
// NewService builds the channel-based crawler: add/remove requests are
// sent over addChan/removeChan and applied inside the Start loop, so
// the observables slice is only ever touched by one goroutine.
func NewService(
	observables []Observable,
	errorHandler func(err error),
) Service {
	interval := time.NewTicker(10 * time.Second)
	return &crawler{
		explorerAPIURL: "http://dummy.com",
		interval:       interval,
		errChan:        make(chan error),
		quitChan:       make(chan int),
		addChan:        make(chan Observable),
		removeChan:     make(chan Observable),
		eventChan:      make(chan event),
		observables:    observables,
		errorHandler:   errorHandler,
	}
}
// Start owns the observables slice exclusively: ticks, errors, quit,
// add requests and remove requests are all serialized through this one
// select loop, which eliminates the data race without a mutex.
// It blocks until Stop, so run it in its own goroutine.
func (u *crawler) Start() {
	log.Debug("start observe")
	for {
		select {
		case <-u.interval.C:
			log.Debug("observe interval")
			// With the errgroup version of observeAll this call blocks
			// until every fetch completes, so add/remove requests queue
			// up for the duration of a crawl round.
			u.observeAll(u.observables)
		case err := <-u.errChan:
			u.errorHandler(err)
		case <-u.quitChan:
			log.Debug("stop observe")
			u.interval.Stop()
			// NOTE(review): with the blocking observeAll no observe
			// goroutine should still be running here, so this sleep looks
			// redundant — confirm before removing.
			time.Sleep(time.Second)
			close(u.eventChan)
			return
		case o := <-u.addChan:
			// Applied here, on the loop goroutine: no lock needed.
			u.observables = append(u.observables, o)
		case o := <-u.removeChan:
			// Rebuild the slice without any entry matching o.Address.
			newObservableList := make([]Observable, 0)
			for _, observable := range u.observables {
				if o.Address != observable.Address {
					newObservableList = append(newObservableList, observable)
				}
			}
			u.observables = newObservableList
		}
	}
}
...
// AddObservable hands the new address to the Start loop via addChan.
// The send blocks until the loop picks it up — indefinitely if the
// crawler is not running.
func (u *crawler) AddObservable(observable Observable) {
	u.addChan <- observable
}
// RemoveObservable asks the Start loop, via removeChan, to drop every
// observable with the given Address. The send blocks until the loop
// picks it up — indefinitely if the crawler is not running.
func (u *crawler) RemoveObservable(observable Observable) {
	u.removeChan <- observable
}
...
//EDIT - I've added the modified versions of these functions as well.
// observeAll fetches every observable concurrently and waits for all
// fetches to finish; the first error, if any, is logged.
// Fix: the goroutine closures captured the shared loop variable `a`, so
// before Go 1.22 every worker could observe the same (last) address and
// race on it. Shadowing `a` gives each closure its own copy.
func (u *crawler) observeAll(observables []Observable) {
	g, _ := errgroup.WithContext(context.Background())
	for _, a := range observables {
		a := a // per-iteration copy: loop variable is reused before Go 1.22
		g.Go(func() error {
			return u.observe(a)
		})
	}
	if err := g.Wait(); err != nil {
		log.Error(err)
	}
}
// observe fetches a single address and, on success, publishes the raw
// response body as an event. Errors are returned so the errgroup in
// observeAll can report them.
func (u *crawler) observe(observe Observable) error {
	// NOTE(review): http.Get has no timeout — a hung endpoint blocks this
	// goroutine (and g.Wait in observeAll) indefinitely; consider a
	// shared http.Client with a Timeout.
	resp, err := http.Get(
		fmt.Sprintf("%v/%v", u.explorerAPIURL, observe.Address),
	)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return err
	}
	e := event{
		EventType: 0,
		Result: Result{
			resp: body,
		},
	}
	// Blocks until the consumer reads; panics if eventChan were already
	// closed (see the note in Start).
	u.eventChan <- e
	return nil
}
关于go - 在 Golang 中定期爬取 API,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63812394/