Golang实现并行命令

Gotty

在集群中经常我们会需要在多个节点上执行相同的操作,有不少的软件能帮助我们实现这样的需求,例如大名鼎鼎的Salt,或者更加轻量级的dsh。但是一方面如果在我们自己编写的程序里面再调用这些软件获取并行命令的输出结果,这个命令的执行其实是阻塞的,需要一直等待salt/dsh命令执行完毕后获取输出。另一方面引入salt这样的软件会给部署和维护带来更大的成本,而实际上我们用到的功能仅仅是salt的一小部分,实在是有些得不偿失。如果我们想不依赖其他软件,自己实现一个简单的并行命令的功能,难度会如何呢?这篇文章就为大家介绍一下我是如何利用Redis和共享存储实现简单的并行命令的。

核心功能

  • 支持异步批量执行命令
    • 命令执行
    • 权限控制
  • 能实时查询执行结果
    • 结果输出
    • 对命令输出做限制
  • 支持命令终止

大体思路

MySQL存储命令的基本信息并生成ID,利用Redis分发消息。在集群环境中通常都会有共享存储(这里我们假设所有节点的/home目录均是共享存储),我们只需将命令执行结果以文件形式重定向到共享存储下,文件以一定的编码规律存放,例如/home/pcm/$CommandID/$hostname.out/home/pcm/$CommandID/$hostname.err.这样查询命令输出结果时只需要遍历该命令ID路径下的所有输出结果即可。(大名鼎鼎的集群调度软件Slurm其实也是采用了类似的做法)为了实现命令终止以及获得可靠的命令执行结果判断,我们需要将pid记录回Redis,另外将所有的exit code记录到文件中以备查询。

支持异步批量执行命令

插入命令

这一步很简单,利用MySQL生成一个自增的ID,然后将命令丢给agent,agent会将消息经过Redis分发到节点上。具体代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Start starts run parallel command on nodes, it returns the command ID if
// start successfully.
func (p *PcmManager) Start(req *Req) (int, error) {
command := mdef.Pcm{
UID: req.UID,
StartTime: time.Now().Unix(),
Command: req.Command,
Nodes: strings.Join(req.Nodes, ","),
}
err = p.db.Insert(&command)
if err != nil {
return -1, err
}
err = p.agent.DoPcm(command.ID, command.UID, req.Command, req.Nodes)
if err != nil {
return -1, err
}
return command.ID, nil
}

下发命令

前端请求参数包含命令内容以及一个由节点名(hostname)构成的字符数组,收到请求后,将命令经过Redis下发。这里有两种不同的方式:

  1. 利用Redis的Pub/Sub机制,所有的节点都订阅一个消息,例如$hostname:pcm,主机往这个key上发布消息,所有的节点都会收到这个消息然后处理。
  2. 利用list做消息队列,内容放入Redis的消息队列中,例如key是$hostname:pcm,主机使用LPUSH命令将消息压入,所有节点均由一个goroutine利用BRPOP轮询。

两种方式没有优劣之分,只是根据需求来确定你需要的方式,方法1让所有在线的节点可以执行指令,但关机的节点不会执行。而方法二就算节点关机,一旦开机之后就可以执行之前的命令。这个根据自己需求选择即可。

agent部分负责分发消息、监控消息,由于监控消息的方式有不同,具体大家自己去按自己的思路实现即可,这里只贴出分发的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// DoPcm pushes command info into Redis, pcmExecutor will pop and execute it.
func (m *Manager) DoPcm(cid, uid int, command string, nodes []string) error {
pcm := Pcm{
CID: cid,
UID: uid,
Command: command,
Nodes: nodes,
}
data, err := json.Marshal(pcm)
if err != nil {
return errors.New(def.ErrGeneralJSONMarshal,
fmt.Sprintf("can't marshal JSON data:%v", err))
}
var fails []string
for _, node := range nodes {
key := fmt.Sprintf("agent:%s:pcm:todo", node)
_, err := store.Do("LPUSH", key, string(data))
if err != nil {
fails = append(fails, node)
}
}
if len(fails) > 0 {
return errors.New(def.ErrAgentBatchExec,
fmt.Sprintf("failed to run cmd:%s on following nodes:%s",
data, strings.Join(fails, ",")))
}
return nil
}

执行命令

