go - 在 Golang 中定期爬取 API

标签 go concurrency goroutine

我需要异步、定期(每 10 秒)抓取/调用特定 URL(例如 http://dummy.com/{address} )以获取地址列表。
根据从 URL 接收到的结果,需要发布事件。
爬虫需要在一个 goroutine 中启动,并且每个 API 调用都需要在一个单独的 goroutine 中。
将启动另一个监听事件的 goroutine。
爬虫使用地址列表进行初始化,但它需要有公开的方法来添加将要被抓取的新地址或删除现有地址,在任何时间点。
请参阅下文,我的解决方案存在种族问题。
发生这种情况是因为爬虫结构的 observables 字段不是用于并发访问的“线程保存”。
我知道“不交流共享内存”规则,但没有弄清楚我将如何使用它 channel (谈论 observables 字段)而不是 slice ,以及如何为'添加/删除额外地址观看'如果使用 channel 。
如何修改波纹管解决方案以修复竞争条件?

package crawler

import (
    "fmt"
    log "github.com/sirupsen/logrus"
    "io/ioutil"
    "net/http"
    "strconv"
    "time"
)

type Service interface {
    Start()
    Stop()
    AddObservable(observable Observable)
    RemoveObservable(observable Observable)
    GetEventChannel() chan event
}

type event struct {
    EventType int
    Result    Result
}

type Result struct {
    resp []byte
}

type Observable struct {
    AccountType int
    Address     string
}

type crawler struct {
    explorerApiUrl string
    interval       *time.Ticker
    errChan        chan error
    quitChan       chan int
    eventChan      chan event
    observables    []Observable
    errorHandler   func(err error)
}

func NewService(
    observables []Observable,
    errorHandler func(err error),
) Service {

    interval := time.NewTicker(10 * time.Second)

    return &crawler{
        explorerApiUrl: "http://dummy.com",
        interval:       interval,
        errChan:        make(chan error),
        quitChan:       make(chan int),
        eventChan:      make(chan event),
        observables:    observables,
        errorHandler:   errorHandler,
    }
}

func (u *crawler) Start() {
    log.Debug("start observe")
    for {
        select {
        case <-u.interval.C:
            log.Debug("observe interval")
            u.observeAll(u.observables)
        case err := <-u.errChan:
            u.errorHandler(err)
        case <-u.quitChan:
            log.Debug("stop observe")
            u.interval.Stop()
            time.Sleep(time.Second)
            close(u.eventChan)
            return
        }
    }
}

func (u *crawler) Stop() {
    u.quitChan <- 1
}

func (u *crawler) AddObservable(observable Observable) {
    u.observables = append(u.observables, observable)
}

func (u *crawler) RemoveObservable(observable Observable) {
    newObservableList := make([]Observable, 0)
    for _, o := range u.observables {
        if o.Address != observable.Address {
            newObservableList = append(newObservableList, o)
        }
    }
    u.observables = newObservableList
}

func (u *crawler) GetEventChannel() chan event {
    return u.eventChan
}

func (u *crawler) observeAll(observables []Observable) {
    for _, a := range observables {
        go u.observe(a)
    }
}

func (u *crawler) observe(observe Observable) {

    resp, err := http.Get(
        fmt.Sprintf("%v/%v", u.explorerApiUrl, observe.Address),
    )
    if err != nil {
        log.Error(err)
    }
    defer resp.Body.Close()
    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        log.Error(err)
    }
    e := event{
        EventType: 0,
        Result: Result{
            resp: body,
        },
    }

    u.eventChan <- e
}

//// TEST ////

func TestCrawler(t *testing.T) {
    observables := make([]Observable, 0)
    for i := 0; i < 100; i++ {
        observable := Observable{
            AccountType: 1,
            Address:     strconv.Itoa(i),
        }
        observables = append(observables, observable)
    }

    crawlSvc := NewService(observables, nil)

    go crawlSvc.Start()

    go removeObservableAfterTimeout(crawlSvc)

    go addObservableAfterTimeout(crawlSvc)

    go stopCrawlerAfterTimeout(crawlSvc)

    for event := range crawlSvc.GetEventChannel() {
        t.Log(event)
    }
}

func stopCrawlerAfterTimeout(crawler Service) {
    time.Sleep(7 * time.Second)
    crawler.Stop()
}

func removeObservableAfterTimeout(crawler Service) {
    time.Sleep(2 * time.Second)
    crawler.RemoveObservable(Observable{
        AccountType: 0,
        Address:     "2",
    })
}

