Shawson Lim

搬运工. 砌匠. 包工头.

Category
 » mysql
 » golang
 » linux
 » git
 » php
 » front
 » statistics
 » default
 » tools

[Golang] goroutine和channel实践

29 Oct 2014 » golang

场景:

将数据库中每张表的 game_id 和 user_id 导出成日志文件。由于共有 1K+ 张表,顺序导出耗时过长,而一次性全部并发导出又可能占用过多 MySQL 的 I/O,进而影响线上业务,所以暂定以每次 5 张表并发的方式进行导出。语言方面使用 Golang 实现,主要涉及以下要点:

  • 如何导出数据(需要导出的数据都在索引上面,可以直接使用索引快速查询)?
  • 如何有效控制并发量(使用channel,可以利用阻塞特性进行控制)?
/**
 * TODO
 *
 * [os/exec]
 * 	https://github.com/viney/command/blob/master/main.go
 *  http://www.widuu.com/archives/01/927.html
 */
package main

import (
	"bufio"
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"os/exec"
	"strconv"
	"sync"
	"time"
)

// total is the number of tables still to be processed: main sets it before
// any work starts and runCmd decrements it when a table is done.
var total int
// quit receives a value from runCmd once total reaches zero; main blocks on
// it and returns when the value arrives.
var quit chan int
// chInt is used as a counting semaphore: doDatabase sends to it before
// launching a runCmd goroutine and runCmd receives from it when it finishes.
// main creates it with a capacity of 5.
var chInt chan int

// main exports every configured table range and blocks until the last table
// has been processed.
//
// Each entry of list describes one database: the inclusive table-number range
// ("min".."max"), a "flag" used in log and file names, and the connection
// settings handed to the mysql client.
func main() {
	list := []map[string]string{
		{
			"min":  "1",
			"max":  "80",
			"flag": "hot",

			"host":     "xxx",
			"database": "xxx",
			"user":     "xxx",
			"pwd":      "xxx",
		},
	}

	// total must match the number of runCmd goroutines that will be started,
	// because the goroutine that brings it to zero is the one that signals
	// quit. Derive it from the configuration instead of hard-coding it.
	jobs := make([]map[string]string, 0, len(list))
	total = 0
	for _, conf := range list {
		n, err := tableCount(conf)
		if err != nil {
			fmt.Println("config: ", err.Error())
			continue
		}
		total += n
		jobs = append(jobs, conf)
	}

	quit = make(chan int)
	chInt = make(chan int, 5)
	fmt.Println("Len:", len(chInt))
	fmt.Println("Cap:", cap(chInt))

	// With nothing to export no goroutine would ever send on quit, so waiting
	// for it would block forever.
	if total == 0 {
		return
	}

	for _, conf := range jobs {
		doDatabase(conf)
	}

	// Wait for the goroutine that finishes the last table.
	<-quit
}

// tableCount returns how many tables the inclusive "min".."max" range of conf
// covers, or an error when either bound is not an integer.
func tableCount(conf map[string]string) (int, error) {
	first, err := strconv.Atoi(conf["min"])
	if err != nil {
		return 0, fmt.Errorf("invalid min %q: %v", conf["min"], err)
	}
	last, err := strconv.Atoi(conf["max"])
	if err != nil {
		return 0, fmt.Errorf("invalid max %q: %v", conf["max"], err)
	}
	if last < first {
		return 0, nil
	}
	return last - first + 1, nil
}

// doDatabase starts one mysql client per table in the inclusive range
// conf["min"]..conf["max"]. Tables are named user_NNN (zero-padded to three
// digits) inside conf["database"]; each client runs in its own runCmd
// goroutine.
//
// The send on chInt happens before every launch, so the loop blocks while the
// channel is full and resumes when a running runCmd receives from it.
//
// If either bound is not an integer the problem is reported and nothing is
// launched, rather than silently treating the bound as 0.
func doDatabase(conf map[string]string) {
	first, err := strconv.Atoi(conf["min"])
	if err != nil {
		fmt.Println("Atoi min: ", err.Error())
		return
	}
	last, err := strconv.Atoi(conf["max"])
	if err != nil {
		fmt.Println("Atoi max: ", err.Error())
		return
	}

	for i := first; i <= last; i++ {
		// Take a concurrency slot before spawning; runCmd gives it back.
		chInt <- 1
		table := fmt.Sprintf("user_%03d", i)
		argv := []string{
			fmt.Sprintf("-u%s", conf["user"]),
			fmt.Sprintf("-p%s", conf["pwd"]),
			fmt.Sprintf("-h%s", conf["host"]),
			fmt.Sprintf("-e SELECT DISTINCT game_id,user_id FROM %s.%s;", conf["database"], table),
		}
		cmd := exec.Command("mysql", argv...)
		go runCmd(cmd, table, conf["flag"])
	}
}