节点拿到消息后调用exec.Command执行命令,但这里需要修改一下输出定向,记录pid以备终止命令时使用,将错误退出码记录到文件,以用户权限执行命令。具体范例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
func (m *Manager) pcmExec(pcm *Pcm) {
dir := fmt.Sprintf("/home/pcm/%d", pcm.CID)
hostName, err := os.Hostname()
if err != nil {
logs.Error("pcmExec can't get hostname: %v", err)
return
}
err = os.MkdirAll(dir, 0755)
if err != nil {
logs.Error("pcmExec can't create pcm execute directory: %v", err)
return
}
stdoutFilename := fmt.Sprintf("%s/%s.out", dir, hostName)
stderrFilename := fmt.Sprintf("%s/%s.err", dir, hostName)
exitCodeFilename := fmt.Sprintf("%s/%s.exit", dir, hostName)
stdout, err := os.Create(stdoutFilename)
if err != nil {
logs.Error("pcmExec can't create stdout file: %v", err)
return
}
stderr, err := os.Create(stderrFilename)
if err != nil {
logs.Error("pcmExec can't create stderr file: %v", err)
return
}
exitFlag, err := os.Create(exitCodeFilename)
if err != nil {
logs.Error("pcmExec can't create exit flag file: %v", err)
return
}
defer exitFlag.Close()
command, err := su.Command(pcm.UID, "bash", "-c", pcm.Command)
if err != nil {
logs.Error("pcmExec can't generate command in user's credential:%v", err)
return
}
command.Stderr = stderr
command.Stdout = stdout
err = command.Start()
if err != nil {
logs.Error("pcmExec can't start command: %v", err)
return
}
pid := command.Process.Pid
c := m.redisPool.Get()
defer c.Close()
key := fmt.Sprintf("hpc:pcm:pids:%d", pcm.CID)
_, err = c.Do("HSET", key, hostName, pid)
if err != nil {
logs.Error("pcmExec can't write pid into Redis: %v", err)
return
}
if err := command.Wait(); err != nil {
if exiterr, ok := err.(*exec.ExitError); ok {
if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
fmt.Fprintf(exitFlag, "%d", status.ExitStatus())
logs.Error("Pcm exit status: %d", status.ExitStatus())
}
} else {
// unknown code 999
fmt.Fprintf(exitFlag, "%d", 999)
}
} else {
fmt.Fprintf(exitFlag, "%d", 0)
}
}

以上内容中的su.Command(pcm.UID, "bash", "-c", pcm.Command)其实只是封装了一下Command命令,以特定的用户身份运行命令,具体代码:

1
2
3
4
5
6
7
8
9
10
11
// Command creates a new exec.Cmd that will run with user privilege.
func Command(uid int, command string, args ...string) (*exec.Cmd, error) {
cred, err := getUserCred(uid)
if err != nil {
return nil, err
}
cmd := exec.Command(command, args...)
cmd.SysProcAttr = &syscall.SysProcAttr{}
cmd.SysProcAttr.Credential = cred
return cmd, nil
}

监控命令输出

因为我们记录了pid到Redis,所以当命令执行完毕之后,应该清除掉这个pid。因为每条命令执行完毕之后,我都会将exit code记录到文件中,所以一旦有这个文件生成,则说明命令执行完毕了。这里我利用了’监听’共享存储的写入事件来确定一条命令执行完毕,而路径中已经包含了ID信息使得我可以方便的确定是哪条命令执行完毕了,使用到了"github.com/rjeczalik/notify",大致代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// Watch sets up a watchpoint on the cmdOutputDir, the command output will be
// redirected into cmdOutputDir as file, Watch catches the event, then deletes
// the pid which recorded in Redis.
func (p *PcmManager) Watch() {
c := make(chan notify.EventInfo, p.chanBufferSize)
err := notify.Watch(p.cmdOutputDir+"/...", c, notify.Write)
if err != nil {
logs.Critical(err)
return
}
defer notify.Stop(c)
for {
e := <-c
path := e.Path()
if !strings.Contains(path, ".exit") {
continue
}
cid, hostname, err := p.parseHostnameAndCID(path)
if err != nil {
logs.Error("Pcm watcher catches an unexpected error: %v", err)
continue
}
key := fmt.Sprintf("hpc:pcm:pids:%s", cid)
_, err = store.Do("HDEL", key, hostname)
if err != nil {
logs.Error("Pcm watcher can't remove pids cached in Redis: %v", err)
continue
}
}
}

以上则已经实现了一个并行命令的基本功能。接下来就是查询和终止命令。

查询执行结果

经过上面的步骤我们已经把各个节点的命令输出重定向到了特定的目录下,所以只需要知道命令ID,去遍历那个目录下的文件稍加解析,即可获取到这条命令的执行情况。这部分内容比较简单,大家自行实现即可。需要稍加注意的是有的命令可能输出内容很长,所以我们应该利用buffer读取特定长度的内容,而不是直接返回所有内容。大致代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (p *PcmManager) readOutputFile(filename string) (content string,
trimmed bool, err error) {
buf := make([]byte, p.maxOutput)
f, err := os.Open(filename)
defer f.Close()
if err != nil {
return
}
n, err := f.Read(buf)
if err != nil && err != io.EOF {
return
}
if n >= p.maxOutput {
trimmed = true
}
content = string(buf[:n])
return content, trimmed, nil
}

终止命令

执行某些命令可能比较耗时,例如我执行一条命令sleep 300 && hostname,即在300秒后输出节点的hostname,这时候我们想终止这条命令的执行怎么办呢?我们已经在Redis中以hash表(key:hostname,value:pid)记录过了每个节点上执行的那条命令的pid,所以我们只需要取出这条命令的所有的pid以及对应的主机名就好啦,然后就在各个节点上杀掉特定的进程即可。对了别忘记更新一下MySQL中命令的状态哦。这里有一点需要注意的是我们记录的pid有可能只是一个父进程ID,而sleep 300 && hostname这样的命令其实会产生两个pid,所以我们要将他们一起杀掉。这里建议大家直接使用pkill -TERM -P $pid

总结

好啦。具体就介绍到这里了,相信大家已经都能根据上述内容实现一个简单的并行命令需求啦!其实核心就是消息的分发以及命令重定向到共享存储,这也是在分布式系统开发中经常用到的技巧,灵活利用能实现很多实用又有趣的功能。

坚持原创技术分享,您的支持将鼓励我继续创作!