func addObservableAfterTimeout(crawler Service) {
    time.Sleep(5 * time.Second)
    crawler.AddObservable(Observable{
        AccountType: 0,
        Address:     "101",
    })
}

最佳答案

这里最简单的做法是引入 RWMutex,无需过多修改解决方案。到爬虫结构。这将有助于在处理 slice 时锁定代码的关键部分。请参阅以下更改:

type crawler struct {
    explorerAPIURL string
    interval       *time.Ticker
    errChan        chan error
    quitChan       chan int
    eventChan      chan event
    observables    []Observable
    errorHandler   func(err error)
    mtx            sync.RWMutex
}

...

func (u *crawler) AddObservable(observable Observable) {
    u.mtx.Lock()
    u.observables = append(u.observables, observable)
    u.mtx.Unlock()
}

func (u *crawler) RemoveObservable(observable Observable) {
    u.mtx.Lock()
    newObservableList := make([]Observable, 0)
    for _, o := range u.observables {
        if o.Address != observable.Address {
            newObservableList = append(newObservableList, o)
        }
    }
    u.observables = newObservableList
    u.mtx.Unlock()
}
但是,虽然这解决了竞态问题,但我不保证您最终不会在某些时候遇到内存泄漏问题。例如,在尝试从 slice 中删除仍未完成执行的可观察对象时。
我的建议是在所有执行完成之前暂停 slice 操作(添加或删除),或者在删除时引入检查以取消可观察对象的执行。
一种解决方案是为 slice 操作引入额外的 channel 并处理 Start 中的操作。功能。
type crawler struct {
    explorerAPIURL string
    interval       *time.Ticker
    errChan        chan error
    quitChan       chan int
    addChan        chan Observable
    removeChan     chan Observable
    eventChan      chan event
    observables    []Observable
    errorHandler   func(err error)
    mtx            sync.RWMutex
}

// NewService --
func NewService(
    observables []Observable,
    errorHandler func(err error),
) Service {

    interval := time.NewTicker(10 * time.Second)

    return &crawler{
        explorerAPIURL: "http://dummy.com",
        interval:       interval,
        errChan:        make(chan error),
        quitChan:       make(chan int),
        addChan:        make(chan Observable),
        removeChan:     make(chan Observable),
        eventChan:      make(chan event),
        observables:    observables,
        errorHandler:   errorHandler,
    }
}

func (u *crawler) Start() {
    log.Debug("start observe")
    for {
        select {
        case <-u.interval.C:
            log.Debug("observe interval")
            u.observeAll(u.observables)
        case err := <-u.errChan:
            u.errorHandler(err)
        case <-u.quitChan:
            log.Debug("stop observe")
            u.interval.Stop()
            time.Sleep(time.Second)
            close(u.eventChan)
            return
        case o := <-u.addChan:
            u.observables = append(u.observables, o)
        case o := <-u.removeChan:
            newObservableList := make([]Observable, 0)
            for _, observable := range u.observables {
                if o.Address != observable.Address {
                    newObservableList = append(newObservableList, observable)
                }
            }
            u.observables = newObservableList
        }
    }
}

...

func (u *crawler) AddObservable(observable Observable) {
    u.addChan <- observable
}

func (u *crawler) RemoveObservable(observable Observable) {
    u.removeChan <- observable
}

...

//EDIT - I've added the modified versions of these functions as well.
func (u *crawler) observeAll(observables []Observable) {
    g, _ := errgroup.WithContext(context.Background())
    for _, a := range observables {
        g.Go(func() error {
            return u.observe(a)
        })
    }

    if err := g.Wait(); err != nil {
        log.Error(err)
    }
}

func (u *crawler) observe(observe Observable) error {

    resp, err := http.Get(
        fmt.Sprintf("%v/%v", u.explorerAPIURL, observe.Address),
    )
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return err
    }
    e := event{
        EventType: 0,
        Result: Result{
            resp: body,
        },
    }

    u.eventChan <- e
    return nil
}

关于go - 在 Golang 中定期爬取 API,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63812394/

相关文章:

unit-testing - 模拟 os.GetEnv ("ENV_VAR")

mysql - 在 Go 中使用查询参数获取记录

go - 如何安全地回收 golang 中的 protobuf 对象

java - 多线程 Socket 通讯 Client/Server

go - `make(chan _, _)` 是原子的吗?

go - 需要将 2 维数组转换为字符串并将最后一个逗号替换为句号。(Golang)

go - 如何在 golang 中处理 goroutines 并获得响应

go - 如何在golang中将 slice 附加到字节数组

java - 不修改列表但仍然得到一个 ConcurrentModificationException

java - 在对象上同步并更改引用