// totalMu serialises updates to the package-level total counter, which is
// decremented from every runCmd goroutine.
var totalMu sync.Mutex

// runCmd runs one prepared mysql command and stores its output through
// handlerStdout, provided handlerStderr reports that stderr stayed empty.
//
// table and flag are only used for log lines and for the output file name.
//
// Whatever happens (pipe failure, start failure, non-zero exit status), the
// deferred cleanup gives the chInt slot back and decrements total, so a
// failing table can neither stall doDatabase nor keep main from ever
// receiving on quit.
func runCmd(cmd *exec.Cmd, table string, flag string) {
	timeStart := now()
	started := false

	// Cleanup: log the timing, reap the process, release the slot.
	defer func() {
		timeEnd := now()
		fmt.Printf("[%s_%s] %s - %s\n", flag, table, timeStart, timeEnd)

		// Wait is only meaningful after a successful Start.
		if started {
			if err := cmd.Wait(); err != nil {
				fmt.Println("Wait: ", err.Error())
			}
		}

		<-chInt

		// total is shared by all runCmd goroutines; update it under the lock
		// and signal outside of it so the lock is never held while blocking.
		totalMu.Lock()
		total--
		finished := total == 0
		totalMu.Unlock()
		if finished {
			quit <- 1
		}
	}()

	// Pipes have to be requested before the command is started.
	stderr, err := cmd.StderrPipe()
	if err != nil {
		fmt.Println("StderrPipe: ", err.Error())
		return
	}
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		fmt.Println("StdoutPipe: " + err.Error())
		return
	}

	// Start the command and consume its output.
	if err := cmd.Start(); err != nil {
		fmt.Println("Start: ", err.Error())
		return
	}
	started = true

	// NOTE(review): stderr is read to EOF before stdout is touched, so a
	// command that fills the stdout pipe before closing stderr would stall
	// here. Reading both concurrently would avoid that but changes when the
	// output file gets created.
	if !handlerStderr(stderr) {
		return
	}
	handlerStdout(stdout, table, flag)
}

// handlerStderr reads stderr until EOF and reports whether the command kept
// it empty. It returns false, after printing a diagnostic, when reading fails
// or when anything at all was written to the stream.
func handlerStderr(stderr io.ReadCloser) bool {
	output, readErr := ioutil.ReadAll(stderr)
	switch {
	case readErr != nil:
		fmt.Println("ReadAll stderr: ", readErr.Error())
		return false
	case len(output) > 0:
		fmt.Printf("stderr is not nil: %s", output)
		return false
	}
	return true
}

// handlerStdout copies the command's stdout into ./table/<flag>_<table>.log,
// dropping the first line (the column-header row printed by the mysql client).
//
// The file is created if needed and truncated otherwise, so a re-run never
// leaves rows from a previous, longer export behind. Open, read, write and
// flush failures are printed and end the copy; a final line that lacks a
// trailing newline is still written.
func handlerStdout(stdout io.ReadCloser, table string, flag string) {
	file := fmt.Sprintf("./table/%s_%s.log", flag, table)
	fp, err := os.OpenFile(file, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
	if err != nil {
		// Println, not Printf: the error text must not be used as a format.
		fmt.Println(err.Error())
		return
	}
	defer fp.Close()

	rd := bufio.NewReader(stdout)
	wr := bufio.NewWriter(fp)
	header := true
	for {
		// ReadString may return data together with an error (including
		// io.EOF), so the line is handled before the error is inspected.
		line, rerr := rd.ReadString('\n')
		if !header && len(line) > 0 {
			if _, werr := wr.WriteString(line); werr != nil {
				fmt.Println("Write: ", werr.Error())
				return
			}
		}
		header = false

		if rerr != nil {
			if rerr != io.EOF {
				fmt.Println("Read stdout: ", rerr.Error())
			}
			break
		}
	}

	if err := wr.Flush(); err != nil {
		fmt.Println("Flush: ", err.Error())
	}
}

// now returns the current time formatted as "YYYY/MM/DD hh:mm:ss".
func now() string {
	const layout = "2006/01/02 15:04:05"
	current := time.Now()
	return current.Format(layout)
}
© Shawson Lim - https://github.com/linsir123 - Powered by Jekyll.