implement graceful shutdown to webhook mode
ensures all pending tasks are finished before quit.
This commit is contained in:
parent
a10a04e1ce
commit
6bae5d11c2
3 changed files with 172 additions and 133 deletions
|
|
@ -70,7 +70,7 @@ is up to you, eg. Nginx, Apache, or even a simple Go server.
|
|||
GitPass: token,
|
||||
}
|
||||
|
||||
s, err := lib.UseWebhook(bind, cfg)
|
||||
s, r, err := lib.UseWebhook(bind, cfg)
|
||||
if err != nil {
|
||||
fmt.Println("cannot create server: ", err)
|
||||
return
|
||||
|
|
@ -85,7 +85,11 @@ is up to you, eg. Nginx, Apache, or even a simple Go server.
|
|||
defer stop()
|
||||
|
||||
fmt.Println("starting server")
|
||||
httptask.Server(s, task.Timeout(10*time.Second)).Run(ctx)
|
||||
task.Wait(
|
||||
httptask.Server(s, task.Timeout(10*time.Second)),
|
||||
r.Run,
|
||||
).Run(ctx)
|
||||
|
||||
},
|
||||
}
|
||||
|
||||
|
|
|
|||
159
lib/git.go
Normal file
159
lib/git.go
Normal file
|
|
@ -0,0 +1,159 @@
|
|||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
|
||||
package lib
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type repoSpec struct {
|
||||
user string
|
||||
name string
|
||||
}
|
||||
|
||||
type GitRunner struct {
|
||||
ch <-chan repoSpec
|
||||
cfg *WebhookCFG
|
||||
}
|
||||
|
||||
func (g *GitRunner) Run(ctx context.Context) error {
|
||||
// graceful: exit only if all tasks are done
|
||||
for {
|
||||
select {
|
||||
case repo := <-g.ch:
|
||||
err := g.cfg.download(repo.user, repo.name)
|
||||
if err != nil {
|
||||
fmt.Println("Failed to download repo: ", err)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *WebhookCFG) pageDir(user, repo string) string {
|
||||
return filepath.Join(c.PageDir, user, repo)
|
||||
}
|
||||
|
||||
func (c *WebhookCFG) gitDir(user, repo string) string {
|
||||
return filepath.Join(c.GitDir, user, repo)
|
||||
}
|
||||
|
||||
func (c *WebhookCFG) checkout(user, repo string) (err error) {
|
||||
pageDir := c.pageDir(user, repo)
|
||||
gitDir := c.gitDir(user, repo)
|
||||
|
||||
git := exec.Command(
|
||||
"git",
|
||||
"--git-dir", gitDir,
|
||||
"--work-tree", pageDir,
|
||||
"checkout", "origin/"+c.Branch,
|
||||
)
|
||||
output, err := git.CombinedOutput()
|
||||
if err != nil {
|
||||
fmt.Println("git checkout failed: ", err)
|
||||
fmt.Println("=========== Dump output: ============")
|
||||
fmt.Println(string(output))
|
||||
fmt.Println("=====================================")
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (c *WebhookCFG) fetch(user, repo string) (err error) {
|
||||
gitDir := c.gitDir(user, repo)
|
||||
git := exec.Command(
|
||||
"git",
|
||||
"--git-dir", gitDir,
|
||||
"fetch", "origin", c.Branch,
|
||||
)
|
||||
output, err := git.CombinedOutput()
|
||||
if err != nil {
|
||||
fmt.Println("git fetch failed: ", err)
|
||||
fmt.Println("=========== Dump output: ============")
|
||||
fmt.Println(string(output))
|
||||
fmt.Println("=====================================")
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (c *WebhookCFG) clone(user, repo string) (err error) {
|
||||
pageDir := c.pageDir(user, repo)
|
||||
err = os.MkdirAll(filepath.Dir(pageDir), 0755)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
gitDir := c.gitDir(user, repo)
|
||||
err = os.MkdirAll(filepath.Dir(gitDir), 0700)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
uri := c.Server
|
||||
uri.Path = "/" + path.Join(user, repo) + ".git"
|
||||
uri.User = url.UserPassword(c.GitUser, c.GitPass)
|
||||
|
||||
git := exec.Command(
|
||||
"git",
|
||||
"clone",
|
||||
"-b", c.Branch,
|
||||
"--single-branch",
|
||||
"--no-tags",
|
||||
"--separate-git-dir", gitDir,
|
||||
uri.String(),
|
||||
pageDir,
|
||||
)
|
||||
output, err := git.CombinedOutput()
|
||||
if err != nil {
|
||||
fmt.Println("git clone failed: ", err)
|
||||
fmt.Println("=========== Dump output: ============")
|
||||
fmt.Println(string(output))
|
||||
fmt.Println("=====================================")
|
||||
}
|
||||
|
||||
os.Remove(filepath.Join(pageDir, ".git"))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
var repoLock = &sync.Map{}
|
||||
|
||||
func (c *WebhookCFG) lock(user, repo string) func() {
|
||||
key := user + "/" + repo
|
||||
|
||||
lock, _ := repoLock.LoadOrStore(key, &sync.Mutex{})
|
||||
m := lock.(*sync.Mutex)
|
||||
m.Lock()
|
||||
return m.Unlock
|
||||
}
|
||||
|
||||
func (c *WebhookCFG) download(user, repo string) (err error) {
|
||||
unlock := c.lock(user, repo)
|
||||
defer unlock()
|
||||
fmt.Println("Pulling ", user, repo)
|
||||
|
||||
gitDir := c.gitDir(user, repo)
|
||||
if _, err = os.Stat(gitDir); os.IsNotExist(err) {
|
||||
err = c.clone(user, repo)
|
||||
} else {
|
||||
err = c.fetch(user, repo)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
err = c.checkout(user, repo)
|
||||
return
|
||||
}
|
||||
138
lib/hook.go
138
lib/hook.go
|
|
@ -10,12 +10,7 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type WebhookCFG struct {
|
||||
|
|
@ -24,123 +19,7 @@ type WebhookCFG struct {
|
|||
GitDir string
|
||||
GitUser string
|
||||
GitPass string
|
||||
}
|
||||
|
||||
func (c *WebhookCFG) pageDir(user, repo string) string {
|
||||
return filepath.Join(c.PageDir, user, repo)
|
||||
}
|
||||
|
||||
func (c *WebhookCFG) gitDir(user, repo string) string {
|
||||
return filepath.Join(c.GitDir, user, repo)
|
||||
}
|
||||
|
||||
func (c *WebhookCFG) checkout(user, repo string) (err error) {
|
||||
pageDir := c.pageDir(user, repo)
|
||||
gitDir := c.gitDir(user, repo)
|
||||
|
||||
git := exec.Command(
|
||||
"git",
|
||||
"--git-dir", gitDir,
|
||||
"--work-tree", pageDir,
|
||||
"checkout", "origin/"+c.Branch,
|
||||
)
|
||||
output, err := git.CombinedOutput()
|
||||
if err != nil {
|
||||
fmt.Println("git checkout failed: ", err)
|
||||
fmt.Println("=========== Dump output: ============")
|
||||
fmt.Println(string(output))
|
||||
fmt.Println("=====================================")
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (c *WebhookCFG) fetch(user, repo string) (err error) {
|
||||
gitDir := c.gitDir(user, repo)
|
||||
git := exec.Command(
|
||||
"git",
|
||||
"--git-dir", gitDir,
|
||||
"fetch", "origin", c.Branch,
|
||||
)
|
||||
output, err := git.CombinedOutput()
|
||||
if err != nil {
|
||||
fmt.Println("git fetch failed: ", err)
|
||||
fmt.Println("=========== Dump output: ============")
|
||||
fmt.Println(string(output))
|
||||
fmt.Println("=====================================")
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (c *WebhookCFG) clone(user, repo string) (err error) {
|
||||
pageDir := c.pageDir(user, repo)
|
||||
err = os.MkdirAll(filepath.Dir(pageDir), 0755)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
gitDir := c.gitDir(user, repo)
|
||||
err = os.MkdirAll(filepath.Dir(gitDir), 0700)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
uri := c.Server
|
||||
uri.Path = "/" + path.Join(user, repo) + ".git"
|
||||
uri.User = url.UserPassword(c.GitUser, c.GitPass)
|
||||
|
||||
git := exec.Command(
|
||||
"git",
|
||||
"clone",
|
||||
"-b", c.Branch,
|
||||
"--single-branch",
|
||||
"--no-tags",
|
||||
"--separate-git-dir", gitDir,
|
||||
uri.String(),
|
||||
pageDir,
|
||||
)
|
||||
output, err := git.CombinedOutput()
|
||||
if err != nil {
|
||||
fmt.Println("git clone failed: ", err)
|
||||
fmt.Println("=========== Dump output: ============")
|
||||
fmt.Println(string(output))
|
||||
fmt.Println("=====================================")
|
||||
}
|
||||
|
||||
os.Remove(filepath.Join(pageDir, ".git"))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
var repoLock = &sync.Map{}
|
||||
|
||||
func (c *WebhookCFG) lock(user, repo string) func() {
|
||||
key := user + "/" + repo
|
||||
|
||||
lock, _ := repoLock.LoadOrStore(key, &sync.Mutex{})
|
||||
m := lock.(*sync.Mutex)
|
||||
m.Lock()
|
||||
return m.Unlock
|
||||
}
|
||||
|
||||
func (c *WebhookCFG) download(user, repo string) (err error) {
|
||||
unlock := c.lock(user, repo)
|
||||
defer unlock()
|
||||
|
||||
gitDir := c.gitDir(user, repo)
|
||||
if _, err = os.Stat(gitDir); os.IsNotExist(err) {
|
||||
err = c.clone(user, repo)
|
||||
} else {
|
||||
err = c.fetch(user, repo)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
err = c.checkout(user, repo)
|
||||
return
|
||||
ch chan<- repoSpec
|
||||
}
|
||||
|
||||
type webhookPayload struct {
|
||||
|
|
@ -185,22 +64,19 @@ func (c *WebhookCFG) handle(w http.ResponseWriter, r *http.Request) {
|
|||
user, repo := path.Split(payload.Repository.FullName)
|
||||
user = user[:len(user)-1]
|
||||
go func() {
|
||||
fmt.Println("Pulling ", user, repo)
|
||||
err = c.download(user, repo)
|
||||
if err != nil {
|
||||
fmt.Println("Failed to download repository: ", err)
|
||||
return
|
||||
}
|
||||
c.ch <- repoSpec{user: user, name: repo}
|
||||
}()
|
||||
}
|
||||
|
||||
func UseWebhook(bind string, cfg *WebhookCFG) (*http.Server, error) {
|
||||
func UseWebhook(bind string, cfg *WebhookCFG) (*http.Server, *GitRunner, error) {
|
||||
if cfg == nil {
|
||||
return nil, errors.New("webhook config is nil")
|
||||
return nil, nil, errors.New("webhook config is nil")
|
||||
}
|
||||
s := &http.Server{
|
||||
Addr: bind,
|
||||
Handler: http.HandlerFunc(cfg.handle),
|
||||
}
|
||||
return s, nil
|
||||
ch := make(chan repoSpec, 5)
|
||||
cfg.ch = ch
|
||||
return s, &GitRunner{ch: ch, cfg: cfg}, nil
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue