I was recently looking into the DB layer of MIG, searching for ways to reduce the processing time of an action running on several thousand agents. One area of the code that was blatantly inefficient concerned database insertions.

When MIG's scheduler receives a new action, it pulls a list of eligible agents and creates one command per agent. One action running on 2,000 agents will create 2,000 commands in the "commands" table of the database. The scheduler needs to generate each command, insert it into PostgreSQL and send it to its agent.

MIG uses separate goroutines to process incoming actions and to store and send commands. The challenge was to avoid inserting each command into the database individually, and instead group all inserts into one big operation.

Go provides an elegant way to solve this problem.

At a high level, MIG Scheduler works like this:

  1. a new action file in json format is loaded from the local spool and passed into the NewAction channel
  2. a goroutine picks up the action from the NewAction channel, validates it, finds the list of target agents and creates one command per agent, each of which is passed to the CommandReady channel
  3. a goroutine listens on the CommandReady channel, picks up incoming commands, inserts them into the database and sends them to the agents (plus a few extra things); a simplified sketch of this pipeline follows
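
For context, here is a heavily simplified sketch of this pipeline. The channel layout follows the description above, but the types, the hardcoded agent list and the printed output are illustrative stand-ins, not MIG's actual code.

package main

import "fmt"

// simplified stand-ins for mig.Action and mig.Command
type Action struct{ ID float64 }

type Command struct {
    ID       float64
    ActionID float64
}

func main() {
    newAction := make(chan Action)
    commandReady := make(chan Command)
    done := make(chan bool)

    // stage 2: validate the action and create one command per target agent
    go func() {
        for a := range newAction {
            for agent := 1; agent <= 3; agent++ { // pretend three agents are eligible
                commandReady <- Command{ID: float64(agent), ActionID: a.ID}
            }
        }
        close(commandReady)
    }()

    // stage 3: pick up commands, store them and send them to the agents
    go func() {
        for cmd := range commandReady {
            fmt.Printf("storing and sending command %.0f of action %.0f\n", cmd.ID, cmd.ActionID)
        }
        done <- true
    }()

    // stage 1: a new action arrives and enters the pipeline
    newAction <- Action{ID: 42}
    close(newAction)
    <-done
}
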
The CommandReady goroutine is where the optimization happens. Instead of processing each command as it arrives, the goroutine uses a select statement to either pick up a command, or time out after one second of inactivity.

// Goroutine that loads and sends commands dropped in ready state
// it uses a select and a timeout to load a batch of commands instead of
// sending them one by one
go func() {
    ctx.OpID = mig.GenID()
    readyCmd := make(map[float64]mig.Command)
    ctr := 0
    for {
        select {
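        // a command is ready: buffer it instead of sending it right away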
        case cmd := <-ctx.Channels.CommandReady:
            ctr++
            readyCmd[cmd.ID] = cmd
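        // no command has arrived for a full second: flush the buffer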
        case <-time.After(1 * time.Second):
            if ctr > 0 {
                var cmds []mig.Command
                for id, cmd := range readyCmd {
                    cmds = append(cmds, cmd)
                    delete(readyCmd, id)
                }
                err := sendCommands(cmds, ctx)
                if err != nil {
                    ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: fmt.Sprintf("%v", err)}.Err()
                }
            }
            // reinit
            ctx.OpID = mig.GenID()
            ctr = 0
        }
    }
}()

As long as messages keep coming in, the select statement executes its first case and stores each command in the readyCmd map.

When messages stop coming for one second, the select statement falls into its second case: time.After(1 * time.Second).

In the second case, the readyCmd map is emptied and all buffered commands are sent as one operation. Later in the code, one big INSERT statement that includes all of the commands is executed against the PostgreSQL database.
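
To illustrate the insertion step, here is a minimal sketch of what such a batched INSERT could look like with database/sql and the lib/pq driver. The columns of the commands table and the insertCommands helper are simplified assumptions for the example, not MIG's actual schema or code.

package scheduler

import (
    "database/sql"
    "fmt"
    "strings"

    _ "github.com/lib/pq" // registers the postgres driver
)

// Command is a stripped-down stand-in for mig.Command.
type Command struct {
    ID     float64
    Status string
}

// insertCommands stores a whole batch of commands in a single INSERT
// statement by building one ($N, $M) value tuple per command.
func insertCommands(db *sql.DB, cmds []Command) error {
    if len(cmds) == 0 {
        return nil
    }
    tuples := make([]string, 0, len(cmds))
    args := make([]interface{}, 0, len(cmds)*2)
    for i, cmd := range cmds {
        tuples = append(tuples, fmt.Sprintf("($%d, $%d)", i*2+1, i*2+2))
        args = append(args, cmd.ID, cmd.Status)
    }
    query := "INSERT INTO commands (id, status) VALUES " + strings.Join(tuples, ", ")
    _, err := db.Exec(query, args...)
    return err
}

Inserting 2,000 commands this way costs one round-trip to the database instead of 2,000.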

In essence, this algorithm is very similar to a Japanese shishi-odoshi: the bamboo arm slowly fills up with water, then tips over and empties itself all at once before starting again.

[animation: a shishi-odoshi filling up and tipping over]

The current logic is not yet optimal: it does not set a maximum batch size, mostly because it does not need one yet. In my production environment, the scheduler manages about 1,500 agents, which is not enough to worry about limiting the batch size.
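
If the agent count grows, one straightforward extension is to flush when the buffer reaches a threshold, in addition to the inactivity timeout. Below is a self-contained sketch of that variant; batchWorker, maxBatch and the flush callback are illustrative names, not part of MIG.

package main

import (
    "fmt"
    "time"
)

// batchWorker drains a channel into batches, flushing either when the
// batch reaches maxBatch items or after one second of inactivity.
// flush must not retain the slice, which is reused between batches.
func batchWorker(in <-chan int, maxBatch int, flush func([]int)) {
    batch := make([]int, 0, maxBatch)
    for {
        select {
        case v := <-in:
            batch = append(batch, v)
            if len(batch) >= maxBatch {
                flush(batch)
                batch = batch[:0]
            }
        case <-time.After(1 * time.Second):
            if len(batch) > 0 {
                flush(batch)
                batch = batch[:0]
            }
        }
    }
}

func main() {
    in := make(chan int)
    go batchWorker(in, 1000, func(b []int) {
        fmt.Printf("flushing %d items\n", len(b))
    })
    for i := 0; i < 2500; i++ {
        in <- i
    }
    time.Sleep(2 * time.Second) // let the final partial batch flush
}

With 2,500 items, this flushes two full batches of 1,000 immediately and the remaining 500 on the timeout.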