go - 这是进行并行编程的更好方法吗?

标签 go parallel-processing channel

我制作这个脚本是为了从 Instagram 获取“影响者”的关注者数量

我测得的运行耗时在 550-750 毫秒之间。这并不算太糟,但我想知道它是否还能更快(我是 Go 新手,只学了 3 周)。

package main

import (
    "encoding/json"
    "fmt"
    "io/ioutil"
    "log"
    "net/http"
    "sync"
    "time"
)

// The types below mirror the JSON returned by the Instagram "?__a=1"
// profile endpoint; only the nested follower count is decoded.
type (
    // user is the top-level response object.
    user struct {
        User userData `json:"user"`
    }

    // userData is the profile section that carries follower information.
    userData struct {
        Followers count `json:"followed_by"`
    }

    // count wraps the numeric follower total.
    count struct {
        Count int `json:"count"`
    }
)

// getFollowerCount is one pipeline stage. For every username received on in,
// it queries the Instagram "?__a=1" profile endpoint and sends the decoded
// follower count on the returned channel. The returned channel is closed
// once in has been closed and fully drained.
//
// A failed lookup (transport error, non-200 status, unreadable body or
// undecodable JSON) is printed to stdout and skipped, so no count is sent
// for that username.
func getFollowerCount(in <-chan string) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        // One client per stage lets connections be reused across requests.
        // The timeout keeps a stalled request from blocking this stage forever.
        client := &http.Client{Timeout: 10 * time.Second}
        for un := range in {
            n, err := fetchFollowerCount(client, un)
            if err != nil {
                // Previously a transport error left resp nil and the deferred
                // resp.Body.Close() panicked; now we report and move on.
                fmt.Println(err)
                continue
            }
            out <- n
        }
    }()
    return out
}

// fetchFollowerCount performs a single profile lookup for un. It is a
// separate function so the response body is closed as soon as this request
// finishes, instead of being deferred until the whole stage exits.
func fetchFollowerCount(client *http.Client, un string) (int, error) {
    URL := "https://www.instagram.com/" + un + "/?__a=1"
    resp, err := client.Get(URL)
    if err != nil {
        return 0, err
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        return 0, fmt.Errorf("fetching %s: unexpected status %s", URL, resp.Status)
    }
    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return 0, err
    }
    var u user
    if err := json.Unmarshal(body, &u); err != nil {
        return 0, err
    }
    return u.User.Followers.Count, nil
}

// merge fans in any number of int channels into a single channel. Values are
// forwarded as they arrive, with no ordering guarantee across inputs. The
// returned channel is closed only after every input has been closed and
// drained.
func merge(cs ...<-chan int) <-chan int {
    merged := make(chan int)
    var pending sync.WaitGroup
    pending.Add(len(cs))

    // forward copies one input onto merged and marks that input finished.
    forward := func(src <-chan int) {
        defer pending.Done()
        for v := range src {
            merged <- v
        }
    }
    for _, src := range cs {
        go forward(src)
    }

    // Close merged once all forwarders are done so a consumer's range exits.
    go func() {
        pending.Wait()
        close(merged)
    }()
    return merged
}

// gen sends each of users, in order, on an unbuffered channel and closes
// the channel after the last username has been received.
func gen(users ...string) <-chan string {
    ch := make(chan string)
    go func() {
        defer close(ch)
        for i := 0; i < len(users); i++ {
            ch <- users[i]
        }
    }()
    return ch
}

// main fetches follower counts for a fixed list of accounts through a
// fan-out/fan-in pipeline (gen -> numWorkers x getFollowerCount -> merge).
// It prints each count as it arrives and logs the total runtime.
func main() {
    start := time.Now()
    fmt.Println("STARTING UP")
    usrs := []string{"kanywest", "kimkardashian", "groovyq", "kendricklamar", "barackobama", "asaprocky", "champagnepapi", "eminem", "drdre", "g_eazy", "skrillex"}
    in := gen(usrs...)

    // Fan out: every worker receives from the same input channel, so each
    // username is taken by exactly one worker.
    const numWorkers = 10
    workers := make([]<-chan int, numWorkers)
    for i := range workers {
        workers[i] = getFollowerCount(in)
    }

    // Fan in: counts arrive in completion order, not input order.
    for d := range merge(workers...) {
        fmt.Println(d)
    }

    elapsed := time.Since(start)
    log.Println("runtime", elapsed)
}

最佳答案

我同意 jeevatkm,有很多方法可以实现你的任务并改进它。一些注意事项:

  1. 将实际执行工作的函数(即从远程服务获取结果的函数)与负责协调所有工作的函数分开。
  2. 将错误传播给调用者,而不是在被调用的函数内部直接处理(消化)错误,是一种良好的做法。
  3. 由于作业是并行完成的,结果可能以不确定的顺序返回。因此,除了关注者数量之外,结果还应包含其他相关信息(例如对应的用户名)。

以下实现可能是一种选择:

package main

import (
    "encoding/json"
    "errors"
    "fmt"
    "net/http"
    "sync"
    "time"
)

type (
    // user mirrors the top-level JSON object returned by the Instagram
    // "?__a=1" profile endpoint; only the follower count is decoded.
    user struct {
        User userData `json:"user"`
    }

    // userData is the profile section that carries follower information.
    userData struct {
        Followers count `json:"followed_by"`
    }

    // count wraps the numeric follower total.
    count struct {
        Count int `json:"count"`
    }

    // follower is one lookup result: the username it belongs to, its
    // follower count, and the lookup error (if any). Carrying Username lets
    // a consumer match results that arrive out of order. See (3) above.
    follower struct {
        Username string
        Count    int
        Error    error
    }

    // GetFollowerCountFunc fetches the follower count of a single user.
    // Taking it as a parameter lets callers swap in a mock for testing.
    GetFollowerCountFunc func(string) (int, error)
)

// mockGetFollowerCountFor is an offline stand-in with the same signature as
// GetFollowerCountFunc. Usernames shorter than 9 bytes fail with an error;
// every other username reports 10 followers.
func mockGetFollowerCountFor(userName string) (int, error) {
    const minLen = 9
    if len(userName) >= minLen {
        return 10, nil
    }
    return -1, errors.New("mocking error in get follower count")
}

// followerCountClient is shared by all lookups so connections can be reused.
// Its timeout keeps one stalled request from hanging its worker forever,
// which http.Get's default client (no timeout) would allow.
var followerCountClient = &http.Client{Timeout: 10 * time.Second}

// getFollowerCountFor fetches the follower count of userName from the
// Instagram "?__a=1" profile endpoint. This is the worker function from
// point (1) above.
//
// It returns -1 and a non-nil error if the request fails, if the server
// answers with a non-200 status, or if the body cannot be decoded as JSON.
func getFollowerCountFor(userName string) (int, error) {
    URL := "https://www.instagram.com/" + userName + "/?__a=1"
    resp, err := followerCountClient.Get(URL)
    if err != nil {
        return -1, err
    }
    defer resp.Body.Close()

    // Without this check, an error response whose JSON body lacks the
    // expected fields (e.g. a 404 for an unknown user) would silently
    // report 0 followers with a nil error.
    if resp.StatusCode != http.StatusOK {
        return -1, fmt.Errorf("fetching followers of %q: unexpected status %s", userName, resp.Status)
    }

    var u user
    if err := json.NewDecoder(resp.Body).Decode(&u); err != nil {
        return -1, err
    }
    return u.User.Followers.Count, nil
}

// getFollowersAsync looks up the follower count of every user concurrently
// using fn. It returns immediately with a channel that yields one follower
// result per user, in completion order. The channel is closed after all
// lookups have finished, so callers can simply range over it. This is the
// coordinating function from points (1) and (2) above.
//
// One goroutine is started per user. The channel buffer (len(users)) does
// NOT limit parallelism. It only guarantees that every worker can deliver
// its result without waiting for the consumer, so no goroutine is left
// blocked if the caller stops reading early.
func getFollowersAsync(users []string, fn GetFollowerCountFunc) <-chan follower {
    followers := make(chan follower, len(users))

    go func() {
        var wg sync.WaitGroup
        wg.Add(len(users))
        for _, u := range users {
            go func(uid string) {
                defer wg.Done()
                cnt, err := fn(uid)
                if err != nil {
                    // Report a sentinel count so callers never see a
                    // half-valid value alongside an error.
                    cnt = -1
                }
                followers <- follower{Username: uid, Count: cnt, Error: err}
            }(u)
        }
        wg.Wait()

        // Closing lets the consumer's `for ... range` exit.
        close(followers)
    }()

    return followers
}

// main fetches follower counts for a fixed list of accounts concurrently.
// It prints one line per account as results arrive, then the total runtime.
func main() {
    start := time.Now()
    fmt.Println("STARTING UP")
    usrs := []string{"kanywest", "kimkardashian", "groovyq", "kendricklamar", "barackobama", "asaprocky", "champagnepapi", "eminem", "drdre", "g_eazy", "skrillex"}

    results := getFollowersAsync(usrs, getFollowerCountFor)
    // For testing without network access, swap in the mock:
    //results := getFollowersAsync(usrs, mockGetFollowerCountFor)
    for r := range results {
        if r.Error != nil {
            // Trailing newline so an error line doesn't run into the next output line.
            fmt.Printf("Error for user '%s' => %v\n", r.Username, r.Error)
            continue
        }
        fmt.Printf("%s: %d\n", r.Username, r.Count)
    }

    elapsed := time.Since(start)
    fmt.Println("runtime", elapsed)
}

关于go - 这是进行并行编程的更好方法吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45204814/

相关文章:

go - Go 中不一致的追加行为?

c++ - 用于 visual studio express 2010 的 PThreads

java - 如何仅在处理完 RDD 中的所有分区后才在 Spark Streaming 中接收输入?

google-app-engine - 如何从 appengine.Context 创建云 context.Context

gocql 无法将 blob 解码为 *[20]uint8

c# - 如何在 Windows Phone 上运行并行任务?

go - channel 无限循环

node.js - 使用 Socket.io 向多个房间发送消息?

java - Netty - 无法在注册时写入 channel (channelRegistered 事件)

go - 如何实现 goroutines 的管道?