Commit cdbcc85e by tingweiwang

fix

0 parents
Showing with 4819 additions and 0 deletions
# Runtime image for the core-- binaries (server/worker/monitor).
# Binaries are built on the host by `make build` and COPY'd in.
FROM hb.seetatech.com/k8s/ubuntu-basic:16.04
COPY core--* /adl/bin/
COPY conf/ /adl/bin/conf
# Registry credentials so docker clients running inside the container can
# pull/push private images.
COPY conf/config.json /root/.docker/config.json
ENV PATH /adl/bin:$PATH
WORKDIR /adl/bin/
\ No newline at end of file
// One ephemeral Jenkins agent pod per run; the random label prevents pod reuse.
def label = "mypod-${UUID.randomUUID().toString()}"
podTemplate(label: label, cloud: 'kubernetes') {
    node('private') {
        stage('Clone') {
            echo "开始下载autodl项目代码"
            // BUG FIX: url and credentialsId must be passed to the SAME git
            // step. The original called `git` twice — first cloning without
            // credentials, then again without a URL.
            git url: "https://tingweiwang@gitlab.seetatech.com/tingweiwang/jenkins-slave-test.git",
                credentialsId: 'd2625477-5e8f-4661-84cf-aefabf33bc2f'
        }
        // Smoke-check the tools the deploy step needs.
        stage('go test') {
            sh "go version"
        }
        stage('docker test') {
            sh "docker version"
        }
        stage('k8s test') {
            sh "kubectl get node"
            // Removed debug leftover `sh "sleep 1000000"`, which blocked the
            // pipeline for ~11 days before the deploy could run.
            sh "make redeploy_all mode=dev"
        }
    }
}
# ---- Toolchain & deployment knobs ------------------------------------------
# NOTE(review): "KUBENETES" is a typo for "KUBERNETES"; kept because the name
# is used throughout this file.
KUBENETES=kubectl
# Docker registry prefix for all images built here (edit before deploying).
REGISTRY=harbor_host/core
VERSION=1.11
# Enable Go modules and route module downloads through the internal proxy.
GOPROXY=export GO111MODULE=on && export GOPROXY=http://goproxy.seetatech.com
# Every `go` invocation runs with the proxy environment exported first.
GOCMD=$(GOPROXY) && go
GORUN=$(GOCMD) run
DOCKER=docker
# Image tag for this run; `:=` evaluates once so build and push use one tag.
now:=$(shell date +%Y%m%d%H%M%S)
# FIX: declare ALL command-style targets phony (the original listed only
# three), so a file with the same name can never shadow a command.
.PHONY: clean build redeploy redeploy_all update private_deploy \
        redeploy_monitor redeploy_collector redeploy_nginx redeploy_log_stash help
# Build all four Go binaries locally; images COPY them in afterwards.
build: build_server build_worker build_collector build_monitor
# Full redeploy: build binaries and images, push, then delete/recreate every
# k8s object. NOTE(review): "collecor" is a typo for "collector", but the
# target names are used consistently in this file and kept for compatibility.
redeploy_all: build docker_build_worker docker_push_worker docker_build_nginx docker_push_nginx docker_build_collector docker_push_collector rewrite_yaml delete_server create_server delete_worker create_worker delete_collecor create_collecor delete_monitor create_monitor delete_nginx create_nginx recover_yaml
# Rolling update via `kubectl apply` instead of delete/create.
update: build docker_build_worker docker_push_worker docker_build_nginx docker_push_nginx docker_build_collector docker_push_collector rewrite_yaml apply_server apply_worker apply_collecor apply_monitor apply_nginx recover_yaml
# Like redeploy_all but skips the local Go build (uses prebuilt binaries).
private_deploy: docker_build_worker docker_push_worker docker_build_nginx docker_push_nginx docker_build_collector docker_push_collector rewrite_yaml delete_server create_server delete_worker create_worker delete_collecor create_collecor delete_monitor create_monitor delete_nginx create_nginx recover_yaml
redeploy_monitor: build docker_build_worker docker_push_worker rewrite_yaml delete_monitor create_monitor recover_yaml
redeploy_collector: build docker_build_worker docker_push_worker rewrite_yaml delete_collecor create_collecor recover_yaml
redeploy_nginx: docker_build_nginx docker_push_nginx delete_nginx rewrite_yaml create_nginx recover_yaml
redeploy_log_stash: build_log_stash docker_build_log_stash docker_push_log_stash delete_log_stash rewrite_yaml create_log_stash recover_yaml
# Per-binary builds; $(GOCMD) exports the module-proxy env before `go`.
build_server:
	$(GOCMD) build -o core--server ./cmd/server/main.go
build_worker:
	$(GOCMD) build -o core--worker ./cmd/worker/main.go
build_collector:
	$(GOCMD) build -o core--collector ./cmd/collector/main.go
build_monitor:
	$(GOCMD) build -o core--monitor ./cmd/monitor/main.go
build_log_stash:
	$(GOCMD) build -o log-stash-v1 ./cmd/log_stash/main.go
# create_*: kubectl-create the rewritten YAML; on any failure restore the
# placeholder YAMLs and abort (see panic_and_recover_yaml).
create_server:
	@$(KUBENETES) create -f $(DeployYAMLPath)/server-deployment.yaml || $(call panic_and_recover_yaml)
	@$(KUBENETES) create -f $(DeployYAMLPath)/server-svc.yaml || $(call panic_and_recover_yaml)
create_worker:
	@$(KUBENETES) create -f $(DeployYAMLPath)/worker-deployment.yaml || $(call panic_and_recover_yaml)
create_collecor:
	@$(KUBENETES) create -f $(DeployYAMLPath)/collector-deployment.yaml || $(call panic_and_recover_yaml)
create_monitor:
	@$(KUBENETES) create -f $(DeployYAMLPath)/monitor-deployment.yaml || $(call panic_and_recover_yaml)
create_nginx:
	@$(KUBENETES) create -f $(DeployYAMLPath)/nginx.yaml || $(call panic_and_recover_yaml)
	@$(KUBENETES) create -f $(DeployYAMLPath)/nginx-svc.yaml || $(call panic_and_recover_yaml)
create_log_stash:
	@$(KUBENETES) create -f $(DeployYAMLPath)/log-stash-deployment*.yaml || $(call panic_and_recover_yaml)
# apply_*: rolling update; --record keeps the command in rollout history.
apply_server:
	@$(KUBENETES) apply -f $(DeployYAMLPath)/server-deployment.yaml --record || $(call panic_and_recover_yaml)
apply_worker:
	@$(KUBENETES) apply -f $(DeployYAMLPath)/worker-deployment.yaml --record || $(call panic_and_recover_yaml)
apply_collecor:
	@$(KUBENETES) apply -f $(DeployYAMLPath)/collector-deployment.yaml --record || $(call panic_and_recover_yaml)
apply_nginx:
	@$(KUBENETES) apply -f $(DeployYAMLPath)/nginx.yaml --record || $(call panic_and_recover_yaml)
apply_monitor:
	@$(KUBENETES) apply -f $(DeployYAMLPath)/monitor-deployment.yaml --record || $(call panic_and_recover_yaml)
# delete_*: the '-' prefix ignores "not found" errors so redeploys also work
# on a clean cluster.
delete_server:
	-@$(KUBENETES) delete -f $(DeployYAMLPath)/server-deployment.yaml
	-@$(KUBENETES) delete -f $(DeployYAMLPath)/server-svc.yaml
delete_worker:
	-@$(KUBENETES) delete -f $(DeployYAMLPath)/worker-deployment.yaml
delete_collecor:
	-@$(KUBENETES) delete -f $(DeployYAMLPath)/collector-deployment.yaml
delete_monitor:
	-@$(KUBENETES) delete -f $(DeployYAMLPath)/monitor-deployment.yaml
delete_nginx:
	-@$(KUBENETES) delete -f $(DeployYAMLPath)/nginx.yaml
	-@$(KUBENETES) delete -f $(DeployYAMLPath)/nginx-svc.yaml
delete_log_stash:
	-@$(KUBENETES) delete -f $(DeployYAMLPath)/log-stash-deployment*.yaml
# docker_build_worker builds the shared adl-core-v1 image that carries the
# core--server/worker/monitor binaries (the deployment YAMLs all use it).
docker_build_worker:
	@echo "build tag is: "$(now)
	$(DOCKER) build -f Dockerfile -t $(REGISTRY)/adl-core-v1:$(now) .
docker_push_worker:
	$(DOCKER) push $(REGISTRY)/adl-core-v1:$(now)
docker_build_nginx:
	$(DOCKER) build -f cmd/nginx/Dockerfile -t $(REGISTRY)/core--nginx:$(now) cmd/nginx/
docker_push_nginx:
	$(DOCKER) push $(REGISTRY)/core--nginx:$(now)
docker_build_collector:
	$(DOCKER) build -f cmd/collector/Dockerfile -t $(REGISTRY)/core--collector:$(now) .
docker_push_collector:
	$(DOCKER) push $(REGISTRY)/core--collector:$(now)
docker_build_log_stash:
	$(DOCKER) build -f cmd/log_stash/Dockerfile -t $(REGISTRY)/log-stash-v1:$(now) .
docker_push_log_stash:
	$(DOCKER) push $(REGISTRY)/log-stash-v1:$(now)
# rewrite_yaml edits the deployment YAMLs IN PLACE, replacing the
# DOCKER_REGISTRY / latest placeholders with the real registry and this run's
# tag; recover_yaml reverses it. NOTE(review): because the files are mutated
# in place, two concurrent deploys from the same checkout would race.
rewrite_yaml:
	@sed -i "s#DOCKER_REGISTRY#$(REGISTRY)#g" `find $(DeployYAMLPath) -type f -name "*.yaml"`
	@sed -i "s#latest#$(now)#g" `find $(DeployYAMLPath) -type f -name "*.yaml"`
recover_yaml:
	@sed -i "s#$(now)#latest#g" `find $(DeployYAMLPath) -type f -name "*.yaml"`
	@sed -i "s#$(REGISTRY)#DOCKER_REGISTRY#g" `find $(DeployYAMLPath) -type f -name "*.yaml"`
# Restore the YAML placeholders, then fail the make run (exit 1).
define panic_and_recover_yaml
(sed -i "s#$(now)#latest#g" `find $(DeployYAMLPath) -type f -name "*.yaml"` && sed -i "s#$(REGISTRY)#DOCKER_REGISTRY#g" `find $(DeployYAMLPath) -type f -name "*.yaml"` && exit 1)
endef
# Print usage hints at parse time ($(info) runs before any recipe).
help:
	$(info You should set the mode, e.g. `make {command} mode=dev` || `make {command} mode=release`)
	$(info )
	$(info )
# Remove build artifacts. FIX: the original `ls | grep ... | xargs rm`
# pipeline ran `rm` with no arguments on an empty match and broke on file
# names with spaces; $(RM) (= rm -f) is quiet when nothing matches.
clean:
	$(RM) *core--*
	$(RM) *.log*
# Select the deployment YAML directory from mode=dev|release.
# FIX: the original $(error) fired at parse time for EVERY invocation with no
# mode — even `make help`, `make clean`, and plain `make` (build) aborted.
# Now the error only fires when a requested goal actually needs
# $(DeployYAMLPath).
ifeq ($(mode),dev)
DeployYAMLPath=deploy/develop
else
ifeq ($(mode),release)
DeployYAMLPath=deploy/release
else
# Goals that never expand $(DeployYAMLPath): help, clean, and the pure Go
# builds (empty MAKECMDGOALS means the default goal, build).
mode_free_goals:=help clean build build_server build_worker build_collector build_monitor build_log_stash
ifneq ($(strip $(filter-out $(mode_free_goals),$(MAKECMDGOALS))),)
$(error You should set the mode, e.g. `make {command} mode=dev` || `make {command} mode=release`)
endif
endif
endif
## 部署方式
### 服务部署
1. 修改Makefile中的 REGISTRY
2. make redeploy_all
### 注意事项
1. k8s node的name需要和/etc/hostname保持一致
\ No newline at end of file
# Runtime image for core--collector; CUDA base because the collector reads
# GPU metrics on the node (gonvml is in go.mod).
FROM hb.seetatech.com/k8s/cuda:10.0-base-ubuntu16.04
COPY conf/ /adl/bin/conf
# Registry credentials for in-container docker clients.
COPY conf/config.json /root/.docker/config.json
COPY core--collector /adl/bin/
WORKDIR /adl/bin/
ENV PATH /adl/bin:$PATH
// Command core--collector (deployed as a DaemonSet on every node): publishes
// node resource info in the background and streams task logs in the
// foreground.
package main

import (
	_ "autodl-core/conf" // side effect: loads conf/app.yaml and the seelog config
	"autodl-core/service"
	"context"
)

func main() {
	collectorFactory := service.InitializeCollectorFactory()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// Resource collection runs concurrently; CollectTaskLog blocks main, so
	// the process lives as long as log collection does.
	go collectorFactory.Collector.CollectNodeResourceInfo(ctx)
	collectorFactory.Collector.CollectTaskLog()
}
# Runtime image for log-stash-v1. Installs git plus tzdata pinned to
# Asia/Shanghai so log timestamps match the cluster's local time.
FROM ubuntu:16.04
RUN apt-get update && \
    apt-get install -y git && \
    export DEBIAN_FRONTEND=noninteractive && \
    apt-get install -y tzdata && \
    ln -fs /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && \
    dpkg-reconfigure -f noninteractive tzdata
COPY conf/ /adl/bin/conf
# Registry credentials for in-container docker clients.
COPY conf/config.json /root/.docker/config.json
COPY log-stash-v1 /adl/bin/
WORKDIR /adl/bin/
ENV PATH /adl/bin:$PATH
// Command log-stash-v1: collects task logs for a single Kubernetes
// namespace — argv[1] when given (see the seetaas deployment YAML), otherwise
// the configured k8s.namespace.
package main

import (
	_ "autodl-core/conf" // side effect: loads conf/app.yaml and the seelog config
	"autodl-core/service"
	"github.com/spf13/viper"
	"os"
)

func main() {
	logStash := service.InitializeLogStashFactory()
	if len(os.Args) > 1 {
		logStash.LogStash.CollectTaskLog(os.Args[1])
	} else {
		logStash.LogStash.CollectTaskLog(viper.GetString("k8s.namespace"))
	}
}
package main
import (
_ "autodl-core/conf"
"autodl-core/service"
"context"
)
func main() {
monitor := service.InitializeMonitorFactory()
stop := make(chan error)
ctx, cancelFunc := context.WithCancel(context.Background())
go monitor.Monitor.WatchTaskStatus(ctx, stop)
go monitor.Monitor.HandleTaskStatus(ctx, stop)
select {
case err := <-stop:
cancelFunc()
if err != nil {
panic(err.Error())
}
}
}
<!-- Static fallback page; nginx serves it for 404/502 responses
     (error_page directive in the gateway config). -->
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>AutoDL</title>
    <style>
        /* Center the message horizontally, slightly above mid-screen. */
        .content {
            position: absolute;
            top: 40%;
            left: 50%;
            transform: translate(-50%);
            text-align: center;
            display: flex;
            flex-direction: column;
            align-items: center;
            font-size: 40px;
            color: #1F95FF;
        }
    </style>
</head>
<body>
<div class="content">
    <span>服务无法访问,请稍后再试</span>
</div>
</body>
</html>
\ No newline at end of file
# Gateway image: custom global nginx config, the routing rules as the default
# vhost, plus the static error page(s).
FROM nginx
COPY nginx.conf /etc/nginx/nginx.conf
COPY apiserver.conf /etc/nginx/conf.d/default.conf
COPY *.html /etc/nginx/
# Gateway vhost: routes /tensorboard/<svc>/..., /jupyter/<svc>/... and
# /standard/<svc>/... to the in-cluster service named by the first path
# segment, resolved via kube-dns with a short TTL so recreated services are
# picked up quickly. Upgrade/Connection headers allow websockets; Origin is
# blanked so the upstream's same-origin checks pass.
server {
    listen 80;
    location ~ /tensorboard/([-_.:\w]+)/(.*) {
        resolver kube-dns.kube-system.svc.cluster.local valid=5s;
        rewrite_log on;
        # Strip the /tensorboard/<svc> prefix before proxying.
        rewrite ^/tensorboard/([-_.:\w]+)/(.*) /$2 break;
        proxy_pass http://$1;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Origin "";
    }
    location ~ /jupyter/([-_.:\w]+)/(.*) {
        # NOTE(review): valid=1s here vs 5s elsewhere — confirm whether the
        # shorter TTL is intentional for jupyter services.
        resolver kube-dns.kube-system.svc.cluster.local valid=1s;
        rewrite_log on;
        # Jupyter keeps its /jupyter/<svc> base path (configured base_url).
        rewrite ^/jupyter/([-_.:\w]+)/(.*) /jupyter/$1/$2 break;
        proxy_pass http://$1;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Origin "";
    }
    location ~ /standard/([-_.:\w]+)/(.*) {
        resolver kube-dns.kube-system.svc.cluster.local valid=5s;
        rewrite_log on;
        rewrite ^/standard/([-_.:\w]+)/(.*) /$2 break;
        proxy_pass http://$1;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Origin "";
    }
    # Friendly static page for missing/unreachable upstreams.
    error_page 404 502 /502.html;
    location = /502.html {
        root /etc/nginx;
    }
}
# Global nginx configuration for the gateway pod.
user nginx;
worker_processes 2;
worker_rlimit_nofile 65535;
error_log /var/log/nginx/error.log warn;
pid /var/run/nginx.pid;
events {
    worker_connections 4096;
}
http {
    include /etc/nginx/mime.types;
    default_type application/octet-stream;
    log_format main '$remote_addr - $remote_user [$time_local] "$request" '
                    '$status $body_bytes_sent "$http_referer" '
                    '"$http_user_agent" "$http_x_forwarded_for"';
    access_log /var/log/nginx/access.log main;
    sendfile on;
    #tcp_nopush on;
    keepalive_timeout 65;
    #gzip on;
    # Pulls in apiserver.conf (installed as conf.d/default.conf).
    include /etc/nginx/conf.d/*.conf;
}
package main
import (
_ "autodl-core/conf"
"autodl-core/service"
log "github.com/cihub/seelog"
"github.com/spf13/viper"
"gitlab.seetatech.com/autodl.com/autodl-base.git/protoV1"
"google.golang.org/grpc"
"net"
)
func main() {
serverFactory := service.InitializeServerFactory()
lis, err := net.Listen("tcp", viper.GetString("app.rpc_port"))
if err != nil {
log.Errorf("failed to listen: %v", err)
}
s := grpc.NewServer()
proto_v1.RegisterCoreServer(s, serverFactory.RpcServer)
if err := s.Serve(lis); err != nil {
log.Errorf("failed to serve: %v", err)
}
}
package main
import (
_ "autodl-core/conf"
"autodl-core/service"
)
func main() {
workerFactory := service.InitializeWorkerFactory()
if err := workerFactory.Worker.Run(); err != nil {
panic("panic: " + err.Error())
}
}
# Core service configuration. Values such as mysql_host / redis_host /
# harbor_host are placeholders substituted per environment at deploy time.
app:
  rpc_port: ":30100"       # gRPC listen address (port only → all interfaces)
  mount_path: "/mnt/ceph"  # shared ceph mount (matches the adl-pvc volume mounts)
database:
  type: "mysql"
  host: mysql_host
  name: autodl-core
  user: mysql_user
  password: mysql_password
  # DSN template; argument order is filled in by the db init code (not shown
  # here) — presumably user, password, host, name.
  url: "%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local"
redis:
  addr: "redis_host"
  password: "redis_password"
mongodb:
  addr: "mongo_host"
k8s:
  namespace: autodl  # namespace the core objects are deployed into
  pvc: adl-pvc       # claim name mounted by the deployments
docker:
  registry_host: harbor_host
  registry_group_name: core
host:
  nginx_host: core_nginx
{
"auths": {
"172.24.14.24:5000": {
"auth": "YWRtaW46YXV0b2Nubl9zZWV0YXRlY2g="
}
},
"HttpHeaders": {
"User-Agent": "Docker-Client/18.03.1-ce (linux)"
}
}
// Package conf loads application configuration and logging as an import side
// effect; every command imports it with a blank identifier.
package conf

import (
	log "github.com/cihub/seelog"
	"github.com/spf13/viper"
)

// Paths are relative to the process working directory (/adl/bin in the
// containers — see the Dockerfiles' WORKDIR).
const (
	viperConfigFile = "conf/app.yaml"
	logConfigFile   = "conf/log.xml"
)

// init loads conf/app.yaml into the global viper instance and replaces the
// global seelog logger with the one described by conf/log.xml. Panics on any
// failure so a misconfigured pod crashes at startup instead of running with
// partial config.
func init() {
	viper.Reset()
	viper.SetConfigType("yaml")
	viper.SetConfigFile(viperConfigFile)
	if e := viper.ReadInConfig(); e != nil {
		panic(e)
	}
	logger, err := log.LoggerFromConfigAsFile(logConfigFile)
	if err != nil {
		panic(err)
	}
	_ = log.ReplaceLogger(logger)
}
<!-- seelog config: synchronous logging to the console, each line carrying
     timestamp, level, and the full source location of the call. -->
<seelog type="sync">
    <outputs formatid="main">
        <console/>
    </outputs>
    <formats>
        <format id="main" format="%Date/%Time [%LEV] [%FullPath:%Line %Func] %Msg%n"/>
    </formats>
</seelog>
\ No newline at end of file
# DaemonSet: one core--collector per node, gathering node resource info and
# task logs. NOTE(review): extensions/v1beta1 was removed in k8s 1.16;
# migrating to apps/v1 would also require spec.selector.
apiVersion: extensions/v1beta1
kind: DaemonSet
metadata:
  name: core--collector
  namespace: autodl
spec:
  template:
    metadata:
      labels:
        app: core--collector
        logCollect: "true"
    spec:
      serviceAccountName: autodl-serviceaccount
      containers:
        - name: core--collector
          # DOCKER_REGISTRY / latest are placeholders rewritten by the
          # Makefile's rewrite_yaml target before kubectl runs.
          image: DOCKER_REGISTRY/core--collector:latest
          imagePullPolicy: Always
          volumeMounts:
            - mountPath: /mnt/ceph
              name: adl-volume
            - name: docker-sock
              mountPath: /var/run/docker.sock
              subPath: docker.sock
            - name: hostname
              mountPath: /etc/autocnn_hostname
          command: ["core--collector"]
          resources:
            requests:
              cpu: "500m"
              memory: "512Mi"
            limits:
              cpu: "1000m"
              memory: "1024Mi"
      volumes:
        - name: adl-volume
          persistentVolumeClaim:
            claimName: adl-pvc
            readOnly: false
        - name: docker-sock
          hostPath:
            path: /var/run/
        - name: hostname
          hostPath:
            path: /etc/hostname
# log-stash-v1 deployment for the "seetaas" namespace; passes "seetaas" as
# argv[1] (see cmd/log_stash/main.go). NOTE(review): the only manifest in the
# "seetaas" namespace — confirm this difference from "autodl" is intentional.
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: log-stash-v1
  namespace: seetaas
spec:
  template:
    metadata:
      labels:
        app: log-stash-v1
        logCollect: "true"
    spec:
      serviceAccountName: seetaas-serviceaccount
      containers:
        - name: log-stash-v1
          # Placeholders rewritten by the Makefile before kubectl runs.
          image: DOCKER_REGISTRY/log-stash-v1:latest
          imagePullPolicy: Always
          volumeMounts:
            - mountPath: /mnt/ceph
              name: adl-volume
            - name: docker-sock
              mountPath: /var/run/docker.sock
              subPath: docker.sock
            - name: hostname
              mountPath: /etc/autocnn_hostname
          command: ["log-stash-v1", "seetaas"]
          resources:
            requests:
              cpu: "500m"
              memory: "512Mi"
            limits:
              cpu: "1000m"
              memory: "1024Mi"
      volumes:
        - name: adl-volume
          persistentVolumeClaim:
            claimName: adl-pvc
            readOnly: false
        - name: docker-sock
          hostPath:
            path: /var/run/
        - name: hostname
          hostPath:
            path: /etc/hostname
# log-stash-v1 deployment for the "autodl" namespace; no CLI argument, so the
# binary falls back to the configured k8s.namespace.
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: log-stash-v1
  namespace: autodl
spec:
  template:
    metadata:
      labels:
        app: log-stash-v1
        logCollect: "true"
    spec:
      serviceAccountName: autodl-serviceaccount
      containers:
        - name: log-stash-v1
          # Placeholders rewritten by the Makefile before kubectl runs.
          image: DOCKER_REGISTRY/log-stash-v1:latest
          imagePullPolicy: Always
          volumeMounts:
            - mountPath: /mnt/ceph
              name: adl-volume
            - name: docker-sock
              mountPath: /var/run/docker.sock
              subPath: docker.sock
            - name: hostname
              mountPath: /etc/autocnn_hostname
          command: ["log-stash-v1"]
          resources:
            requests:
              cpu: "500m"
              memory: "512Mi"
            limits:
              cpu: "1000m"
              memory: "1024Mi"
      volumes:
        - name: adl-volume
          persistentVolumeClaim:
            claimName: adl-pvc
            readOnly: false
        - name: docker-sock
          hostPath:
            path: /var/run/
        - name: hostname
          hostPath:
            path: /etc/hostname
# core--monitor deployment; runs the shared adl-core-v1 image and selects the
# monitor binary via command. Pinned to internal-service nodes.
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: core--monitor
  namespace: autodl
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: core--monitor
        logCollect: "true"
    spec:
      serviceAccountName: autodl-serviceaccount
      nodeSelector:
        internal_service_node: "true"
      containers:
        - name: core--monitor
          # Placeholders rewritten by the Makefile before kubectl runs.
          image: DOCKER_REGISTRY/adl-core-v1:latest
          imagePullPolicy: Always
          volumeMounts:
            - mountPath: /mnt/ceph
              name: adl-volume
            - name: docker-sock
              mountPath: /var/run/docker.sock
              subPath: docker.sock
          command: ["core--monitor"]
          resources:
            requests:
              cpu: "500m"
              memory: "512Mi"
            limits:
              cpu: "1000m"
              memory: "1024Mi"
      volumes:
        - name: adl-volume
          persistentVolumeClaim:
            claimName: adl-pvc
            readOnly: false
        - name: docker-sock
          hostPath:
            path: /var/run/
# NodePort service exposing the gateway pod on every node at :30099.
apiVersion: v1
kind: Service
metadata:
  name: core--nginx-svc
  namespace: autodl
spec:
  type: NodePort
  selector:
    app: core--nginx
  ports:
    - port: 80
      name: port-80
      targetPort: 80
      nodePort: 30099
\ No newline at end of file
# Gateway deployment (core--nginx), pinned to internal-service nodes.
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: core--nginx
  namespace: autodl
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: core--nginx
    spec:
      nodeSelector:
        internal_service_node: "true"
      containers:
        - name: core--nginx
          # Placeholders rewritten by the Makefile before kubectl runs.
          image: DOCKER_REGISTRY/core--nginx:latest
          imagePullPolicy: Always
          ports:
            - containerPort: 80
              name: port-80
          resources:
            requests:
              cpu: "500m"
              memory: "512Mi"
            limits:
              cpu: "500m"
              memory: "1024Mi"
\ No newline at end of file
# core--server deployment; runs the shared adl-core-v1 image and selects the
# server binary via command. Pinned to internal-service nodes.
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: core--server
  namespace: autodl
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: core--server
        logCollect: "true"
    spec:
      serviceAccountName: autodl-serviceaccount
      nodeSelector:
        internal_service_node: "true"
      containers:
        - name: core--server
          # Placeholders rewritten by the Makefile before kubectl runs.
          image: DOCKER_REGISTRY/adl-core-v1:latest
          imagePullPolicy: Always
          volumeMounts:
            - mountPath: /mnt/ceph
              name: adl-volume
            - name: docker-sock
              mountPath: /var/run/docker.sock
              subPath: docker.sock
          command: ["core--server"]
          resources:
            requests:
              cpu: "500m"
              memory: "512Mi"
            limits:
              cpu: "1000m"
              memory: "1024Mi"
      volumes:
        - name: adl-volume
          persistentVolumeClaim:
            claimName: adl-pvc
            readOnly: false
        - name: docker-sock
          hostPath:
            path: /var/run/
# NodePort service exposing the core--server gRPC port (30100, matching
# app.rpc_port) on every node.
apiVersion: v1
kind: Service
metadata:
  labels:
    app: core--server
  name: core--server
  namespace: autodl
spec:
  type: NodePort
  selector:
    app: core--server
  ports:
    - port: 30100
      name: port-30100
      targetPort: 30100
      nodePort: 30100
\ No newline at end of file
# core--worker deployment; runs the shared adl-core-v1 image and selects the
# worker binary via command. Pinned to internal-service nodes.
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: core--worker
  namespace: autodl
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: core--worker
        logCollect: "true"
    spec:
      serviceAccountName: autodl-serviceaccount
      nodeSelector:
        internal_service_node: "true"
      containers:
        - name: core--worker
          # Placeholders rewritten by the Makefile before kubectl runs.
          image: DOCKER_REGISTRY/adl-core-v1:latest
          imagePullPolicy: Always
          volumeMounts:
            - mountPath: /mnt/ceph
              name: adl-volume
            - name: docker-sock
              mountPath: /var/run/docker.sock
              subPath: docker.sock
          command: ["core--worker"]
          resources:
            requests:
              cpu: "500m"
              memory: "512Mi"
            limits:
              cpu: "1000m"
              memory: "1024Mi"
      volumes:
        - name: adl-volume
          persistentVolumeClaim:
            claimName: adl-pvc
            readOnly: false
        - name: docker-sock
          hostPath:
            path: /var/run/
// Package git_repo shells out to the git binary to create, inspect, and
// manipulate git repositories.
package git_repo

import (
	"bytes"
	"errors"
	"fmt"
	log "github.com/cihub/seelog"
	"os"
	"os/exec"
	"path/filepath"
	"regexp"
	"strings"
)

// ConfigMap maps config keys to their values.
type ConfigMap map[string]string

// Ref is the basic way to point at an individual commit in Git.
type Ref struct {
	SHA, Path string
	r         *Repo // owning repository
}

// RefMap maps ref names into Ref structs.
type RefMap map[string]*Ref

// Repo is the main struct that we use to track Git repositories.
type Repo struct {
	// GitDir is the directory that the Git metadata is in for this repo.
	GitDir string
	// WorkDir is the directory that holds the working tree for this repo.
	WorkDir string
	// refs holds the cached RefMap.
	refs RefMap
	// cfg holds the cached config data.
	cfg ConfigMap
}

// gitCmd is the absolute path of the git executable (resolved in init).
var gitCmd string

// statusRE matches one `git status --porcelain` record: index status letter,
// work-tree status letter, a space, then the path.
var statusRE *regexp.Regexp

// statMap translates porcelain status letters into readable words.
var statMap = map[string]string{
	" ": "unmodified",
	"M": "modified",
	"A": "added",
	"D": "deleted",
	"R": "renamed",
	"C": "copied",
	"U": "unmerged",
	"?": "untracked",
	"!": "ignored",
}
// init resolves the git binary — panicking if absent, since nothing in this
// package works without it — and compiles the porcelain status pattern.
func init() {
	var err error
	if gitCmd, err = exec.LookPath("git"); err != nil {
		panic("Cannot find git command!")
	}
	statusRE = regexp.MustCompile("^([ MADRCU!?])([ MADRCU?!]) (.*)$")
}
func findRepo(path string) (found bool, gitdir, workdir string) {
stat, err := os.Stat(path)
if err != nil {
panic("Could not stat " + path)
}
if !stat.IsDir() {
panic(path + " is not a directory!")
}
if strings.HasSuffix(path, ".git") {
if stat, err = os.Stat(filepath.Join(path, "config")); err == nil {
found = true
gitdir = path
workdir = ""
return
}
}
if stat, err = os.Stat(filepath.Join(path, ".git", "config")); err != nil {
found = false
return
}
found = true
gitdir = filepath.Join(path, ".git")
workdir = path
return
}
// Open the first git repository that "owns" path: walks upward from path
// (default ".") until findRepo recognises a directory, and returns a Repo
// for it.
func Open(path string) (repo *Repo, err error) {
	if path == "" {
		path = "."
	}
	path, err = filepath.Abs(path)
	if err != nil {
		return
	}
	basepath := path
	for {
		found, gitdir, workdir := findRepo(path)
		if found {
			return &Repo{GitDir: gitdir, WorkDir: workdir}, nil
		}
		parent := filepath.Dir(path)
		if parent == path { // reached the filesystem root
			break
		}
		path = parent
	}
	// IDIOM FIX: fmt.Errorf replaces errors.New(fmt.Sprintf(...)).
	return nil, fmt.Errorf("Could not find a Git repository in %s or any of its parents!", basepath)
}
// Git is a helper for creating exec.Cmd values for `git <cmd> <args...>`
// with the command's stdout and stderr each captured into a fresh
// bytes.Buffer.
func Git(cmd string, args ...string) (res *exec.Cmd, stdout, stderr *bytes.Buffer) {
	full := append([]string{cmd}, args...)
	res = exec.Command(gitCmd, full...)
	stdout = new(bytes.Buffer)
	stderr = new(bytes.Buffer)
	res.Stdout = stdout
	res.Stderr = stderr
	return
}
// Git is like the package-level Git helper but makes sure the command runs
// inside this repository: in the work tree when one exists, otherwise in the
// git directory (bare repo).
func (r *Repo) Git(cmd string, args ...string) (res *exec.Cmd, out, err *bytes.Buffer) {
	res, out, err = Git(cmd, args...)
	if r.WorkDir != "" {
		res.Dir = r.WorkDir
	} else {
		res.Dir = r.GitDir
	}
	return
}
// Commit stages every change in the work tree ("git add -A") and records a
// commit under a fixed bot identity. On failure it returns the combined
// stdout/stderr of the failing git command.
func (r *Repo) Commit() error {
	cmd, stdout, stderr := r.Git("add", "-A")
	if err := cmd.Run(); err != nil {
		return errors.New(stdout.String() + " " + stderr.String())
	}
	// BUG FIX: the original passed user.name='autocnn with an unbalanced
	// quote. exec passes arguments verbatim (no shell), so all the single
	// quotes were literal and ended up inside the recorded identity and the
	// commit message; they are dropped here.
	cmd, stdout, stderr = r.Git("-c", "user.email=autocnn@seetatech.com",
		"-c", "user.name=autocnn", "commit", "-m", "commit")
	if err := cmd.Run(); err != nil {
		return errors.New(stdout.String() + " " + stderr.String())
	}
	return nil
}
// GetLastCommit returns the SHA of the newest commit touching r.WorkDir.
// `--pretty=oneline -1` prints "<sha> <subject>", so the SHA is everything
// before the first space.
func (r *Repo) GetLastCommit() (string, error) {
	cmd, out, stderr := r.Git("--no-pager", "log", "--pretty=oneline", "-1", r.WorkDir)
	if err := cmd.Run(); err != nil {
		return "", errors.New(stderr.String())
	}
	stdout := out.String()
	if len(stdout) > 0 && strings.Contains(stdout, " ") {
		return strings.Split(stdout, " ")[0], nil
	}
	return "", errors.New("no commit found")
}
// Checkout switches the work tree to the given commit, defaulting to
// "master" when the commit string is empty.
func (r *Repo) Checkout(commit string) error {
	target := commit
	if target == "" {
		target = "master"
	}
	cmd, stdout, stderr := r.Git("checkout", target)
	if err := cmd.Run(); err != nil {
		return errors.New(stderr.String() + ";" + stdout.String())
	}
	return nil
}
// Reset discards all local changes and moves HEAD to the given commit
// (`git reset --hard`).
func (r *Repo) Reset(commit string) error {
	cmd, _, stderr := r.Git("reset", "--hard", commit)
	err := cmd.Run()
	if err == nil {
		return nil
	}
	return errors.New(stderr.String())
}
// Init initializes new Git metadata at the passed path.
// The rest of the args are passed to the 'git init' command unchanged.
// On success the new repository is opened and returned.
func Init(path string, args ...string) (res *Repo, err error) {
	cmd, _, stderr := Git("init", append(args, path)...)
	if err = cmd.Run(); err != nil {
		return nil, errors.New(stderr.String())
	}
	res, err = Open(path)
	return
}
// InitAndCommit makes sure path is a git repository (initialising it when
// needed), commits any pending changes, and returns the latest commit SHA.
func InitAndCommit(path string) (string, error) {
	repo, err := Init(path)
	if err != nil {
		log.Info("repos maybe initialized before")
		repo, err = Open(path)
		// BUG FIX: if Open also fails, repo is nil and repo.Commit() below
		// would dereference a nil pointer. Propagate the error instead.
		if err != nil {
			return "", err
		}
	}
	// Commit failure is tolerated (e.g. nothing to commit); we still try to
	// read the last commit below.
	if err := repo.Commit(); err != nil {
		log.Warn("repos commit failed: ", err.Error())
	}
	commit, err := repo.GetLastCommit()
	if err != nil {
		return "", err
	}
	return commit, nil
}
// Clone a new git repository. The clone will be created in the current
// directory. Extra args are forwarded to `git clone`; on success the new
// repository at target is opened and returned.
func Clone(source, target string, args ...string) (res *Repo, err error) {
	cmd, _, stderr := Git("clone", append(args, source, target)...)
	if err = cmd.Run(); err != nil {
		return nil, errors.New(stderr.String())
	}
	res, err = Open(target)
	return
}
// StatLine holds interesting bits of git status output.
type StatLine struct {
	// Index and work-tree status letters plus the entry's old/new path.
	indexStat, workStat, oldPath, newPath string
}

// StatLines is a slice of statuses.
type StatLines []*StatLine
// Print renders a StatLine in human readable format: an optional rename
// line, followed by the index and work-tree states.
func (s *StatLine) Print() string {
	out := ""
	if s.indexStat == "R" {
		out = fmt.Sprintf("%s was renamed to %s\n", s.oldPath, s.newPath)
	}
	return out + fmt.Sprintf("%s is %s in the index and %s in the working tree.",
		s.newPath, statMap[s.indexStat], statMap[s.workStat])
}
// mapStatus runs `git status --porcelain -z` and parses the NUL-separated
// output into StatLines. In -z mode renames/copies emit an extra
// NUL-terminated record for the other path, which is why a record that does
// not match statusRE is attached to the previous entry.
func (r *Repo) mapStatus() (res StatLines) {
	var thisStat *StatLine
	// NOTE: `err` here is the captured stderr *buffer*, not an error value;
	// the actual error from Run() is discarded and stderr is panicked.
	cmd, out, err := r.Git("status", "--porcelain", "-z")
	if cmd.Run() != nil {
		panic(err.String())
	}
	for {
		// Records are NUL-terminated (-z); ReadString includes the NUL.
		// NOTE(review): the trailing NUL appears to be kept in the captured
		// path (regexp `.` matches it) — confirm callers tolerate that.
		line, err := out.ReadString(0)
		if err != nil {
			break
		}
		parts := statusRE.FindStringSubmatch(line)
		if parts != nil {
			if thisStat != nil {
				res = append(res, thisStat)
			}
			thisStat = new(StatLine)
			thisStat.indexStat = parts[1]
			thisStat.workStat = parts[2]
			thisStat.oldPath = parts[3]
			thisStat.newPath = parts[3]
		} else if thisStat != nil {
			// Follow-up record of a rename/copy.
			// NOTE(review): git reports the ORIGINAL path in this record,
			// yet it is stored in newPath — verify the old/new orientation.
			thisStat.newPath = line
		} else {
			// First record failed to match the porcelain pattern.
			panic("Cannot happen!")
		}
	}
	if thisStat != nil {
		res = append(res, thisStat)
	}
	return
}
module autodl-core
require (
cloud.google.com/go v0.44.3 // indirect
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 // indirect
github.com/Azure/go-autorest v13.0.0+incompatible // indirect
github.com/Microsoft/go-winio v0.0.0-20190322214808-dd3d7fa17846 // indirect
github.com/NYTimes/gziphandler v1.1.1 // indirect
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect
github.com/Unknwon/com v0.0.0-20181010210213-41959bdd855f // indirect
github.com/adl-golang/quicktemplate v1.0.3
github.com/c9s/goprocinfo v0.0.0-20191125144613-4acdd056c72d
github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575
github.com/containerd/continuity v0.0.0-20181203112020-004b46473808 // indirect
github.com/coreos/bbolt v1.3.3 // indirect
github.com/coreos/etcd v3.3.15+incompatible // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect
github.com/docker/docker v0.0.0-20180616010903-de0abf4315fd // indirect
github.com/docker/go-connections v0.0.0-20180821093606-97c2040d34df // indirect
github.com/docker/go-units v0.3.3 // indirect
github.com/docker/libnetwork v0.0.0-20190314230531-ebcade70ad10 // indirect
github.com/docker/spdystream v0.0.0-20181023171402-6480d4af844c // indirect
github.com/elazarl/goproxy v0.0.0-20190711103511-473e67f1d7d2 // indirect
github.com/emicklei/go-restful v2.9.6+incompatible // indirect
github.com/evanphx/json-patch v4.5.0+incompatible // indirect
github.com/fsouza/go-dockerclient v1.2.1
github.com/gin-gonic/gin v0.0.0-20180531061340-caf3e350a548
github.com/go-kit/kit v0.9.0 // indirect
github.com/go-openapi/spec v0.19.2 // indirect
github.com/go-openapi/swag v0.19.5 // indirect
github.com/go-redis/redis v6.15.6+incompatible
github.com/go-sql-driver/mysql v1.4.1
github.com/gogo/protobuf v1.3.0 // indirect
github.com/gogs/git-module v0.0.0-20181023105832-dfc2c1e6d377
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/google/go-querystring v0.0.0-20170111101155-53e6ce116135 // indirect
github.com/google/pprof v0.0.0-20190723021845-34ac40c74b70 // indirect
github.com/google/wire v0.4.0
github.com/googleapis/gnostic v0.3.1 // indirect
github.com/gophercloud/gophercloud v0.3.0 // indirect
github.com/gorilla/websocket v1.4.1 // indirect
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.11.0 // indirect
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/imdario/mergo v0.3.7 // indirect
github.com/jinzhu/gorm v1.9.10
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/kr/pty v1.1.8 // indirect
github.com/levigross/grequests v0.0.0-20171009010347-bf9788368aa0
github.com/mcuadros/go-version v0.0.0-20180611085657-6d5863ca60fa // indirect
github.com/mindprince/gonvml v0.0.0-20190828220739-9ebdce4bb989
github.com/munnerz/goautoneg v0.0.0-20190414153302-2ae31c8b6b30 // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/novln/docker-parser v0.0.0-20170503100553-6030251119d6
github.com/onsi/ginkgo v1.10.3
github.com/onsi/gomega v1.7.1
github.com/opencontainers/go-digest v0.0.0-20190228220655-ac19fd6e7483 // indirect
github.com/opencontainers/image-spec v0.0.0-20190321123305-da296dcb1e47 // indirect
github.com/opencontainers/runc v0.0.0-20190322180631-11fc498ffa5c // indirect
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v1.1.0 // indirect
github.com/prometheus/procfs v0.0.4 // indirect
github.com/rogpeppe/fastuuid v1.2.0 // indirect
github.com/rogpeppe/go-internal v1.3.1 // indirect
github.com/satori/go.uuid v1.2.0
github.com/sirupsen/logrus v1.4.2 // indirect
github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337 // indirect
github.com/spf13/viper v1.4.0
github.com/tidwall/pretty v1.0.0 // indirect
github.com/ugorji/go v1.1.7 // indirect
github.com/vishvananda/netlink v1.0.0 // indirect
github.com/vishvananda/netns v0.0.0-20190625233234-7109fa855b0f // indirect
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect
github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc // indirect
gitlab.seetatech.com/autodl.com/autodl-base.git v0.0.0-20200102092617-103b8ce89985
gitlab.seetatech.com/autodl.com/autodl-k8s.git v0.0.0-20190902080041-53a58554394c
go.etcd.io/bbolt v1.3.3 // indirect
go.mongodb.org/mongo-driver v0.0.0-20190315150634-c72645a64800
golang.org/x/crypto v0.0.0-20190829043050-9756ffdc2472 // indirect
golang.org/x/exp v0.0.0-20190829153037-c13cbed26979 // indirect
golang.org/x/image v0.0.0-20190902063713-cb417be4ba39 // indirect
golang.org/x/mobile v0.0.0-20190830201351-c6da95954960 // indirect
golang.org/x/net v0.0.0-20191116160921-f9c825593386
golang.org/x/sys v0.0.0-20190830142957-1e83adbbebd0 // indirect
golang.org/x/tools v0.0.0-20190830223141-573d9926052a // indirect
google.golang.org/api v0.9.0 // indirect
google.golang.org/appengine v1.6.2 // indirect
google.golang.org/grpc v1.25.1
gopkg.in/inf.v0 v0.9.1 // indirect
gotest.tools v2.3.0+incompatible // indirect
honnef.co/go/tools v0.0.1-2019.2.2 // indirect
k8s.io/api v0.0.0-20190831074750-7364b6bdad65
k8s.io/apimachinery v0.0.0-20190831074630-461753078381
k8s.io/gengo v0.0.0-20190826232639-a874a240740c // indirect
k8s.io/utils v0.0.0-20190829053155-3a4a5477acf8 // indirect
sigs.k8s.io/structured-merge-diff v0.0.0-20190820212518-960c3cc04183 // indirect
)
go 1.13
This diff is collapsed. Click to expand it.
// Package k8s_plugin wires the shared autodl-k8s manager into this service.
package k8s_plugin

import (
	"github.com/spf13/viper"
	"gitlab.seetatech.com/autodl.com/autodl-k8s.git/manager"
)

// K8sManager wraps the shared *manager.K8SManager for dependency injection.
type K8sManager struct {
	K8sManager *manager.K8SManager
}

// K8sManagerProvider builds a K8sManager for the configured k8s.namespace.
// The second argument is false — presumably an in-/out-of-cluster toggle;
// confirm against the autodl-k8s manager package. Panics on failure so
// startup fails fast.
func K8sManagerProvider() *K8sManager {
	k8sManager, err := manager.NewK8SManager(viper.GetString("k8s.namespace"), false)
	if err != nil {
		panic("init k8s manager failed: " + err.Error())
	}
	return &K8sManager{
		K8sManager: k8sManager,
	}
}
package libs

const (
	PodTaskIDLabel    = "task_id"    // pod label carrying the owning task id
	PodServiceIDLabel = "service_id" // pod label carrying the owning service id
	TaskLogIndexName  = "task_log"   // name of the task-log index
)

// Kubernetes object constants used when building pod manifests.
const K8S_API_VERSION_V1 = "v1"
const K8S_POD_KIND = "Pod"
package libs

import "time"

// RunnerFailedRetry invokes runner until it succeeds or retryTimes attempts
// have been made, sleeping a growing backoff (2*i seconds after attempt i,
// capped at 30s by Delay) between failures. Returns nil on success,
// otherwise the last error observed.
func RunnerFailedRetry(runner func() (err error), retryTimes int) (lastErr error) {
	for attempt := 0; attempt < retryTimes; attempt++ {
		if lastErr = runner(); lastErr == nil {
			return nil
		}
		Delay(2 * attempt)
	}
	return lastErr
}

// Delay sleeps for the given number of seconds, capped at 30.
func Delay(times int) {
	if times > 30 {
		times = 30
	}
	time.Sleep(time.Second * time.Duration(times))
}
// Package docker wraps go-dockerclient with helpers for registry auth,
// image builds, and container stats.
package docker

import (
	"strings"

	api "github.com/fsouza/go-dockerclient"
)

// getAuth loads every credential from the local docker config
// (~/.docker/config.json). On any error it returns an empty — but non-nil —
// set so callers can proceed unauthenticated.
func getAuth() *api.AuthConfigurations {
	c, err := api.NewAuthConfigurationsFromDockerCfg()
	if err != nil {
		return &api.AuthConfigurations{}
	}
	return c
}
// getAuthWithRegistry returns the stored credentials whose registry key
// resolves to the same index as the requested registry, or the zero value
// when none matches.
func getAuthWithRegistry(registry string) api.AuthConfiguration {
	auth := getAuth()
	if auth == nil {
		return api.AuthConfiguration{}
	}
	want := indexName(registry)
	for key, cfg := range auth.Configs {
		if indexName(key) == want {
			return cfg
		}
	}
	return api.AuthConfiguration{}
}
// getAuthWithImage picks credentials for the given image reference: the
// registry host is extracted from the image name (via getAuthWithRegistry →
// indexName → toHostname), and when no stored credential matches it falls
// back to docker.io credentials.
func getAuthWithImage(image string) api.AuthConfiguration {
	empty := api.AuthConfiguration{}
	auth := getAuthWithRegistry(image)
	if auth == empty {
		return getAuthWithRegistry("docker.io")
	}
	return auth
}
// indexName normalises a registry reference to its canonical lookup key;
// Docker treats 'index.docker.io' and 'docker.io' as the same index.
func indexName(val string) string {
	host := toHostname(val)
	if host == "index.docker.io" {
		return "docker.io"
	}
	return host
}
// toHostname strips an optional http:// or https:// scheme and returns the
// first path segment — i.e. the registry host part of a URL or image ref.
func toHostname(url string) string {
	trimmed := url
	for _, scheme := range []string{"http://", "https://"} {
		if strings.HasPrefix(url, scheme) {
			trimmed = url[len(scheme):]
			break
		}
	}
	return strings.SplitN(trimmed, "/", 2)[0]
}
package docker
import (
api "github.com/fsouza/go-dockerclient"
)
// Build options
const (
buildQuiet = false
buildRm = true
buildForceRm = true
buildMemory = 0
buildMemswap = -1
)
// BuildOptions contains the build configuration for the docker daemon.
type BuildOptions struct {
	Name      string // image name (and optional tag) to build
	Directory string // build context directory containing the Dockerfile
	Pull      bool   // always attempt to pull newer base images
	NoCache   bool   // disable the layer cache for this build
}
// Build creates a new image from a Dockerfile; see the Docker interface.
func (d docker) Build(option BuildOptions, stream LogStream) error {
	opts := buildImageOptions(option, stream)
	return d.client.BuildImage(opts)
}
// buildImageOptions translates our BuildOptions plus a log stream into the
// go-dockerclient request, attaching all known registry credentials so FROM
// lines can pull from private registries.
func buildImageOptions(option BuildOptions, stream LogStream) api.BuildImageOptions {
	rawJSON, output := stream.OutputStream()
	opts := api.BuildImageOptions{
		Name:                option.Name,
		ContextDir:          option.Directory,
		Pull:                option.Pull,
		NoCache:             option.NoCache,
		SuppressOutput:      buildQuiet,
		RmTmpContainer:      buildRm,
		ForceRmTmpContainer: buildForceRm,
		Memory:              buildMemory,
		Memswap:             buildMemswap,
		OutputStream:        output,
		RawJSONStream:       rawJSON,
	}
	if auth := getAuth(); auth != nil {
		opts.AuthConfigs = *auth
	}
	return opts
}
package docker
import (
"fmt"
"testing"
)
// TestDockerBuild exercises Build against a local docker daemon.
// The daemon and the /tmp/docker1 build context are environment-dependent,
// so the test skips (rather than panics) when the daemon is unreachable.
func TestDockerBuild(t *testing.T) {
	endpoint := "unix:///var/run/docker.sock"
	docker, err := New(endpoint)
	if err != nil {
		// New pings the daemon; a missing local daemon should not abort
		// the whole test binary the way the previous panic did.
		t.Skipf("docker daemon not available at %s: %v", endpoint, err)
	}
	buildOption := BuildOptions{
		Name:      "daiab_test",
		Directory: "/tmp/docker1/",
		Pull:      true,
		NoCache:   true,
	}
	logStream := NewLogStream()
	// The original discarded this error, hiding build failures.
	if err := docker.Build(buildOption, logStream); err != nil {
		t.Fatalf("Build failed: %v", err)
	}
	fmt.Println(logStream)
}
package docker
import (
api "github.com/fsouza/go-dockerclient"
"time"
)
// GetContainer returns full inspect data for the container with the given ID.
func (d docker) GetContainer(id string) (*api.Container, error) {
	return d.client.InspectContainer(id)
}
// GetStats delivers resource statistics for container id into statsChan,
// either as a continuous stream (stream=true) until done is signalled, or
// as a single snapshot. The 2s timeout bounds the initial daemon response.
func (d docker) GetStats(id string, statsChan chan *api.Stats, done <-chan bool, stream bool) error {
	opts := api.StatsOptions{
		ID:      id,
		Stats:   statsChan,
		Timeout: 2 * time.Second,
		Done:    done,
		Stream:  stream,
	}
	return d.client.Stats(opts)
}
package docker
import (
"bufio"
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
)
// streamDecoderWrapper decodes the docker daemon's JSON progress stream and
// renders it to out/err, emulating the docker CLI's in-place progress
// updates when attached to a TTY.
type streamDecoderWrapper struct {
	out    *bufio.Writer  // destination for normal output
	err    *bufio.Writer  // destination for error output
	buffer *bytes.Buffer  // bytes not yet consumed by the JSON decoder
	tty    bool           // enable ANSI cursor movement for progress lines
	lines  map[string]int // message ID -> output row, for cursor positioning
	diff   int            // rows between the cursor and the row being updated
}

// message mirrors one JSON object of the docker progress stream.
type message struct {
	Stream   string `json:"stream,omitempty"`
	Status   string `json:"status,omitempty"`
	Progress string `json:"progress,omitempty"`
	ID       string `json:"id,omitempty"`
	Error    string `json:"error,omitempty"`
}
// newStreamDecoderWrapper builds a writer that decodes docker's JSON
// progress stream and renders it to out and err; tty enables terminal
// progress rendering.
func newStreamDecoderWrapper(out io.Writer, err io.Writer, tty bool) io.Writer {
	return &streamDecoderWrapper{
		out:    bufio.NewWriter(out),
		err:    bufio.NewWriter(err),
		buffer: &bytes.Buffer{},
		tty:    tty,
		lines:  make(map[string]int),
		diff:   0,
	}
}
// Write buffers p and decodes any complete JSON messages now available.
// Rendered output is always flushed before returning, even on error.
func (w *streamDecoderWrapper) Write(p []byte) (int, error) {
	defer w.Flush()
	n, err := w.buffer.Write(p)
	if err == nil {
		err = w.Decode()
	}
	return n, err
}
// Flush drains both buffered writers; flush errors are deliberately ignored
// because progress rendering is best-effort.
func (w streamDecoderWrapper) Flush() {
	_ = w.err.Flush()
	_ = w.out.Flush()
}

// getOutWriter exposes the buffered stdout-side writer.
func (w streamDecoderWrapper) getOutWriter() io.Writer {
	return w.out
}

// getErrWriter exposes the buffered stderr-side writer.
func (w streamDecoderWrapper) getErrWriter() io.Writer {
	return w.err
}
// encode renders a single decoded message. On a TTY, progress/status
// messages erase and rewrite the current line in place; without a TTY,
// progress bars are dropped entirely to keep captured logs readable.
func (w streamDecoderWrapper) encode(m message) {
	var endl string
	if w.tty && m.Stream == "" && (m.Progress != "" || m.Status != "") {
		// <ESC>[2K = erase entire current line
		fmt.Fprintf(w.getOutWriter(), "%c[2K\r", 27)
		endl = "\r"
	} else if m.Progress != "" { // disable progressbar in non-terminal
		return
	}
	if m.ID != "" {
		// Prefix the layer/object ID, like the docker CLI.
		fmt.Fprintf(w.getOutWriter(), "%s: ", m.ID)
	}
	if m.Progress != "" && w.tty {
		fmt.Fprintf(w.getOutWriter(), "%s %s%s", m.Status, m.Progress, endl)
	} else if m.Stream != "" {
		// Stream text already carries its own newlines.
		fmt.Fprintf(w.getOutWriter(), "%s%s", m.Stream, endl)
	} else {
		fmt.Fprintf(w.getOutWriter(), "%s%s\n", m.Status, endl)
	}
}
// setCursor positions the terminal cursor on the output row belonging to
// message id, tracking one row per distinct id. An empty id resets the
// row history. Cursor movement is only emitted when w.tty is set.
func (w *streamDecoderWrapper) setCursor(id string) {
	if id != "" {
		line, ok := w.lines[id]
		if !ok {
			// NOTE: This approach of using len(lines) to
			// figure out the number of lines of history
			// only works as long as we clear the history
			// when we output something that's not
			// accounted for in the map, such as a line
			// with no ID.
			line = len(w.lines)
			w.lines[id] = line
			if w.tty {
				// Reserve a fresh row for this new id.
				fmt.Fprintf(w.getOutWriter(), "\n")
			}
		} else {
			w.diff = len(w.lines) - line
		}
		if w.tty {
			// NOTE: this appears to be necessary even if
			// diff == 0.
			// <ESC>[{diff}A = move cursor up diff rows
			fmt.Fprintf(w.getOutWriter(), "%c[%dA", 27, w.diff)
		}
	} else {
		// When outputting something that isn't progress
		// output, clear the history of previous lines. We
		// don't want progress entries from some previous
		// operation to be updated (for example, pull -a
		// with multiple tags).
		w.lines = make(map[string]int)
	}
}
// Decode consumes complete JSON messages from the internal buffer and
// renders each one. Partial trailing bytes (an incomplete JSON object) are
// put back into the buffer for the next Write. A message carrying an
// "error" field aborts decoding with that error.
func (w *streamDecoderWrapper) Decode() error {
	decoder := json.NewDecoder(w.buffer)
	for {
		m := message{}
		if err := decoder.Decode(&m); err != nil {
			if err != io.EOF {
				return err
			}
			// Recopy remaining bytes into buffer to be available again for json decoder.
			b, err := ioutil.ReadAll(decoder.Buffered())
			if err != nil {
				return err
			}
			w.buffer = bytes.NewBuffer(b)
			return nil
		}
		if m.Error != "" {
			return errors.New(m.Error)
		}
		// Position the cursor for this id's row, render, then move back
		// down below the history block.
		w.setCursor(m.ID)
		w.encode(m)
		if m.ID != "" && w.tty {
			// NOTE: this appears to be necessary even if
			// diff == 0.
			// <ESC>[{diff}B = move cursor down diff rows
			fmt.Fprintf(w.getOutWriter(), "%c[%dB", 27, w.diff)
		}
	}
}
package docker
import (
api "github.com/fsouza/go-dockerclient"
)
// Docker wraps the underlying docker client to expose only the functions
// this project needs; docker (below) is the real implementation and Mock
// is the test double.
type Docker interface {
	// Ping pings the docker server
	Ping() error
	// Logs attach a stream on a running container to read stdout and stderr
	// output from docker logs.
	Logs(id string, stream LogStream) error
	// Run create and start a container to execute a runnable.
	// Return the exit code of the container status, an error otherwise.
	Run(option RunOptions, stream LogStream) (int, error)
	// Build create a new image from a dockerfile.
	Build(option BuildOptions, stream LogStream) error
	// Tag adds a tag to the image for a repository.
	Tag(option TagOptions) error
	// Push pushes an image to a remote registry.
	Push(option PushOptions, stream LogStream) error
	// ImageID returns an image ID by its name.
	ImageID(name string) string
	// RemoveImage removes an image by its name or ID.
	RemoveImage(name string) error
	// GetContainer inspects a container by ID.
	GetContainer(id string) (*api.Container, error)
	// GetStats delivers container resource statistics into statsChan.
	GetStats(id string, statsChan chan *api.Stats, done <-chan bool, stream bool) error
}

// The default implementation of the Docker interface.
type docker struct {
	client *api.Client // connected go-dockerclient handle
}
// Ping checks connectivity with the docker daemon; see Docker interface.
func (d docker) Ping() error {
	return d.client.Ping()
}
// New returns a Docker client bound to the given endpoint (e.g. a unix
// socket or tcp address). The daemon is pinged once so that callers get an
// immediate error for an unreachable daemon instead of failing later.
func New(endpoint string) (Docker, error) {
	client, err := api.NewClient(endpoint)
	if err != nil {
		return nil, err
	}
	d := &docker{client: client}
	if pingErr := d.Ping(); pingErr != nil {
		return nil, pingErr
	}
	return d, nil
}
package docker
import (
api "github.com/fsouza/go-dockerclient"
)
// Image removal options
const (
	rmiForce   = false // do not force-remove images that are still in use
	rmiNoPrune = false // false = untagged parent layers are pruned as usual
)
// ImageID resolves an image name to its ID, returning the empty string when
// the image is unknown to the daemon (inspect errors are swallowed).
func (d docker) ImageID(name string) string {
	image, inspectErr := d.client.InspectImage(name)
	if inspectErr != nil {
		return ""
	}
	return image.ID
}
// RemoveImage deletes the named image if it exists; a missing image is not
// an error (the inspect result alone decides whether removal is attempted).
func (d docker) RemoveImage(name string) error {
	image, _ := d.client.InspectImage(name)
	if image == nil {
		return nil
	}
	return d.client.RemoveImageExtended(name, removeImageOptions())
}
// removeImageOptions returns the fixed option set used by RemoveImage.
func removeImageOptions() api.RemoveImageOptions {
	return api.RemoveImageOptions{
		Force:   rmiForce,
		NoPrune: rmiNoPrune,
	}
}
package docker
import (
"io"
"os"
//"github.com/andrew-d/go-termutil"
api "github.com/fsouza/go-dockerclient"
)
// LogStream contains two io.Writers for, respectively, stdout and stderr,
// plus a JSON-decoding writer for the Docker API's progress stream.
type LogStream struct {
	Out     io.Writer // plain stdout destination
	Err     io.Writer // plain stderr destination
	Decoder io.Writer // JSON progress stream decoder (see streamDecoderWrapper)
}

// OutputStream returns the stream output handler together with its
// encoding flag (always raw-JSON here, consumed via the Decoder writer).
func (l LogStream) OutputStream() (json bool, stream io.Writer) {
	return true, l.Decoder
}
// Logs attaches to container id and follows its stdout/stderr into the
// given stream; see the Docker interface.
func (d docker) Logs(id string, stream LogStream) error {
	return d.client.Logs(logsOptions(id, stream))
}
// NewLogStream returns a default LogStream using OS stdout and stderr.
// TTY rendering is forced on rather than auto-detected.
func NewLogStream() LogStream {
	stdout, stderr := os.Stdout, os.Stderr
	return LogStream{
		Out:     stdout,
		Err:     stderr,
		Decoder: newStreamDecoderWrapper(stdout, stderr, true),
	}
}
// logsOptions builds the go-dockerclient request for following a
// container's stdout and stderr without timestamps.
func logsOptions(container string, stream LogStream) api.LogsOptions {
	return api.LogsOptions{
		Container:    container,
		OutputStream: stream.Out,
		ErrorStream:  stream.Err,
		Follow:       true,
		Stdout:       true,
		Stderr:       true,
		Timestamps:   false,
	}
}
package docker
// Mock a Docker interface.
type Mock struct {
// Handler
RunHandler func(option RunOptions, stream LogStream) (int, error)
PingHandler func() error
LogsHandler func(id string, stream LogStream) error
BuildHandler func(option BuildOptions, stream LogStream) error
TagHandler func(option TagOptions) error
PushHandler func(option PushOptions, stream LogStream) error
ImageIDHandler func(name string) string
RemoveImageHandler func(name string) error
// Values
RunCalled bool
PingCalled bool
LogsCalled bool
BuildCalled bool
TagCalled bool
PushCalled bool
ImageIDCalled bool
RemoveImageCalled bool
RunOptions RunOptions
LogsID string
BuildOptions BuildOptions
TagOptions TagOptions
PushOptions PushOptions
ImageIDName string
RemoveImageName string
}
// NewMock returns a mock of the Docker interface whose handlers all default
// to benign no-ops (zero exit code, nil errors, empty image ID).
func NewMock() *Mock {
	return &Mock{
		RunHandler:         func(option RunOptions, stream LogStream) (int, error) { return 0, nil },
		PingHandler:        func() error { return nil },
		LogsHandler:        func(id string, stream LogStream) error { return nil },
		BuildHandler:       func(option BuildOptions, stream LogStream) error { return nil },
		TagHandler:         func(option TagOptions) error { return nil },
		PushHandler:        func(option PushOptions, stream LogStream) error { return nil },
		ImageIDHandler:     func(name string) string { return "" },
		RemoveImageHandler: func(name string) error { return nil },
	}
}
// The Mock methods below each record the invocation and its arguments on
// the Mock, then delegate to the corresponding handler.

// Run is a mock function for Docker interface.
func (m *Mock) Run(option RunOptions, stream LogStream) (int, error) {
	m.RunCalled = true
	m.RunOptions = option
	return m.RunHandler(option, stream)
}

// Ping is a mock function for Docker interface.
func (m *Mock) Ping() error {
	m.PingCalled = true
	return m.PingHandler()
}

// Logs is a mock function for Docker interface.
func (m *Mock) Logs(id string, stream LogStream) error {
	m.LogsCalled = true
	m.LogsID = id
	return m.LogsHandler(id, stream)
}

// Build is a mock function for Docker interface.
func (m *Mock) Build(option BuildOptions, stream LogStream) error {
	m.BuildCalled = true
	m.BuildOptions = option
	return m.BuildHandler(option, stream)
}

// Tag is a mock function for Docker interface.
func (m *Mock) Tag(option TagOptions) error {
	m.TagCalled = true
	m.TagOptions = option
	return m.TagHandler(option)
}

// Push is a mock function for Docker interface.
func (m *Mock) Push(option PushOptions, stream LogStream) error {
	m.PushCalled = true
	m.PushOptions = option
	return m.PushHandler(option, stream)
}

// ImageID is a mock function for Docker interface.
func (m *Mock) ImageID(name string) string {
	m.ImageIDCalled = true
	m.ImageIDName = name
	return m.ImageIDHandler(name)
}

// RemoveImage is a mock function for Docker interface.
func (m *Mock) RemoveImage(name string) error {
	m.RemoveImageCalled = true
	m.RemoveImageName = name
	return m.RemoveImageHandler(name)
}
package docker
import (
"strings"
parser "github.com/novln/docker-parser/docker"
)
// Reference is an opaque object holding a parsed image identifier: name,
// tag/digest, repository and registry. The tag field keeps its separator
// (":" for tags, "@" for digests) so accessors can reassemble names cheaply.
type Reference struct {
	named parser.Named
	tag   string // includes the leading ":" or "@"; may be empty
}

// Name returns the image's name. (ie: debian[:8.2])
func (r Reference) Name() string {
	return r.named.RemoteName() + r.tag
}

// ShortName returns the image's name (ie: debian)
func (r Reference) ShortName() string {
	return r.named.RemoteName()
}

// Tag returns the image's tag (or digest) with the separator stripped;
// empty when the reference carries neither.
func (r Reference) Tag() string {
	if len(r.tag) > 1 {
		return r.tag[1:]
	}
	return ""
}

// Registry returns the image's registry. (ie: host[:port])
func (r Reference) Registry() string {
	return r.named.Hostname()
}

// Repository returns the image's repository. (ie: registry/name)
func (r Reference) Repository() string {
	return r.named.FullName()
}

// Remote returns the image's remote identifier. (ie: registry/name[:tag])
func (r Reference) Remote() string {
	return r.named.FullName() + r.tag
}
// clean strips an optional http:// or https:// scheme prefix from url,
// leaving the rest of the reference untouched.
func clean(url string) string {
	if strings.HasPrefix(url, "http://") {
		return strings.TrimPrefix(url, "http://")
	}
	if strings.HasPrefix(url, "https://") {
		return strings.TrimPrefix(url, "https://")
	}
	return url
}
// Parse returns a Reference from analyzing the given remote identifier.
// The scheme (http/https) is stripped before parsing and a default tag is
// applied when the identifier carries neither tag nor digest. The stored
// tag keeps its separator: "@" for digests, ":" for tags.
func Parse(remote string) (*Reference, error) {
	n, err := parser.ParseNamed(clean(remote))
	if err != nil {
		return nil, err
	}
	n = parser.WithDefaultTag(n)
	var t string
	switch x := n.(type) {
	case parser.Canonical:
		t = "@" + x.Digest().String()
	case parser.NamedTagged:
		t = ":" + x.Tag()
	}
	return &Reference{named: n, tag: t}, nil
}
package docker
import (
api "github.com/fsouza/go-dockerclient"
)
// PushOptions contains the push configuration for the docker daemon.
type PushOptions struct {
	Name       string // NOTE(review): not used by pushImageOptions below — confirm intended
	Repository string // repository to push to (used as the push target name)
	Registry   string // registry whose stored credentials are used
	Tag        string // tag to push
}

// Push pushes an image to a remote registry; see the Docker interface.
func (d docker) Push(option PushOptions, stream LogStream) error {
	return d.client.PushImage(pushImageOptions(option, stream), pushAuthConfiguration(option))
}

// pushImageOptions builds the go-dockerclient push request, wiring the
// JSON progress stream from the LogStream.
func pushImageOptions(option PushOptions, stream LogStream) api.PushImageOptions {
	json, output := stream.OutputStream()
	return api.PushImageOptions{
		Name:          option.Repository,
		Tag:           option.Tag,
		OutputStream:  output,
		RawJSONStream: json,
	}
}

// pushAuthConfiguration resolves credentials for the target registry.
func pushAuthConfiguration(option PushOptions) api.AuthConfiguration {
	return getAuthWithRegistry(option.Registry)
}
{% func RenderDockerfile(fromImage, srcCode, workdir string, copyCode bool, steps []string, envs map[string]string ) %}
FROM {%s fromImage %}
ENV SHELL /bin/bash
{% if copyCode %}
COPY {%s srcCode %} {%s workdir %}
{% endif %}
WORKDIR {%s workdir %}
{%for _, step := range steps%}
RUN {%s step %}
{% endfor %}
{%for key, value := range envs%}
ENV {%s key + " " + value %}
{% endfor %}
{% endfunc %}
// This file is automatically generated by qtc from "docker.qtpl".
// See https://github.com/valyala/quicktemplate for details.
//line docker.qtpl:1
package render
//line docker.qtpl:1
import (
qt422016 "github.com/adl-golang/quicktemplate"
qtio422016 "io"
)
//line docker.qtpl:1
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
// StreamRenderDockerfile writes a Dockerfile (FROM / optional COPY /
// WORKDIR / RUN steps / ENV lines) to qw422016. COPY is emitted BEFORE the
// RUN steps when copyCode is set. Generated by qtc from docker.qtpl; the
// //line comments map back to the template source.
//line docker.qtpl:1
func StreamRenderDockerfile(qw422016 *qt422016.Writer, fromImage, srcCode, workdir string, copyCode bool, steps []string, envs map[string]string) {
//line docker.qtpl:1
	qw422016.N().S(`
FROM `)
//line docker.qtpl:2
	qw422016.E().S(fromImage)
//line docker.qtpl:2
	qw422016.N().S(`
ENV SHELL /bin/bash
`)
//line docker.qtpl:5
	if copyCode {
//line docker.qtpl:5
		qw422016.N().S(`
COPY `)
//line docker.qtpl:6
		qw422016.E().S(srcCode)
//line docker.qtpl:6
		qw422016.N().S(` `)
//line docker.qtpl:6
		qw422016.E().S(workdir)
//line docker.qtpl:6
		qw422016.N().S(`
`)
//line docker.qtpl:7
	}
//line docker.qtpl:7
	qw422016.N().S(`
WORKDIR `)
//line docker.qtpl:9
	qw422016.E().S(workdir)
//line docker.qtpl:9
	qw422016.N().S(`
`)
//line docker.qtpl:11
	for _, step := range steps {
//line docker.qtpl:11
		qw422016.N().S(`
RUN `)
//line docker.qtpl:12
		qw422016.E().S(step)
//line docker.qtpl:12
		qw422016.N().S(`
`)
//line docker.qtpl:13
	}
//line docker.qtpl:13
	qw422016.N().S(`
`)
//line docker.qtpl:15
	for key, value := range envs {
//line docker.qtpl:15
		qw422016.N().S(`
ENV `)
//line docker.qtpl:16
		qw422016.E().S(key + " " + value)
//line docker.qtpl:16
		qw422016.N().S(`
`)
//line docker.qtpl:17
	}
//line docker.qtpl:17
	qw422016.N().S(`
`)
//line docker.qtpl:19
}
// StreamRenderDockerfileRunFirst is a hand-adapted variant of
// StreamRenderDockerfile that emits the RUN steps FIRST and the COPY
// afterwards — for images whose setup must happen before code is copied.
//line docker.qtpl:1
func StreamRenderDockerfileRunFirst(qw422016 *qt422016.Writer, fromImage, srcCode, workdir string, copyCode bool, steps []string, envs map[string]string) {
//line docker.qtpl:1
	qw422016.N().S(`
FROM `)
//line docker.qtpl:2
	qw422016.E().S(fromImage)
//line docker.qtpl:2
	qw422016.N().S(`
ENV SHELL /bin/bash
`)
//line docker.qtpl:7
	qw422016.N().S(`
WORKDIR `)
//line docker.qtpl:9
	qw422016.E().S(workdir)
//line docker.qtpl:9
	qw422016.N().S(`
`)
//line docker.qtpl:11
	for _, step := range steps {
//line docker.qtpl:11
		qw422016.N().S(`
RUN `)
//line docker.qtpl:12
		qw422016.E().S(step)
//line docker.qtpl:12
		qw422016.N().S(`
`)
//line docker.qtpl:13
	}
//line docker.qtpl:13
	qw422016.N().S(`
`)
//line docker.qtpl:5
	if copyCode {
//line docker.qtpl:5
		qw422016.N().S(`
COPY `)
//line docker.qtpl:6
		qw422016.E().S(srcCode)
//line docker.qtpl:6
		qw422016.N().S(` `)
//line docker.qtpl:6
		qw422016.E().S(workdir)
//line docker.qtpl:6
		qw422016.N().S(`
`)
//line docker.qtpl:7
	}
//line docker.qtpl:15
	for key, value := range envs {
//line docker.qtpl:15
		qw422016.N().S(`
ENV `)
//line docker.qtpl:16
		qw422016.E().S(key + " " + value)
//line docker.qtpl:16
		qw422016.N().S(`
`)
//line docker.qtpl:17
	}
//line docker.qtpl:17
	qw422016.N().S(`
`)
//line docker.qtpl:19
}
// WriteRenderDockerfile renders a Dockerfile into qq422016, selecting
// between the COPY-first and RUN-first layouts via copyAfterRun.
//line docker.qtpl:19
func WriteRenderDockerfile(qq422016 qtio422016.Writer, fromImage, srcCode, workdir string, copyCode bool, steps []string, envs map[string]string, copyAfterRun bool) {
//line docker.qtpl:19
	qw422016 := qt422016.AcquireWriter(qq422016)
//line docker.qtpl:19
	if copyAfterRun {
		StreamRenderDockerfileRunFirst(qw422016, fromImage, srcCode, workdir, copyCode, steps, envs)
	} else {
		StreamRenderDockerfile(qw422016, fromImage, srcCode, workdir, copyCode, steps, envs)
	}
//line docker.qtpl:19
	qt422016.ReleaseWriter(qw422016)
//line docker.qtpl:19
}
//line docker.qtpl:19
//func RenderDockerfile(fromImage, srcCode, workdir string, copyCode bool, steps []string, envs map[string]string) string {
// //line docker.qtpl:19
// qb422016 := qt422016.AcquireByteBuffer()
// //line docker.qtpl:19
// WriteRenderDockerfile(qb422016, fromImage, srcCode, workdir, copyCode, steps, envs)
// //line docker.qtpl:19
// qs422016 := string(qb422016.B)
// //line docker.qtpl:19
// qt422016.ReleaseByteBuffer(qb422016)
// //line docker.qtpl:19
// return qs422016
// //line docker.qtpl:19
//}
package render
import (
	"bytes"
	"fmt"
	"strings"
	"testing"
)
// TestRender renders a sample Dockerfile and asserts that every expected
// directive is present. The original test only printed the buffer and could
// never fail.
func TestRender(t *testing.T) {
	var buf bytes.Buffer
	WriteRenderDockerfile(&buf, "ubuntu:16.04", "/source_code",
		"/code", true, []string{"run 1", "run 2"},
		map[string]string{"env1": "value1", "env2": "value2"}, false)
	out := buf.String()
	for _, want := range []string{
		"FROM ubuntu:16.04",
		"COPY /source_code /code",
		"WORKDIR /code",
		"RUN run 1",
		"RUN run 2",
		"ENV env1 value1",
		"ENV env2 value2",
	} {
		if !strings.Contains(out, want) {
			t.Errorf("rendered Dockerfile missing %q", want)
		}
	}
	fmt.Printf("buf=\n%s", out)
}
package docker
import (
"fmt"
"strings"
api "github.com/fsouza/go-dockerclient"
)
// Run options: fixed settings applied to every container created by Run.
const (
	containerName   = ""       // empty = let docker generate a name
	useTTy          = false    // no pseudo-terminal
	attachStdout    = true     // capture stdout
	attachStderr    = true     // capture stderr
	hostNetworkMode = "bridge" // default docker bridge networking
	networkDisabled = false    // networking stays enabled
	removeVolumes   = true     // delete anonymous volumes with the container
	forceRemove     = false    // do not force-remove running containers
)

// RunOptions contains the run configuration of the docker container
type RunOptions struct {
	Image   string   // image reference to pull and run
	Command string   // command line, split on whitespace into Cmd
	Env     []string // environment entries ("KEY=value")
	Volumes []string // host bind mounts (docker -v syntax)
	Links   []string // legacy container links
}
// Run executes option.Command inside a fresh container: pull image →
// create → start → follow logs → wait for exit → remove container.
// It returns the container's exit code; on any setup error it returns
// (0, err). A failure while streaming logs is only reported to stream.Err
// and does not abort the run. See the Docker interface.
func (d docker) Run(option RunOptions, stream LogStream) (int, error) {
	r, err := Parse(option.Image)
	if err != nil {
		return 0, err
	}
	err = d.client.PullImage(pullImageOptions(r.Remote(), stream), pullAuthConfiguration(option))
	if err != nil {
		return 0, err
	}
	e, err := d.client.CreateContainer(createContainerOptions(option))
	if err != nil {
		return 0, err
	}
	id := e.ID
	if err = d.client.StartContainer(id, nil); err != nil {
		return 0, err
	}
	// Blocks until the container stops producing output; errors here are
	// non-fatal by design.
	err = d.Logs(id, stream)
	if err != nil {
		fmt.Fprint(stream.Err, err)
	}
	exit, err := d.client.WaitContainer(id)
	if err != nil {
		return 0, err
	}
	if err = d.client.RemoveContainer(removeContainerOptions(id)); err != nil {
		return 0, err
	}
	return exit, nil
}
// pullAuthConfiguration resolves registry credentials from the image
// reference (falling back to docker.io; see getAuthWithImage).
func pullAuthConfiguration(option RunOptions) api.AuthConfiguration {
	return getAuthWithImage(option.Image)
}

// pullImageOptions builds the pull request for the fully-qualified remote
// reference, wiring the JSON progress stream.
func pullImageOptions(remote string, stream LogStream) api.PullImageOptions {
	json, output := stream.OutputStream()
	return api.PullImageOptions{
		Repository:    remote,
		OutputStream:  output,
		RawJSONStream: json,
	}
}
// createContainerOptions maps RunOptions onto the docker create-container
// request, applying the fixed run constants declared above. Note the
// command string is split on whitespace — arguments containing spaces are
// not supported.
func createContainerOptions(option RunOptions) api.CreateContainerOptions {
	return api.CreateContainerOptions{
		Name: containerName,
		Config: &api.Config{
			AttachStdout:    attachStdout,
			AttachStderr:    attachStderr,
			Tty:             useTTy,
			Env:             option.Env,
			NetworkDisabled: networkDisabled,
			Image:           option.Image,
			Cmd:             strings.Fields(option.Command),
		},
		HostConfig: &api.HostConfig{
			NetworkMode: hostNetworkMode,
			Binds:       option.Volumes,
			Links:       option.Links,
		},
	}
}

// removeContainerOptions builds the removal request for a finished
// container, deleting its anonymous volumes but never force-killing it.
func removeContainerOptions(id string) api.RemoveContainerOptions {
	return api.RemoveContainerOptions{
		ID:            id,
		RemoveVolumes: removeVolumes,
		Force:         forceRemove,
	}
}
package docker
import (
api "github.com/fsouza/go-dockerclient"
)
// Tag options
const (
	tagForce = true // overwrite an existing tag of the same name
)

// TagOptions contains the tag configuration for the docker daemon.
type TagOptions struct {
	Name       string // source image (name or ID) to tag
	Repository string // target repository
	Tag        string // target tag
}

// Tag adds Repository:Tag to the image named Name; see Docker interface.
func (d docker) Tag(option TagOptions) error {
	return d.client.TagImage(option.Name, tagImageOptions(option))
}

// tagImageOptions maps TagOptions onto the go-dockerclient request.
func tagImageOptions(option TagOptions) api.TagImageOptions {
	return api.TagImageOptions{
		Repo:  option.Repository,
		Tag:   option.Tag,
		Force: tagForce,
	}
}
package libs
import "errors"
// Sentinel errors returned when a task-status transition is rejected:
// either the task already finished, or the status was already recorded.
var (
	TaskIsDoneCanNotSetStatusError      = errors.New("This task is done, so you can not set status on it, drop it! ")
	TaskStatusExistCanNotSetStatusError = errors.New("This task status is exist, so you can not set status on it, drop it! ")
)
package libs
import (
"fmt"
)
func FailOnError(err error, msg string) {
if err != nil {
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
package parser
import (
log "github.com/cihub/seelog"
"github.com/gin-gonic/gin/json"
"gitlab.seetatech.com/autodl.com/autodl-base.git"
"gitlab.seetatech.com/autodl.com/autodl-base.git/protoV1"
"gitlab.seetatech.com/autodl.com/autodl-k8s.git/spawner"
)
// GetSpawnerParserConfig translates a protobuf Task into the spawner's
// ParsedConfig: mounts, env vars, resource requests/limits, exposed ports
// and machine selectors. PYTHONUNBUFFERED=1 is always injected so python
// task output is streamed immediately.
// NOTE(review): task.BuildInfo, task.PodInfo and task.PodInfo.Resource
// (and its Cpu/Gpu/Memory fields) are dereferenced without nil checks —
// confirm callers always populate them.
func GetSpawnerParserConfig(task *proto_v1.Task, imageNameTag, serviceID string) (config *spawner.ParsedConfig) {
	config = &spawner.ParsedConfig{
		ID:              task.TaskId,
		Mount:           []autodl_base.MountPath{},
		Envs:            make(map[string]string),
		Cmd:             task.BuildInfo.Cmd,
		Image:           imageNameTag,
		Resource:        autodl_base.ATCResource{},
		MachineSelector: make(map[string]string),
		HasService:      task.PodInfo.ServiceType != proto_v1.PodInfo_NoService,
		HasShareMemory:  task.PodInfo.HaveShm,
		ServiceID:       serviceID,
	}
	// Mounts with a missing source or target are silently skipped.
	for _, mount := range task.BuildInfo.Mounts {
		if len(mount.CephPath) == 0 || len(mount.MountPath) == 0 {
			continue
		}
		config.Mount = append(config.Mount, autodl_base.MountPath{
			CephPath:  mount.CephPath,
			MountPath: mount.MountPath,
			ReadOnly:  mount.ReadOnly,
		})
	}
	for _, env := range task.BuildInfo.Envs {
		config.Envs[env.Key] = env.Value
	}
	// Force unbuffered stdout/stderr for python workloads.
	config.Envs["PYTHONUNBUFFERED"] = "1"
	config.Resource.Default = autodl_base.ATCPodResource{
		CPU: autodl_base.ATCK8SResource{
			Requests: task.PodInfo.Resource.Cpu.Requests,
			Limits:   task.PodInfo.Resource.Cpu.Limits,
		},
		GPU: autodl_base.ATCK8SResource{
			Requests: task.PodInfo.Resource.Gpu.Requests,
			Limits:   task.PodInfo.Resource.Gpu.Limits,
		},
		Mem: autodl_base.ATCK8SResource{
			Requests: task.PodInfo.Resource.Memory.Requests,
			Limits:   task.PodInfo.Resource.Memory.Limits,
		},
	}
	// Ports are only exposed when the task actually requests a service.
	if task.PodInfo.ServiceType != proto_v1.PodInfo_NoService {
		for _, port := range task.PodInfo.Ports {
			config.Ports = append(config.Ports, int(port))
		}
	}
	for _, machineSelector := range task.PodInfo.MachineSelector {
		config.MachineSelector[machineSelector.Key] = machineSelector.Value
	}
	// NOTE(review): json here is gin's vendored encoder
	// (github.com/gin-gonic/gin/json), not encoding/json; the marshal
	// error is only logged, never returned.
	t, e := json.Marshal(config)
	log.Info("task config: ", string(t), e)
	return
}
package path
import (
log "github.com/cihub/seelog"
"io"
"os"
)
func DeletePath(path string) error {
if _, err := os.Stat(path); err == nil {
return os.RemoveAll(path)
}
return nil
}
// CopyFolder recursively copies source into dest, creating dest with the
// source's permission bits. Per-entry copy failures are logged and the walk
// continues (best-effort, as before); the last entry error, if any, is
// returned. Fixes over the original: the directory handle is now closed,
// and os.Open/Readdir errors are no longer ignored.
func CopyFolder(source string, dest string) (err error) {
	sourceinfo, err := os.Stat(source)
	if err != nil {
		return err
	}
	err = os.MkdirAll(dest, sourceinfo.Mode())
	if err != nil {
		return err
	}
	directory, err := os.Open(source)
	if err != nil {
		return err
	}
	defer directory.Close()
	objects, err := directory.Readdir(-1)
	if err != nil {
		return err
	}
	for _, obj := range objects {
		sourcefilepointer := source + "/" + obj.Name()
		destinationfilepointer := dest + "/" + obj.Name()
		if obj.IsDir() {
			err = CopyFolder(sourcefilepointer, destinationfilepointer)
			if err != nil {
				log.Error("copy folder error: ", err.Error())
			}
		} else {
			err = CopyFile(sourcefilepointer, destinationfilepointer)
			if err != nil {
				log.Error("copy file error: ", err.Error())
			}
		}
	}
	return
}
func CopyFile(source string, dest string) (err error) {
sourcefile, err := os.Open(source)
if err != nil {
return err
}
defer sourcefile.Close()
destfile, err := os.Create(dest)
if err != nil {
return err
}
defer destfile.Close()
_, err = io.Copy(destfile, sourcefile)
if err == nil {
sourceinfo, err := os.Stat(source)
if err == nil {
err = os.Chmod(dest, sourceinfo.Mode())
}
}
return
}
package path
import (
"fmt"
"github.com/spf13/viper"
"os"
)
// InitServiceDir creates the per-service log directory
// <app.mount_path>/adl_core/log/<serviceID> (world-writable, 0777).
func InitServiceDir(serviceID string) error {
	return os.MkdirAll(fmt.Sprintf("%s/adl_core/log/%s", viper.GetString("app.mount_path"), serviceID), 0777)
}

// GetTaskLogFilePath returns the log file path for a task:
// <app.mount_path>/adl_core/log/<serviceID>/<taskID>.log.
func GetTaskLogFilePath(serviceID, taskID string) string {
	return fmt.Sprintf("%s/adl_core/log/%s/%s.log", viper.GetString("app.mount_path"), serviceID, taskID)
}
package libs
import (
"encoding/json"
linuxproc "github.com/c9s/goprocinfo/linux"
"github.com/mindprince/gonvml"
"github.com/pkg/errors"
"runtime"
"time"
)
// ResourceSummary is a per-node resource snapshot (CPU, GPU, memory),
// serialized to/from JSON via ToString and Parse.
type ResourceSummary struct {
	NodeName  string    `json:"node_name"`
	CpuInfo   *CpuInfo  `json:"cpu_info"`
	GpuInfo   *GpuInfo  `json:"gpu_info"`
	MemInfo   *MemInfo  `json:"mem_info"`
	UpdatedAt time.Time `json:"updated_at"`
}

// Parse fills rs from a JSON string produced by ToString.
func (rs *ResourceSummary) Parse(input string) error {
	// &rs is a **ResourceSummary; json.Unmarshal follows the double
	// pointer, so this behaves the same as passing rs directly.
	return json.Unmarshal([]byte(input), &rs)
}

// ToString serializes rs to JSON; a marshal error yields an empty string.
func (rs *ResourceSummary) ToString() string {
	tmp, _ := json.Marshal(rs)
	return string(tmp)
}
// GpuInfo aggregates the NVIDIA driver version and one entry per GPU.
type GpuInfo struct {
	DriverVersion string    `json:"driver_version"`
	GPUS          []GpuUnit `json:"gpus"`
}

// GpuUnit describes a single GPU as reported by gonvml (NVML).
// Units follow NVML conventions — presumably bytes for memory, percent for
// utilization/fan, milliwatts for power, °C for temperature; TODO confirm
// against the gonvml accessors used in GetGpuResourcesSummary.
type GpuUnit struct {
	Index             uint   `json:"index"` // populated from dev.MinorNumber(), not the enumeration index
	UUID              string `json:"uuid"`
	Name              string `json:"name"`
	MemoryUsed        uint64 `json:"memory_used"`
	MemoryTotal       uint64 `json:"memory_total"`
	GpuUtilization    uint   `json:"gpu_utilization"`
	MemoryUtilization uint   `json:"mem_utilization"`
	PowerDraw         uint   `json:"power_draw"`
	Temperature       uint   `json:"temperature"`
	FanSpeed          uint   `json:"fan_speed"`
}
// GetGpuResourcesSummary queries NVML for every visible GPU and returns a
// fully-populated GpuInfo. The first failing NVML call aborts and returns
// a wrapped error (gpuInfo may then be nil or partially filled). NVML is
// initialized and shut down within this call.
func GetGpuResourcesSummary() (gpuInfo *GpuInfo, err error) {
	err = gonvml.Initialize()
	if err != nil {
		return
	}
	defer gonvml.Shutdown()
	var driverVersion string
	driverVersion, err = gonvml.SystemDriverVersion()
	if err != nil {
		err = errors.Wrap(err, "SystemDriverVersion() failed")
		return
	}
	var numDevices uint
	numDevices, err = gonvml.DeviceCount()
	if err != nil {
		err = errors.Wrap(err, "DeviceCount() failed")
		return
	}
	gpuInfo = &GpuInfo{
		DriverVersion: driverVersion,
		GPUS:          make([]GpuUnit, numDevices),
	}
	for i := 0; i < int(numDevices); i++ {
		gpuRes := GpuUnit{}
		var dev gonvml.Device
		dev, err = gonvml.DeviceHandleByIndex(uint(i))
		if err != nil {
			err = errors.Wrap(err, "DeviceHandleByIndex() failed")
			return
		}
		gpuRes.UUID, err = dev.UUID()
		if err != nil {
			err = errors.Wrap(err, "dev.UUID() failed")
			return
		}
		// NOTE(review): Index is the device minor number, which may differ
		// from the enumeration index i used above — confirm consumers
		// expect minor numbers here.
		gpuRes.Index, err = dev.MinorNumber()
		if err != nil {
			err = errors.Wrap(err, "dev.MinorNumber() failed")
			return
		}
		gpuRes.Name, err = dev.Name()
		if err != nil {
			err = errors.Wrap(err, "dev.Name() failed")
			return
		}
		gpuRes.MemoryTotal, gpuRes.MemoryUsed, err = dev.MemoryInfo()
		if err != nil {
			err = errors.Wrap(err, "dev.MemoryInfo() failed")
			return
		}
		gpuRes.GpuUtilization, gpuRes.MemoryUtilization, err = dev.UtilizationRates()
		if err != nil {
			err = errors.Wrap(err, "dev.UtilizationRates() failed")
			return
		}
		gpuRes.PowerDraw, err = dev.PowerUsage()
		if err != nil {
			err = errors.Wrap(err, "dev.PowerUsage() failed")
			return
		}
		gpuRes.FanSpeed, err = dev.FanSpeed()
		if err != nil {
			err = errors.Wrap(err, "dev.FanSpeed() failed")
			return
		}
		gpuRes.Temperature, err = dev.Temperature()
		if err != nil {
			err = errors.Wrap(err, "dev.Temperature() failed")
			return
		}
		gpuInfo.GPUS[i] = gpuRes
	}
	return
}
// CpuInfo combines the logical CPU count with /proc/loadavg load averages.
type CpuInfo struct {
	Count int `json:"count"` // runtime.NumCPU()
	linuxproc.LoadAvg
}

// GetCpuInfo reports the logical CPU count and current load averages read
// from /proc/loadavg (Linux-only). On a read error, cpuInfo is still
// returned non-nil with only Count populated, alongside the error.
func GetCpuInfo() (cpuInfo *CpuInfo, err error) {
	var loadInfo *linuxproc.LoadAvg
	cpuInfo = &CpuInfo{Count: runtime.NumCPU()}
	loadInfo, err = linuxproc.ReadLoadAvg("/proc/loadavg")
	if err != nil {
		return
	}
	cpuInfo.LoadAvg = *loadInfo
	return
}
// MemInfo is the subset of /proc/meminfo fields reported upstream.
// Values are as read by linuxproc — presumably kB, per /proc/meminfo
// convention; TODO confirm at the consumer.
type MemInfo struct {
	MemTotal     uint64 `json:"mem_total"`
	MemFree      uint64 `json:"mem_free"`
	MemAvailable uint64 `json:"mem_available"`
}

// GetMemInfo reads /proc/meminfo (Linux-only) and extracts the total,
// free and available memory figures.
func GetMemInfo() (memInfo *MemInfo, err error) {
	var readMem *linuxproc.MemInfo
	memInfo = &MemInfo{}
	readMem, err = linuxproc.ReadMemInfo("/proc/meminfo")
	if err != nil {
		return
	}
	memInfo.MemTotal = readMem.MemTotal
	memInfo.MemFree = readMem.MemFree
	memInfo.MemAvailable = readMem.MemAvailable
	return
}
package libs
// TaskType enumerates the kinds of work items processed by the system.
type TaskType string

const (
	RunTask  TaskType = "run"  // start a task
	StopTask TaskType = "stop" // stop a running task
)
package models
import (
"fmt"
log "github.com/cihub/seelog"
_ "github.com/go-sql-driver/mysql"
"github.com/jinzhu/gorm"
"github.com/pkg/errors"
"github.com/spf13/viper"
"time"
)
// NoFilterSetError guards mutating queries that would otherwise run
// unfiltered across a whole table.
var (
	NoFilterSetError = errors.New("No filter set is forbidden in database")
)
// DBProvider opens the configured database and blocks, retrying every 5s,
// until a connection succeeds. The pool is capped at 10 idle/open
// connections and connections are recycled after 8 hours; gorm SQL logging
// is enabled. The *gorm.DB is long-lived and intentionally never closed
// here.
func DBProvider() *gorm.DB {
	var typ = viper.GetString("database.type")
	var url = fmt.Sprintf(viper.GetString("database.url"), viper.GetString("database.user"), viper.GetString("database.password"), viper.GetString("database.host"), viper.GetString("database.name"))
	var err error
	// NOTE(review): url embeds the database password; consider redacting
	// it from this log line.
	log.Info("connect to database...", typ, url)
	dbConn, err := gorm.Open(typ, url)
	for err != nil {
		log.Error("connect to db failed: ", err.Error())
		time.Sleep(5 * time.Second)
		// Bug fix: the retry previously hard-coded "mysql" and ignored
		// the configured database.type.
		dbConn, err = gorm.Open(typ, url)
	}
	log.Info("connect to database success...")
	dbConn.DB().SetMaxIdleConns(10)
	dbConn.DB().SetConnMaxLifetime(time.Hour * 8)
	dbConn.DB().SetMaxOpenConns(10)
	dbConn.LogMode(true)
	return dbConn
}
package models
import (
"errors"
"github.com/jinzhu/gorm"
"time"
)
const jobTableName = "job"
type Job struct {
ID int `gorm:"column:id;NOT NULL;PRIMARY KEY;" json:"id"`
ServiceID string `gorm:"column:service_id;NOT NULL;" json:"service_id"`
TaskID string `gorm:"column:task_id;NOT NULL;" json:"task_id"`
Definition []byte `gorm:"column:definition;NOT NULL;" json:"definition"`
CreatedAt time.Time `gorm:"column:created_at;NOT NULL;" json:"created_at"`
UpdatedAt time.Time `gorm:"column:updated_at;NOT NULL;" json:"updated_at"`
HasService bool `gorm:"column:has_service" json:"has_service"`
ServiceIsClean bool `gorm:"column:service_is_clean" json:"service_is_clean"`
LogIsPersist bool `gorm:"column:log_is_persist" json:"log_is_persist"`
LogIsFlushOver bool `gorm:"column:log_is_flush_over" json:"log_is_flush_over"`
}
func (j *Job) TableName() string {
return jobTableName
}
type JobModel struct{}
func JobModelProvider() *JobModel {
return &JobModel{}
}
func (model *JobModel) Create(dbConn *gorm.DB, job *Job) (err error) {
return dbConn.Create(job).Error
}
// Get fetches the job identified by (serviceID, taskID). Empty identifiers
// short-circuit to gorm.ErrRecordNotFound without touching the database.
func (model *JobModel) Get(dbConn *gorm.DB, serviceID, taskID string) (job *Job, err error) {
	if serviceID == "" || taskID == "" {
		return nil, gorm.ErrRecordNotFound
	}
	job = &Job{}
	err = dbConn.Where(&Job{ServiceID: serviceID, TaskID: taskID}).First(job).Error
	return
}
// GetInTaskIDs returns all jobs of serviceID whose task_id is in taskIDs.
// An empty id list or service ID short-circuits to ErrRecordNotFound.
func (model *JobModel) GetInTaskIDs(dbConn *gorm.DB, serviceID string, taskIDs []string) (job []Job, err error) {
	if len(taskIDs) == 0 || len(serviceID) == 0 {
		err = gorm.ErrRecordNotFound
		return
	}
	err = dbConn.Where(&Job{ServiceID: serviceID}).Where(" task_id in (?)", taskIDs).Find(&job).Error
	return
}
// The updaters below flip lifecycle flags (or replace the definition) on a
// job row matched by (serviceID, taskID).
// NOTE(review): only taskID is guarded against being empty; gorm drops
// zero-value fields from struct conditions, so an empty serviceID widens
// the filter to every service — confirm this is intended.

// CleanServiceFinished marks the job's kubernetes service as cleaned up.
func (model *JobModel) CleanServiceFinished(dbConn *gorm.DB, serviceID, taskID string) (err error) {
	if len(taskID) == 0 {
		err = errors.New("can not update when no filter")
		return
	}
	err = dbConn.Table(jobTableName).Where(&Job{ServiceID: serviceID, TaskID: taskID}).Updates(&Job{
		ServiceIsClean: true,
	}).Error
	return
}

// UpdateDefinition replaces the stored task definition blob.
func (model *JobModel) UpdateDefinition(dbConn *gorm.DB, serviceID, taskID string, definition []byte) (err error) {
	if len(taskID) == 0 {
		err = errors.New("can not update when no filter")
		return
	}
	err = dbConn.Table(jobTableName).Where(&Job{ServiceID: serviceID, TaskID: taskID}).Updates(&Job{
		Definition: definition,
	}).Error
	return
}

// LogPersistFinished marks the job's logs as persisted.
func (model *JobModel) LogPersistFinished(dbConn *gorm.DB, serviceID, taskID string) (err error) {
	if len(taskID) == 0 {
		err = errors.New("can not update when no filter")
		return
	}
	err = dbConn.Table(jobTableName).Where(&Job{ServiceID: serviceID, TaskID: taskID}).Updates(&Job{
		LogIsPersist: true,
	}).Error
	return
}

// LogFlushOver marks the job's log flushing as complete.
func (model *JobModel) LogFlushOver(dbConn *gorm.DB, serviceID, taskID string) (err error) {
	if len(taskID) == 0 {
		err = errors.New("can not update when no filter")
		return
	}
	err = dbConn.Table(jobTableName).Where(&Job{ServiceID: serviceID, TaskID: taskID}).Updates(&Job{
		LogIsFlushOver: true,
	}).Error
	return
}
\ No newline at end of file
package models
import (
"github.com/jinzhu/gorm"
"time"
)
const serviceTableName = "service"

// Service is the gorm row for one registered service.
// NotiAPI presumably holds the callback endpoint used for status
// notifications — confirm against Monitor.notifyService's usage.
type Service struct {
	ID        int       `gorm:"column:id;NOT NULL;PRIMARY KEY;" json:"id"`
	ServiceID string    `gorm:"column:service_id;NOT NULL;" json:"service_id"`
	NotiAPI   string    `gorm:"column:noti_api;NOT NULL;" json:"noti_api"`
	CreatedAt time.Time `gorm:"column:created_at;NOT NULL;" json:"created_at"`
}

// TableName tells gorm which table backs Service.
func (j *Service) TableName() string {
	return serviceTableName
}

// ServiceModel groups query helpers for the service table (stateless).
type ServiceModel struct{}

// ServiceModelProvider is the constructor used for dependency injection.
func ServiceModelProvider() *ServiceModel {
	return &ServiceModel{}
}
// Get loads the service row for serviceID; an empty id short-circuits
// with gorm.ErrRecordNotFound (service stays nil on that path).
func (model *ServiceModel) Get(dbConn *gorm.DB, serviceID string) (service *Service, err error) {
	if serviceID == "" {
		return nil, gorm.ErrRecordNotFound
	}
	service = &Service{}
	filter := &Service{ServiceID: serviceID}
	err = dbConn.Where(filter).First(service).Error
	return service, err
}
package models
import (
"autodl-core/libs"
"github.com/jinzhu/gorm"
"time"
)
const taskTableName = "task"

// Task is the gorm row for one submitted task batch.
// TaskList is stored as a string — presumably a serialized id list;
// confirm the encoding with the writer before relying on it.
type Task struct {
	ID        int           `gorm:"column:id;NOT NULL;PRIMARY KEY;" json:"-"`
	ServiceID string        `gorm:"column:service_id;NOT NULL;" json:"service_id"`
	TaskType  libs.TaskType `gorm:"column:task_type;" json:"task_type"`
	TaskList  string        `gorm:"column:task_list;" json:"task_list"`
	CreatedAt time.Time     `gorm:"column:created_at;NOT NULL;" json:"created_at"`
}

// TableName tells gorm which table backs Task.
func (ts *Task) TableName() string {
	return taskTableName
}

// TaskModel groups query helpers for the task table (stateless).
type TaskModel struct{}

// TaskModelProvider is the constructor used for dependency injection.
func TaskModelProvider() *TaskModel {
	return &TaskModel{}
}
// Create inserts the task and returns the auto-assigned primary key
// (gorm writes it back into task.ID on insert).
func (model *TaskModel) Create(dbConn *gorm.DB, task *Task) (id int, err error) {
	err = dbConn.Create(task).Error
	id = task.ID
	return
}
// GetLast fetches the last row matching the primary key id; id == 0 is
// rejected up front with gorm.ErrRecordNotFound (task stays nil there).
func (model *TaskModel) GetLast(dbConn *gorm.DB, id int) (task *Task, err error) {
	if id == 0 {
		return nil, gorm.ErrRecordNotFound
	}
	task = &Task{}
	err = dbConn.Where(&Task{ID: id}).Last(task).Error
	return task, err
}
package models
import (
"autodl-core/libs"
log "github.com/cihub/seelog"
"github.com/jinzhu/gorm"
"gitlab.seetatech.com/autodl.com/autodl-base.git"
"strings"
"time"
)
const taskStatusTableName = "task_status"

// TaskStatus is one status transition of a task; rows are append-only
// (SetTaskStatus creates a new row per transition) and the newest row is
// read back with Last().
type TaskStatus struct {
	ID        int    `gorm:"column:id;NOT NULL;PRIMARY KEY;" json:"-"`
	ServiceID string `gorm:"column:service_id;NOT NULL;" json:"service_id"`
	TaskID    string `gorm:"column:task_id;" json:"task_id"`
	Status    string `gorm:"column:status;" json:"status"`
	Msg       string `gorm:"column:msg;" json:"msg"`
	// set once the owning service has been successfully notified of this status
	IsNotifySuccess bool      `gorm:"column:is_notify_success" json:"is_notify_success"`
	CreatedAt       time.Time `gorm:"column:created_at;NOT NULL;" json:"created_at"`
}

// TableName tells gorm which table backs TaskStatus.
func (ts *TaskStatus) TableName() string {
	return taskStatusTableName
}

// TaskStatusModel groups query helpers for the task_status table.
type TaskStatusModel struct{}

// TaskStatusModelProvider is the constructor used for dependency injection.
func TaskStatusModelProvider() *TaskStatusModel {
	return &TaskStatusModel{}
}
// create inserts one status row; unexported — callers must go through
// SetTaskStatus, which enforces the transition rules first.
func (model *TaskStatusModel) create(dbConn *gorm.DB, taskStatus *TaskStatus) (err error) {
	return dbConn.Create(taskStatus).Error
}
// GetLast returns the newest status row for (serviceID, taskID).
// Empty identifiers short-circuit with gorm.ErrRecordNotFound
// (taskStatus stays nil on that path).
func (model *TaskStatusModel) GetLast(dbConn *gorm.DB, serviceID, taskID string) (taskStatus *TaskStatus, err error) {
	if serviceID == "" || taskID == "" {
		return nil, gorm.ErrRecordNotFound
	}
	taskStatus = &TaskStatus{}
	filter := &TaskStatus{ServiceID: serviceID, TaskID: taskID}
	err = dbConn.Where(filter).Last(taskStatus).Error
	return taskStatus, err
}
// getLast returns the newest row matching an arbitrary struct filter.
// Always returns a non-nil taskStatus (zero-valued when the query fails);
// callers such as IsStatusNotifySuccess rely on that.
func (model *TaskStatusModel) getLast(dbConn *gorm.DB, filter *TaskStatus) (taskStatus *TaskStatus, err error) {
	taskStatus = &TaskStatus{}
	err = dbConn.Where(filter).Last(taskStatus).Error
	return
}
// IsStatusNotifySuccess reports whether the latest (serviceID, taskID,
// status) row has already been notified successfully.
func (model *TaskStatusModel) IsStatusNotifySuccess(dbConn *gorm.DB, serviceID, taskID, status string) bool {
	last, lookupErr := model.getLast(dbConn, &TaskStatus{ServiceID: serviceID, TaskID: taskID, Status: status})
	if gorm.IsRecordNotFoundError(lookupErr) {
		return false
	}
	// NOTE(review): other lookup errors fall through here; getLast then
	// returned a zero-valued row, so this also yields false.
	return last.IsNotifySuccess
}
// SetStatusNotifySuccess flags every (serviceID, taskID, status) row as
// successfully notified.
func (model *TaskStatusModel) SetStatusNotifySuccess(dbConn *gorm.DB, serviceID, taskID, status string) error {
	filter := &TaskStatus{ServiceID: serviceID, TaskID: taskID, Status: status}
	change := &TaskStatus{IsNotifySuccess: true}
	return dbConn.Table(taskStatusTableName).Where(filter).Updates(change).Error
}
// checkTaskStatusCanSet validates a status transition before insertion:
// a task whose latest status is terminal cannot change again, and the
// same status may not be recorded twice for one task.
func (model *TaskStatusModel) checkTaskStatusCanSet(dbConn *gorm.DB, serviceID, taskID, status string) error {
	// First look at the newest row regardless of status.
	ts, err := model.getLast(dbConn, &TaskStatus{ServiceID: serviceID, TaskID: taskID})
	if err == nil {
		if autodl_base.TaskIsDone(ts.Status) {
			// terminal states (per autodl_base.TaskIsDone) are final
			return libs.TaskIsDoneCanNotSetStatusError
		}
		if ts.Status == status {
			return libs.TaskStatusExistCanNotSetStatusError
		}
	}
	// Then reject if this exact status was ever recorded for the task
	// (not only as the most recent row).
	_, err = model.getLast(dbConn, &TaskStatus{ServiceID: serviceID, TaskID: taskID, Status: status})
	if !gorm.IsRecordNotFoundError(err) {
		// NOTE(review): a non-RecordNotFound DB error also lands here and is
		// reported as "status exists" — confirm that conflation is intended.
		return libs.TaskStatusExistCanNotSetStatusError
	}
	return nil
}
// SetTaskStatus appends a new status row for (serviceID, taskID) after
// validating the transition via checkTaskStatusCanSet.
func (model *TaskStatusModel) SetTaskStatus(dbConn *gorm.DB, serviceID, taskID, status, msg string) (err error) {
	log.Info("set status: ", strings.Join([]string{serviceID, taskID, status, msg}, ", "))
	if err = model.checkTaskStatusCanSet(dbConn, serviceID, taskID, status); err != nil {
		return err
	}
	row := TaskStatus{
		ServiceID: serviceID,
		TaskID:    taskID,
		Status:    status,
		Msg:       msg,
		CreatedAt: time.Now(),
	}
	return model.create(dbConn, &row)
}
package mongodb_plugin_test
import (
"testing"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
// TestMongodbPlugin is the ginkgo entry point; it hands control of the
// standard `go test` run to the "MongodbPlugin Suite" specs.
func TestMongodbPlugin(t *testing.T) {
	RegisterFailHandler(Fail)
	RunSpecs(t, "MongodbPlugin Suite")
}
package mongodb_plugin
import (
"autodl-core/libs"
"context"
log "github.com/cihub/seelog"
"github.com/pkg/errors"
"github.com/spf13/viper"
"gitlab.seetatech.com/autodl.com/autodl-base.git"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/x/bsonx"
"io"
"time"
)
// LogMongoClient wraps the mongo driver client used for task-log storage.
type LogMongoClient struct {
	mongoClient *mongo.Client
}

// MongoClientProvider builds a connected LogMongoClient, retrying the
// connection up to 10 times (libs.RunnerFailedRetry) and panicking if it
// still cannot connect — startup is intentionally fail-fast.
func MongoClientProvider() (mongoClient *LogMongoClient) {
	mongoClient = &LogMongoClient{}
	err := libs.RunnerFailedRetry(mongoClient.initMongoClient, 10)
	if err != nil {
		panic("panic!!!! failed to init mongodb: " + err.Error())
	}
	return
}
// initMongoClient connects to the mongo address from viper key
// "mongodb.addr" (pool size 30), pings the primary to verify the
// connection, then ensures the log collection index exists.
// An index-creation failure fails the whole init (and thus the retry loop
// in MongoClientProvider).
func (mc *LogMongoClient) initMongoClient() (err error) {
	log.Info("init mongo client: ", viper.GetString("mongodb.addr"))
	// FIX: keep and defer the cancel func — `ctx, _ :=` leaked the timeout
	// timer on every call (go vet lostcancel).
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	mc.mongoClient, err = mongo.Connect(ctx, options.Client().ApplyURI(viper.GetString("mongodb.addr")), options.Client().SetMaxPoolSize(30))
	if err != nil {
		err = errors.Wrap(err, "init mongo db failed: "+err.Error())
		log.Error(err.Error())
		return
	}
	err = mc.mongoClient.Ping(ctx, readpref.Primary())
	if err != nil {
		err = errors.Wrap(err, "init mongo db failed: "+err.Error())
		log.Error(err.Error())
		return
	}
	err = mc.addIndex()
	if err != nil {
		err = errors.Wrap(err, "error when add index"+err.Error())
		log.Error(err.Error())
	}
	return
}
// mongoIndex is the shape addIndex decodes each entry of
// collection.Indexes().List() into; only Key is inspected.
type mongoIndex struct {
	Key  map[string]int
	NS   string
	Name string
}
// addIndex ensures the log collection has a compound index on
// (service_id, task_id). It first scans the existing indexes and returns
// early if one already covers both keys; otherwise it creates
// "service_id-task_id" as a background index.
func (mc *LogMongoClient) addIndex() (err error) {
	col := mc.mongoClient.Database(autodl_base.MongoDBCoreDatabaseName).Collection(autodl_base.MongoDBCoreLogCollectionName)
	cursor, err := col.Indexes().List(context.Background())
	if err != nil {
		err = errors.Wrap(err, "get mongo index failed")
		return
	}
	for cursor.Next(context.Background()) {
		var idx mongoIndex
		err = cursor.Decode(&idx)
		if err != nil {
			// skip undecodable entries; a later CreateOne overwrites err
			err = errors.Wrap(err, "decode mongo index failed")
			continue
		}
		// both keys present in one index => nothing to do
		if _, ok := idx.Key["service_id"]; ok {
			if _, ok := idx.Key["task_id"]; ok {
				return
			}
		}
	}
	_, err = col.Indexes().CreateOne(
		context.Background(),
		mongo.IndexModel{
			Keys: bsonx.Doc{{"service_id", bsonx.Int32(1)}, {"task_id", bsonx.Int32(1)}},
			Options: options.Index().
				SetBackground(true).
				SetName("service_id-task_id").
				SetVersion(1),
		},
	)
	return
}
// AddLog inserts the given logs, and on failure rebuilds the whole mongo
// client (via MongoClientProvider, which may panic after 10 failed
// retries) and tries the insert exactly once more.
func (mc *LogMongoClient) AddLog(taskLogs ...autodl_base.TaskLog) (err error) {
	err = mc.addLog(taskLogs...)
	if err != nil {
		mc.mongoClient = MongoClientProvider().mongoClient
		err = mc.addLog(taskLogs...)
	}
	return
}
// addLog inserts the given logs as ONE mongo document: the first entry's
// fields (service/task ids, timestamp, type) are kept and the Message
// fields of all subsequent entries are concatenated onto it.
func (mc *LogMongoClient) addLog(taskLogs ...autodl_base.TaskLog) (err error) {
	if len(taskLogs) == 0 {
		return
	}
	col := mc.mongoClient.Database(autodl_base.MongoDBCoreDatabaseName).Collection(autodl_base.MongoDBCoreLogCollectionName)
	// FIX: defer the cancel func — `ctx, _ :=` leaked the timeout timer on
	// every insert (go vet lostcancel).
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	var content autodl_base.TaskLog
	for index, taskLog := range taskLogs {
		if index == 0 {
			content = taskLog
		} else {
			content.Message += taskLog.Message
		}
	}
	_, err = col.InsertOne(ctx, content)
	return
}
// DeleteLog removes every log document of (serviceID, taskID) and returns
// how many documents were deleted.
func (mc *LogMongoClient) DeleteLog(serviceID, taskID string) (deleteCount int64, err error) {
	col := mc.mongoClient.Database(autodl_base.MongoDBCoreDatabaseName).Collection(autodl_base.MongoDBCoreLogCollectionName)
	filter := bson.M{
		"service_id": serviceID,
		"task_id":    taskID,
	}
	res, delErr := col.DeleteMany(context.Background(), filter, &options.DeleteOptions{})
	if delErr != nil {
		return 0, delErr
	}
	return res.DeletedCount, nil
}
// GetAllLog streams every log of (serviceID, taskID) into logChan.
// The final message on the channel always carries Error set: io.EOF on a
// clean end of stream, or the failure that aborted it.
// NOTE(review): the 5s context bounds the whole Find+iteration — a very
// large log set could be cut off by the deadline; confirm acceptable.
func (mc *LogMongoClient) GetAllLog(serviceID, taskID string, logChan chan<- autodl_base.TaskLogChanStruct) {
	var err error
	tlStruct := autodl_base.TaskLogChanStruct{}
	defer func() {
		if err == nil {
			err = io.EOF
		}
		tlStruct.Error = err
		logChan <- tlStruct
	}()
	query := bson.M{
		"service_id": serviceID,
		"task_id":    taskID,
	}
	col := mc.mongoClient.Database(autodl_base.MongoDBCoreDatabaseName).Collection(autodl_base.MongoDBCoreLogCollectionName)
	// FIX: defer the cancel func — `ctx, _ :=` leaked the timeout timer.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	var cur *mongo.Cursor
	cur, err = col.Find(ctx, query, &options.FindOptions{})
	if err != nil {
		return
	}
	var taskLog autodl_base.TaskLog
	for cur.Next(ctx) {
		err = cur.Decode(&taskLog)
		if err != nil {
			err = errors.Wrap(err, "decode log failed")
			return
		}
		tlStruct.TaskLog = taskLog
		logChan <- tlStruct
	}
	if cur.Err() != nil {
		err = cur.Err()
	}
}
// GetPagedLogOrderByIDASC streams logs of (serviceID, taskID) in _id
// ascending order into logChan, starting strictly after offsetID ("" means
// from the beginning). limit is clamped to [1,200], defaulting to 10.
// The final channel message carries io.EOF on success or the abort error.
func (mc *LogMongoClient) GetPagedLogOrderByIDASC(serviceID, taskID, offsetID string, limit int64, logChan chan<- autodl_base.TaskLogChanStruct) {
	var err error
	tlStruct := autodl_base.TaskLogChanStruct{}
	defer func() {
		if err == nil {
			err = io.EOF
		}
		tlStruct.Error = err
		logChan <- tlStruct
	}()
	if len(taskID) == 0 {
		return
	}
	// cheap sanity check: real ObjectID hex strings are 24 chars
	if offsetID != "" && len(offsetID) < 10 {
		err = errors.Errorf("wrong offsetID, only log offsetID is permitted, %s provided", offsetID)
		return
	}
	col := mc.mongoClient.Database(autodl_base.MongoDBCoreDatabaseName).Collection(autodl_base.MongoDBCoreLogCollectionName)
	// FIX: defer the cancel func — `ctx, _ :=` leaked the timeout timer.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	query := bson.M{
		"service_id": serviceID,
		"task_id":    taskID,
	}
	if offsetID != "" {
		var objectID primitive.ObjectID
		objectID, err = primitive.ObjectIDFromHex(offsetID)
		// FIX: this error was previously ignored, so a malformed offsetID
		// silently queried with the zero ObjectID (returning everything).
		if err != nil {
			err = errors.Wrap(err, "invalid offsetID")
			return
		}
		query["_id"] = bson.D{{"$gt", objectID}}
	}
	findOptions := &options.FindOptions{}
	findOptions.SetSort(bsonx.Doc{{"_id", bsonx.Int32(1)}})
	if limit > 0 {
		if limit > 200 {
			limit = 200
		}
	} else {
		limit = 10
	}
	findOptions.SetLimit(limit)
	var cur *mongo.Cursor
	cur, err = col.Find(ctx, query, findOptions)
	if err != nil {
		err = errors.Wrap(err, "mongodb col find failed")
		return
	}
	var taskLog autodl_base.TaskLog
	for cur.Next(ctx) {
		err = cur.Decode(&taskLog)
		if err != nil {
			err = errors.Wrap(err, "decode log failed")
			return
		}
		tlStruct.TaskLog = taskLog
		logChan <- tlStruct
	}
	if cur.Err() != nil {
		err = cur.Err()
	}
}
// offsetID == -1, return last 10 log
// GetPagedLogOrderByIDDESC pages logs of (serviceID, taskID) backwards:
// it queries _id descending strictly before offsetID ("-1" means from the
// newest), then reverses so the returned slice is ascending. limit is
// clamped to 200; offsetID "-1" with limit 0 defaults to the last 10.
func (mc *LogMongoClient) GetPagedLogOrderByIDDESC(serviceID, taskID, offsetID string, limit int64) (logList []autodl_base.TaskLog, err error) {
	if len(taskID) == 0 {
		err = errors.Errorf("task id can not be nil")
		return
	}
	if len(offsetID) < 10 && offsetID != "-1" {
		err = errors.Errorf("wrong offsetID, only -1 or log offsetID is permitted, %s provided", offsetID)
		return
	}
	col := mc.mongoClient.Database(autodl_base.MongoDBCoreDatabaseName).Collection(autodl_base.MongoDBCoreLogCollectionName)
	// FIX: defer the cancel func — `ctx, _ :=` leaked the timeout timer.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	query := bson.M{
		"service_id": serviceID,
		"task_id":    taskID,
	}
	if offsetID != "-1" {
		var objectID primitive.ObjectID
		objectID, err = primitive.ObjectIDFromHex(offsetID)
		// FIX: previously unchecked — a malformed offsetID silently queried
		// with the zero ObjectID.
		if err != nil {
			err = errors.Wrap(err, "invalid offsetID")
			return
		}
		query["_id"] = bson.D{{"$lt", objectID}}
	}
	findOptions := &options.FindOptions{}
	findOptions.SetSort(bsonx.Doc{{"_id", bsonx.Int32(-1)}})
	if limit > 0 {
		if limit > 200 {
			limit = 200
		}
		findOptions.SetLimit(limit)
	}
	if offsetID == "-1" && limit == 0 {
		findOptions.SetLimit(10)
	}
	var cur *mongo.Cursor
	cur, err = col.Find(ctx, query, findOptions)
	if err != nil {
		// FIX: the wrapped error was previously discarded (bare
		// `errors.Wrap(err, ...)` without assignment), so callers got the
		// raw driver error instead of the annotated one.
		err = errors.Wrap(err, "mongodb col find failed")
		return
	}
	var taskLog autodl_base.TaskLog
	for cur.Next(ctx) {
		err = cur.Decode(&taskLog)
		if err != nil {
			err = errors.Wrap(err, "decode log failed")
			return
		}
		logList = append(logList, taskLog)
	}
	if cur.Err() != nil {
		err = cur.Err()
	}
	reverseLogList(logList)
	return
}
// reverseLogList reverses the slice in place (no-op for len < 2).
func reverseLogList(s []autodl_base.TaskLog) {
	n := len(s)
	for i := 0; i < n/2; i++ {
		j := n - 1 - i
		s[i], s[j] = s[j], s[i]
	}
}
package mongodb_plugin_test
import (
"fmt"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
uuid "github.com/satori/go.uuid"
"github.com/spf13/viper"
autodl_base "gitlab.seetatech.com/autodl.com/autodl-base.git"
"strconv"
"time"
. "autodl-core/mongodb_plugin"
)
// NOTE(review): this spec inserts 10000 log rows with a 1s sleep between
// inserts (~2.8 hours of wall time) against a hard-coded replica-set
// address — it is a manual load/soak test, not something a CI run should
// execute. Consider gating it behind an env var or a ginkgo focus label.
var _ = Describe("Options", func() {
	var lm *LogMongoClient
	var err error
	BeforeEach(func() {
		viper.Set("mongodb.addr", "mongodb://127.0.0.1:30011,127.0.0.1:30012,127.0.0.1:30013/seetaas?replicaSet=vision-set&authSource=admin")
		lm = MongoClientProvider()
	})
	It("should pub", func() {
		for i := 0; i < 10000; i++ {
			fmt.Println("add log of ", i)
			err = lm.AddLog(autodl_base.TaskLog{
				ServiceID: "service_id_a",
				TaskID:    strconv.Itoa(i),
				Message:   uuid.NewV4().String(),
				CreatedAt: time.Time{},
			})
			Expect(err).NotTo(HaveOccurred())
			time.Sleep(time.Second)
		}
	})
})
Sending build context to Docker daemon 625.6MB
Step 1/6 : FROM hb.seetatech.com/k8s/ubuntu-basic:16.04
16.04: Pulling from k8s/ubuntu-basic
Digest: sha256:85104b6a9da33239971554534c75793db6d5ed0a95feaab9ef2725ca6f0e3632
Status: Downloaded newer image for hb.seetatech.com/k8s/ubuntu-basic:16.04
---> ffd9714fee87
Step 2/6 : COPY core--* /adl/bin/
COPY failed: no source files were specified
package redis_plugin
import (
"github.com/go-redis/redis"
)
// ResourcePlugin stores per-node resource summaries in a single redis
// hash keyed by hostname (see Collector.CollectNodeResourceInfo).
type ResourcePlugin struct {
	client *redis.Client
}

// ResourcePluginProvider is the constructor used for dependency injection.
func ResourcePluginProvider(client *redis.Client) *ResourcePlugin {
	return &ResourcePlugin{client: client}
}

// tableName is the fixed redis hash key all entries live under.
func (hash *ResourcePlugin) tableName() string {
	return "autodl_core_resource_hash_table"
}

// Set writes one hash field (key -> value).
func (hash *ResourcePlugin) Set(key, value string) (err error) {
	err = hash.client.HSet(hash.tableName(), key, value).Err()
	return
}

// Get reads one hash field.
func (hash *ResourcePlugin) Get(key string) (content string, err error) {
	content, err = hash.client.HGet(hash.tableName(), key).Result()
	return
}

// GetAll returns the whole hash as a field->value map.
func (hash *ResourcePlugin) GetAll() (content map[string]string, err error) {
	content, err = hash.client.HGetAll(hash.tableName()).Result()
	return
}
package redis_plugin
import (
"github.com/go-redis/redis"
"gitlab.seetatech.com/autodl.com/autodl-base.git"
"strconv"
"time"
)
// TaskPlugin queues task ids for workers in a redis list
// (autodl_base.CoreWorkerLinkedList).
type TaskPlugin struct {
	client *redis.Client
}

// TaskPluginProvider is the constructor used for dependency injection.
func TaskPluginProvider(client *redis.Client) *TaskPlugin {
	return &TaskPlugin{client: client}
}

// linkedListName is the redis list key shared by all workers.
func (task *TaskPlugin) linkedListName() string {
	return autodl_base.CoreWorkerLinkedList
}

// AddBottom appends a task id at the tail of the queue (normal enqueue).
func (task *TaskPlugin) AddBottom(taskUUID int) (err error) {
	err = task.push(strconv.Itoa(taskUUID), false)
	return
}
// push writes content onto the queue — head when leftPush is true, tail
// otherwise — retrying up to 5 times with a 5ms pause between attempts;
// the last attempt's error is returned.
func (task *TaskPlugin) push(content string, leftPush bool) (err error) {
	const maxAttempts = 5
	key := task.linkedListName()
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if leftPush {
			err = task.client.LPush(key, content).Err()
		} else {
			err = task.client.RPush(key, content).Err()
		}
		if err == nil {
			return
		}
		time.Sleep(time.Millisecond * 5)
	}
	return
}
// Pop removes the head of the queue and parses it as a task id.
// Propagates redis.Nil when the queue is empty.
func (task *TaskPlugin) Pop() (taskUUID int, err error) {
	var raw string
	if raw, err = task.client.LPop(task.linkedListName()).Result(); err != nil {
		return
	}
	return strconv.Atoi(raw)
}
// AddTop pushes a task id at the head of the queue (it will be popped
// next — used to put an item back / prioritize it).
func (task *TaskPlugin) AddTop(taskUUID int) (err error) {
	err = task.push(strconv.Itoa(taskUUID), true)
	return
}
package redis_plugin
import (
"encoding/json"
"fmt"
"github.com/go-redis/redis"
"gitlab.seetatech.com/autodl.com/autodl-base.git"
"strings"
"time"
)
// TaskStatusPlugin buffers task status updates in one redis list per
// service (key = CoreMonitorLinkedListPrefix + serviceID).
type TaskStatusPlugin struct {
	client *redis.Client
}

// TaskStatusPluginProvider is the constructor used for dependency injection.
func TaskStatusPluginProvider(client *redis.Client) *TaskStatusPlugin {
	return &TaskStatusPlugin{client: client}
}

// linkedListName builds the per-service redis list key.
func (task *TaskStatusPlugin) linkedListName(serviceID string) string {
	return fmt.Sprintf("%s%s", autodl_base.CoreMonitorLinkedListPrefix, serviceID)
}

// GetServiceNameByTableName is the inverse of linkedListName: it strips
// the prefix to recover the service id from a redis key.
func (task *TaskStatusPlugin) GetServiceNameByTableName(tableName string) string {
	return strings.TrimPrefix(tableName, autodl_base.CoreMonitorLinkedListPrefix)
}
// AddTaskStatusOfService appends one JSON-encoded status update to the
// service's redis list, retrying the push up to 5 times with a 5ms pause;
// the last attempt's error is returned.
func (task *TaskStatusPlugin) AddTaskStatusOfService(serviceID, taskID, taskStatus, msg string) (err error) {
	payload := autodl_base.TaskStatus{TaskIDString: taskID, TaskStatus: taskStatus, Msg: msg, UpdateAt: time.Now()}
	var encoded []byte
	if encoded, err = json.Marshal(&payload); err != nil {
		return
	}
	key := task.linkedListName(serviceID)
	for attempt := 0; attempt < 5; attempt++ {
		if err = task.client.RPush(key, string(encoded)).Err(); err == nil {
			break
		}
		time.Sleep(time.Millisecond * 5)
	}
	return
}
// GetTaskStatusLenOfService returns how many pending updates the
// service's list currently holds.
func (task *TaskStatusPlugin) GetTaskStatusLenOfService(serviceID string) (l int64, err error) {
	l, err = task.client.LLen(task.linkedListName(serviceID)).Result()
	return
}

// GetAllServiceTable lists every per-service status list key via KEYS
// prefix*. NOTE(review): KEYS is O(n) over the whole keyspace — fine for
// small deployments, consider SCAN if the instance is shared.
func (task *TaskStatusPlugin) GetAllServiceTable() (tableList []string, err error) {
	tableList, err = task.client.Keys(fmt.Sprintf("%s*", autodl_base.CoreMonitorLinkedListPrefix)).Result()
	return
}
// PopTaskStatusOfService removes and decodes the oldest pending status
// update; redis.Nil is propagated when the list is empty (taskStatus is
// nil on any pop failure).
func (task *TaskStatusPlugin) PopTaskStatusOfService(serviceID string) (taskStatus *autodl_base.TaskStatus, err error) {
	raw, popErr := task.client.LPop(task.linkedListName(serviceID)).Result()
	if popErr != nil {
		return nil, popErr
	}
	taskStatus = &autodl_base.TaskStatus{}
	err = json.Unmarshal([]byte(raw), taskStatus)
	return
}
// GetBackTaskStatusOfService pushes a previously popped status update back
// onto the HEAD of the service's list so it is retried first. A nil
// taskStatus is a silent no-op. Push is retried up to 5 times (5ms pause).
func (task *TaskStatusPlugin) GetBackTaskStatusOfService(serviceID string, taskStatus *autodl_base.TaskStatus) (err error) {
	if taskStatus == nil {
		return nil
	}
	var payload []byte
	if payload, err = json.Marshal(&taskStatus); err != nil {
		return
	}
	key := task.linkedListName(serviceID)
	for attempt := 0; attempt < 5; attempt++ {
		if err = task.client.LPush(key, string(payload)).Err(); err == nil {
			break
		}
		time.Sleep(time.Millisecond * 5)
	}
	return
}
package redis_plugin
import (
"github.com/go-redis/redis"
"github.com/spf13/viper"
)
// RedisClientProvider builds a redis client from viper keys "redis.addr"
// and "redis.password".
// SECURITY(review): a credential-looking string was previously pasted into
// the comment on the Password line below; it has been removed — if it was
// a real password, rotate it, since it lives in git history.
func RedisClientProvider() *redis.Client {
	return redis.NewClient(&redis.Options{
		Addr:     viper.GetString("redis.addr"),
		Password: viper.GetString("redis.password"), // empty string disables auth
		DB:       0,                                 // use default DB
	})
}
package service
import (
"autodl-core/k8s_plugin"
"autodl-core/libs"
"autodl-core/models"
"autodl-core/mongodb_plugin"
"autodl-core/redis_plugin"
"bufio"
"context"
"fmt"
log "github.com/cihub/seelog"
"github.com/jinzhu/gorm"
"github.com/spf13/viper"
"gitlab.seetatech.com/autodl.com/autodl-base.git"
"gitlab.seetatech.com/autodl.com/autodl-k8s.git/manager"
"gitlab.seetatech.com/autodl.com/autodl-k8s.git/template"
"io/ioutil"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"strings"
"time"
)
// Collector runs on every node: it publishes local resource summaries to
// redis and streams worker-pod logs into mongodb.
// Fields are wired by the DI container (providers elsewhere in the package).
type Collector struct {
	k8sManager     *k8s_plugin.K8sManager
	logMongoClient *mongodb_plugin.LogMongoClient
	dbConn         *gorm.DB
	jobModel       *models.JobModel
	resourcePlugin *redis_plugin.ResourcePlugin
}
// CollectNodeResourceInfo samples local CPU/GPU/memory once per second
// and publishes the summary into the redis resource hash under this
// node's hostname (read once from /etc/autocnn_hostname). Individual
// probe failures fall back to empty info structs so a partial summary is
// still published. Runs until ctx is cancelled.
func (cs *Collector) CollectNodeResourceInfo(ctx context.Context) {
	h, err := ioutil.ReadFile("/etc/autocnn_hostname")
	if err != nil {
		log.Error("/etc/autocnn_hostname file is not found")
		return
	}
	hostname := strings.ToLower(strings.TrimSpace(string(h)))
	resourceTicker := time.NewTicker(time.Second)
	// FIX: the ticker was never stopped, leaking it when ctx is cancelled.
	defer resourceTicker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-resourceTicker.C:
			gpuInfo, err := libs.GetGpuResourcesSummary()
			if err != nil {
				log.Info("get gpu info failed: ", err.Error())
				gpuInfo = &libs.GpuInfo{}
			}
			cpuInfo, err := libs.GetCpuInfo()
			if err != nil {
				log.Info("get cpu info failed: ", err.Error())
				cpuInfo = &libs.CpuInfo{}
			}
			memInfo, err := libs.GetMemInfo()
			if err != nil {
				log.Info("get mem info failed: ", err.Error())
				memInfo = &libs.MemInfo{}
			}
			rs := libs.ResourceSummary{
				NodeName:  hostname,
				CpuInfo:   cpuInfo,
				GpuInfo:   gpuInfo,
				MemInfo:   memInfo,
				UpdatedAt: time.Now(),
			}
			err = cs.resourcePlugin.Set(hostname, rs.ToString())
			if err != nil {
				log.Error("set resource info failed, ", err.Error())
			}
		}
	}
}
// CollectTaskLog finds every worker pod scheduled on THIS node (filtered
// by spec.nodeName and the worker role label) and spawns one watchLog
// goroutine per pod, then keeps watching the API server for new pods.
// podMap deduplicates so each pod gets at most one log goroutine.
func (cs *Collector) CollectTaskLog() {
	clientset := cs.k8sManager.K8sManager.Clientset
	startTime := metav1.NewTime(time.Now())
	h, err := ioutil.ReadFile("/etc/autocnn_hostname")
	if err != nil {
		log.Error("/etc/autocnn_hostname file is not found")
		return
	}
	hostname := strings.ToLower(strings.TrimSpace(string(h)))
	fieldSelector, _ := fields.ParseSelector("spec.nodeName=" + hostname)
	namespace := viper.GetString("k8s.namespace")
	labelSelector := fmt.Sprintf("role = %s", template.GetWorkerRoleName(namespace))
	listOption := metav1.ListOptions{TypeMeta: metav1.TypeMeta{Kind: manager.K8S_POD_KIND,
		APIVersion: manager.K8S_API_VERSION_V1},
		LabelSelector: labelSelector,
		FieldSelector: fieldSelector.String()}
	var podMap = make(map[string]bool)
	// Initial pass: pick up pods that already exist at startup.
	podList, err := clientset.CoreV1().Pods(namespace).List(listOption)
	if err != nil {
		log.Error("list pod failed when initialize: ", err.Error())
		return
	}
	for _, p := range podList.Items {
		if p.Status.Phase == v1.PodSucceeded || p.Status.Phase == v1.PodFailed || p.Status.Phase == v1.PodUnknown {
			lenCond := len(p.Status.Conditions)
			if lenCond == 0 {
				log.Error("conditions length is zero")
				continue
			}
			// Skip terminal pods whose last transition is older than 60s —
			// their logs were presumably collected by a previous run.
			lastCond := p.Status.Conditions[lenCond-1]
			if time.Now().After(lastCond.LastTransitionTime.Time.Add(time.Second * 60)) {
				log.Info("history pod. skip ", p.Name)
				continue
			}
		}
		if _, ok := podMap[p.Name]; !ok {
			go cs.watchLog(p.Name, p.Namespace, startTime)
			podMap[p.Name] = true
		}
	}
	// Watch loop: re-establish the watch whenever its channel closes.
	// NOTE(review): if Watch() keeps failing this returns and log
	// collection stops silently — consider retrying with backoff.
	for {
		watcher, err := clientset.CoreV1().Pods(namespace).Watch(listOption)
		if err != nil {
			log.Error("watch pod for collect log failed: ", err.Error())
			return
		}
		ch := watcher.ResultChan()
		for event := range ch {
			pod, ok := event.Object.(*v1.Pod)
			if !ok {
				log.Error("unexpected type")
				continue
			}
			podStatus := pod.Status
			phase := podStatus.Phase
			if phase == v1.PodPending || phase == v1.PodRunning {
				if _, ok := podMap[pod.Name]; !ok {
					log.Info("start to collect log. pod name: ", pod.Name)
					go cs.watchLog(pod.Name, pod.Namespace, startTime)
					podMap[pod.Name] = true
				}
			}
		}
	}
}
// watchLog streams one pod's log into mongodb. It waits for the pod to
// leave Pending, opens a log stream (Follow only while the pod is still
// running), batches lines (flush at >30 lines or after 1s), and finally
// marks the job row LogIsFlushOver. A background goroutine drains logCh
// so the scanner never blocks on mongo writes.
func (cs *Collector) watchLog(podName, namespace string, startTime metav1.Time) {
	log.Info("start watch job: ", podName)
	var labels map[string]string
	var isRunning = true
	// Poll until the pod reaches a phase with readable logs.
	for {
		pod, err := cs.k8sManager.K8sManager.Clientset.CoreV1().Pods(namespace).Get(podName, metav1.GetOptions{})
		if err != nil {
			log.Error("Pod not found", err.Error())
			return
		}
		status := pod.Status.Phase
		if status == v1.PodRunning {
			labels = pod.Labels
			break
		} else if status == v1.PodFailed || status == v1.PodSucceeded {
			isRunning = false // terminal: read the log once, don't follow
			labels = pod.Labels
			break
		} else {
			log.Info("status in waiting: ", status)
		}
		time.Sleep(time.Second)
	}
	requests := cs.k8sManager.K8sManager.Clientset.CoreV1().
		Pods(namespace).
		GetLogs(podName, &v1.PodLogOptions{TypeMeta: metav1.TypeMeta{APIVersion: "v1"}, Follow: isRunning, SinceTime: &startTime})
	readCloser, err := requests.Stream()
	if err != nil {
		log.Error("maybe job have been over", err.Error())
		return
	}
	defer readCloser.Close()
	scanner := bufio.NewScanner(readCloser)
	logCh := make(chan string, 10)
	over := make(chan bool)
	defer close(over)
	// Consumer: batch lines and push them to mongo.
	go func() {
		lastLogSyncTime := time.Now()
		var logList []autodl_base.TaskLog
		for {
			select {
			case logLine, ok := <-logCh:
				if !ok {
					// producer closed the channel: flush the remainder,
					// then release the producer waiting on <-over.
					log.Infof("log channel closed: len of log list %d", len(logList))
					if len(logList) != 0 {
						err = cs.logMongoClient.AddLog(logList...)
						if err != nil {
							log.Error("add log to mongo db failed, ", err.Error())
						}
					}
					over <- true
					return
				}
				logList = append(logList, autodl_base.TaskLog{
					ServiceID: labels[libs.PodServiceIDLabel],
					TaskID:    labels[libs.PodTaskIDLabel],
					CreatedAt: time.Now(),
					Type:      autodl_base.TaskLogTypeRuntime,
					Message:   logLine,
				})
				// Flush on size (>30 lines) or age (>1s since last sync).
				if len(logList) > 30 || time.Now().After(lastLogSyncTime.Add(time.Second)) {
					log.Info("force add log")
					err = cs.logMongoClient.AddLog(logList...)
					if err != nil {
						log.Error("add log to mongo db failed, ", err.Error())
					}
					lastLogSyncTime = time.Now()
					logList = make([]autodl_base.TaskLog, 0)
				}
			}
		}
	}()
	// Producer: scan the stream line by line.
	for scanner.Scan() {
		logLine := scanner.Text()
		if strings.HasPrefix(logLine, "Environment information not found") {
			continue
		}
		if !strings.HasSuffix(logLine, "\n") {
			logLine += "\n"
		}
		logCh <- logLine
	}
	log.Info("scan finished, close log channel")
	close(logCh)
	log.Info("wait over")
	<-over
	err = cs.jobModel.LogFlushOver(cs.dbConn, labels[libs.PodServiceIDLabel], labels[libs.PodTaskIDLabel])
	if err != nil {
		log.Errorf("set task log flush over failed: %s-%s, %v", labels[libs.PodServiceIDLabel], labels[libs.PodTaskIDLabel], err)
	}
	log.Info("job watch over: ", podName)
}
package service
import (
"autodl-core/k8s_plugin"
"bufio"
"fmt"
log "github.com/cihub/seelog"
"github.com/spf13/viper"
"gitlab.seetatech.com/autodl.com/autodl-k8s.git/manager"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"os"
"strings"
"time"
)
// LogStash tails logs of the platform's OWN pods (selector "app") and
// archives error-looking lines to files on the shared mount.
type LogStash struct {
	k8sManager *k8s_plugin.K8sManager
}

// logContent is one captured line (plus its predecessor for context),
// tagged with the pod it came from.
type logContent struct {
	PodName    string
	LogContent string
}
// CollectTaskLog spawns a watchLog goroutine per "app"-labelled pod in
// namespace (existing running pods first, then new ones via a pod watch).
// All goroutines feed one shared logCh drained by handleLog.
func (svc *LogStash) CollectTaskLog(namespace string) {
	startTime := metav1.NewTime(time.Now())
	listOption := metav1.ListOptions{TypeMeta: metav1.TypeMeta{Kind: manager.K8S_POD_KIND,
		APIVersion: manager.K8S_API_VERSION_V1},
		LabelSelector: "app"}
	var podMap = make(map[string]bool) // one watcher per pod name
	podList, err := svc.k8sManager.K8sManager.Clientset.CoreV1().Pods(namespace).List(listOption)
	if err != nil {
		log.Error("list pod failed when initialize: ", err.Error())
		return
	}
	logCh := make(chan logContent)
	go svc.handleLog(logCh)
	defer func() {
		close(logCh)
	}()
	for _, p := range podList.Items {
		if p.Status.Phase != v1.PodRunning {
			continue
		}
		fmt.Println("found: ", p.Name)
		if _, ok := podMap[p.Name]; !ok {
			go svc.watchLog(p.Name, p.Namespace, startTime, logCh)
			podMap[p.Name] = true
		}
	}
	// Re-establish the watch whenever its channel closes; a Watch() error
	// ends collection (the deferred close stops handleLog).
	for {
		watcher, err := svc.k8sManager.K8sManager.Clientset.CoreV1().Pods(namespace).Watch(listOption)
		if err != nil {
			log.Error("watch pod for collect log failed: ", err.Error())
			return
		}
		ch := watcher.ResultChan()
		for event := range ch {
			pod, ok := event.Object.(*v1.Pod)
			if !ok {
				log.Error("unexpected type")
				continue
			}
			podStatus := pod.Status
			phase := podStatus.Phase
			log.Info("monitor pod: ", pod.Name, " pod status: ", phase)
			if phase == v1.PodPending || phase == v1.PodRunning {
				if _, ok := podMap[pod.Name]; !ok {
					log.Info("start to collect log. pod name: ", pod.Name)
					go svc.watchLog(pod.Name, pod.Namespace, startTime, logCh)
					podMap[pod.Name] = true
				}
			}
		}
	}
}
// watchLog follows one pod's log stream and forwards only lines that look
// like errors (see containErrInfo), each paired with the preceding line
// for context. Returns when the pod terminates or the stream ends.
func (svc *LogStash) watchLog(podName, namespace string, startTime metav1.Time, logCh chan logContent) {
	log.Info("start watch pod: ", podName)
	// Wait until the pod is actually running; give up on terminal phases.
	for {
		pod, err := svc.k8sManager.K8sManager.Clientset.CoreV1().Pods(namespace).Get(podName, metav1.GetOptions{})
		if err != nil {
			log.Error("Pod not found", err.Error())
			return
		}
		status := pod.Status.Phase
		if status == v1.PodRunning {
			break
		} else if status == v1.PodFailed || status == v1.PodSucceeded {
			return
		} else {
			log.Info("status in waiting: ", status)
		}
		time.Sleep(time.Second)
	}
	requests := svc.k8sManager.K8sManager.Clientset.CoreV1().
		Pods(namespace).
		GetLogs(podName, &v1.PodLogOptions{TypeMeta: metav1.TypeMeta{APIVersion: "v1"}, Follow: true, SinceTime: &startTime})
	readCloser, err := requests.Stream()
	if err != nil {
		log.Error("maybe job have been over", err.Error())
		return
	}
	defer readCloser.Close()
	scanner := bufio.NewScanner(readCloser)
	var lastLine string
	for scanner.Scan() {
		logLine := scanner.Text()
		if containErrInfo(&logLine) {
			logCh <- logContent{
				PodName:    podName,
				LogContent: lastLine + "\n" + logLine + "\n",
			}
		}
		lastLine = logLine
	}
	log.Info("pod watch over: ", podName)
}
// containErrInfo reports whether the line looks error-related: it matches
// if the line contains any of "ERR", "err", "panic", "fail" or
// "goroutine" (the latter catches Go stack traces).
func containErrInfo(line *string) bool {
	needles := []string{"ERR", "err", "panic", "fail", "goroutine"}
	for _, needle := range needles {
		if strings.Contains(*line, needle) {
			return true
		}
	}
	return false
}
// getLogFinePath builds the per-day archive file path for a pod:
// <app.mount_path>/log_collector/<YYYY_M_D>__<pod>.log
func getLogFinePath(podName string) string {
	now := time.Now()
	fileName := fmt.Sprintf("%d_%d_%d__%s", now.Year(), now.Month(), now.Day(), podName) + ".log"
	return viper.GetString("app.mount_path") + "/log_collector/" + fileName
}
// handleLog drains logCh, buffering captured lines and flushing them to
// the day's archive file once a batch reaches 100 entries or 20s have
// passed since the last flush (checked every 5s tick). Returns when
// logCh is closed.
func (svc *LogStash) handleLog(logCh chan logContent) {
	tick := time.Tick(time.Second * 5)
	lastFlushTime := time.Now()
	var record []logContent
	for {
		select {
		case lcontent, ok := <-logCh:
			if !ok {
				log.Info("close log transfer channel")
				return
			}
			record = append(record, lcontent)
		case <-tick:
			if len(record) == 0 {
				continue
			}
			if len(record) < 100 && time.Since(lastFlushTime) < 20*time.Second {
				continue
			}
			f, err := os.OpenFile(getLogFinePath(record[0].PodName), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
			if err != nil {
				log.Error("can not open/write log to log file! ", err.Error())
				continue
			}
			for _, content := range record {
				_, err = f.Write([]byte(content.LogContent))
				if err != nil {
					log.Error(err.Error())
					log.Error("can not write log, print err: ", content)
				}
			}
			f.Close()
			// FIX: lastFlushTime was never updated, so after the first 20s
			// the throttle was permanently defeated and every 5s tick
			// flushed regardless of batch size.
			lastFlushTime = time.Now()
			record = record[:0]
		}
	}
	// FIX (structural): the old function-scoped *os.File plus `defer
	// f.Close()` double-closed the already-closed file on return; the file
	// handle is now scoped to a single flush.
}
package service
import (
"autodl-core/k8s_plugin"
"autodl-core/libs"
"autodl-core/libs/path"
"autodl-core/models"
"autodl-core/mongodb_plugin"
"autodl-core/redis_plugin"
"context"
"fmt"
log "github.com/cihub/seelog"
"github.com/go-redis/redis"
"github.com/jinzhu/gorm"
"github.com/levigross/grequests"
"github.com/pkg/errors"
"github.com/spf13/viper"
"gitlab.seetatech.com/autodl.com/autodl-base.git"
"gitlab.seetatech.com/autodl.com/autodl-k8s.git/spawner"
"gitlab.seetatech.com/autodl.com/autodl-k8s.git/template"
"io"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"net/http"
"os"
"time"
)
// Monitor consumes pod status changes from k8s, records them, and
// notifies the owning services; it also triggers log persistence and
// k8s service cleanup for finished tasks.
// Fields are wired by the DI container; *Status is embedded — its methods
// (e.g. UpdateTaskStatus) are promoted onto Monitor.
type Monitor struct {
	k8sManager       *k8s_plugin.K8sManager
	taskStatusModel  *models.TaskStatusModel
	dbConn           *gorm.DB
	jobModel         *models.JobModel
	taskStatusPlugin *redis_plugin.TaskStatusPlugin
	serviceModel     *models.ServiceModel
	logMongoPlugin   *mongodb_plugin.LogMongoClient
	*Status
}
// WatchTaskStatus watches worker pods (selected by the worker role label)
// and forwards each pod's derived task status into the redis status queue
// via UpdateTaskStatus. Runs until ctx is cancelled; the terminating
// error (nil on clean shutdown) is sent on stop.
func (ms *Monitor) WatchTaskStatus(ctx context.Context, stop chan<- error) {
	var err error
	defer func() {
		stop <- err
	}()
	namespace := viper.GetString("k8s.namespace")
	labelSelector := fmt.Sprintf("role = %s", template.GetWorkerRoleName(namespace))
	for {
		select {
		case <-ctx.Done():
			return
		default:
			// FIX: a failed Watch previously fell through to
			// watcher.ResultChan() on a nil watcher (panic); retry instead.
			// Use a distinct variable so the failure is not accidentally
			// reported through the deferred stop send.
			watcher, watchErr := ms.k8sManager.K8sManager.Clientset.CoreV1().Pods(namespace).Watch(metav1.ListOptions{LabelSelector: labelSelector})
			if watchErr != nil {
				log.Error("monitor status watcher failed: ", watchErr.Error())
				time.Sleep(time.Second)
				continue
			}
			ch := watcher.ResultChan()
			for event := range ch {
				pod, ok := event.Object.(*v1.Pod)
				if !ok {
					log.Error("unexpected type")
					continue
				}
				taskStatus, serviceID, statusErr := ms.getTaskStatus(pod)
				if statusErr != nil {
					continue
				}
				statusErr = ms.UpdateTaskStatus(serviceID, taskStatus.TaskIDString, taskStatus.TaskStatus, taskStatus.Msg)
				if statusErr != nil && statusErr != libs.TaskIsDoneCanNotSetStatusError && statusErr != libs.TaskStatusExistCanNotSetStatusError {
					// FIX: `:=` previously shadowed the outer err, so the
					// deferred `stop <- err` always sent nil; assign the
					// outer variable before returning.
					err = errors.Wrap(statusErr, "push task status to redis linked list failed")
					return
				}
			}
		}
	}
}
// getTaskStatus derives a TaskStatus from a pod's phase and labels.
// Pods missing the service/task id labels yield an error (the label
// selector should prevent that). For failed pods the first container's
// termination reason/message is appended to Msg when available.
func (ms *Monitor) getTaskStatus(pod *v1.Pod) (ts *autodl_base.TaskStatus, serviceID string, err error) {
	ts = &autodl_base.TaskStatus{}
	if len(pod.Labels[libs.PodTaskIDLabel]) > 0 && len(pod.Labels[libs.PodServiceIDLabel]) > 0 {
		ts.TaskIDString = pod.Labels[libs.PodTaskIDLabel]
		ts.TaskStatus = string(pod.Status.Phase)
		ts.Msg = pod.Status.Message
		if pod.Status.Phase == v1.PodFailed {
			if len(pod.Status.ContainerStatuses) > 0 {
				// FIX: State.Terminated can be nil even when the pod phase is
				// Failed (e.g. the container state is Waiting after an image
				// pull failure); dereferencing it unconditionally panicked.
				if term := pod.Status.ContainerStatuses[0].State.Terminated; term != nil {
					ts.Msg += term.Reason + " " + term.Message
				}
			}
		}
		ts.UpdateAt = time.Now()
		serviceID = pod.Labels[libs.PodServiceIDLabel]
		return
	}
	err = errors.New("Label selector does not work, we got a pod without podtaskIDLabel || PodServiceIDLabel, " + pod.String())
	return
}
// HandleTaskStatus polls redis every 10s for per-service status lists and
// spawns exactly one notifyService goroutine per discovered service
// (tracked in monitorServerHub). Children share sonCtx and are cancelled
// together when ctx is done or listing fails fatally; the terminating
// error is sent on stop.
func (ms *Monitor) HandleTaskStatus(ctx context.Context, stop chan<- error) {
	var err error
	defer func() {
		stop <- err
	}()
	sonCtx, cancelFunc := context.WithCancel(context.Background())
	monitorServerHub := make(map[string]bool)
	for {
		select {
		case <-ctx.Done():
			cancelFunc()
			return
		default:
			var tableList []string
			tableList, err = ms.taskStatusPlugin.GetAllServiceTable()
			if err != nil {
				if err == redis.Nil {
					// NOTE(review): this continue skips the 10s sleep below —
					// a persistent redis.Nil would busy-loop. It also leaves
					// err == redis.Nil if ctx is cancelled right after.
					continue
				}
				cancelFunc()
				err = errors.Wrap(err, "get all service table failed")
				return
			}
			for _, serviceTableName := range tableList {
				if _, ok := monitorServerHub[serviceTableName]; !ok {
					go ms.notifyService(sonCtx, ms.taskStatusPlugin.GetServiceNameByTableName(serviceTableName))
					monitorServerHub[serviceTableName] = true
				}
			}
			time.Sleep(time.Second * 10)
		}
	}
}
// notifyService drains the redis status list of one service and pushes each
// status change to the service's notification API, retrying with backoff on
// failure. For terminal statuses it also triggers log persistence and k8s
// service cleanup. Runs until ctx is cancelled.
func (ms *Monitor) notifyService(ctx context.Context, serviceID string) {
	log.Info("start send notify of ", serviceID)
	var failedNotiTimes int
	for {
		select {
		case <-ctx.Done():
			return
		default:
			taskStatus, err := ms.taskStatusPlugin.PopTaskStatusOfService(serviceID)
			if err != nil {
				// redis.Nil just means the list is empty; anything else is logged.
				if err != redis.Nil {
					log.Info("error when pop: ", err.Error())
				}
				time.Sleep(time.Second * 2)
				continue
			}
			lastTaskStatus, _ := ms.taskStatusModel.GetLast(ms.dbConn, serviceID, taskStatus.TaskIDString)
			if lastTaskStatus != nil && lastTaskStatus.Status == autodl_base.TaskStopped {
				// Task was already stopped: tear down its exp master. The
				// constructor error must be checked before use — the previous
				// code called spa.StopExp() even when construction failed,
				// risking a nil dereference.
				var spa *spawner.ExpSpawner
				spa, err = spawner.NewExpStopSpawner(serviceID, taskStatus.TaskIDString, ms.k8sManager.K8sManager)
				if err != nil {
					log.Error("init stop spawner failed: ", err.Error(), " ", serviceID, " ", taskStatus.TaskIDString)
				} else if err = spa.StopExp(); err != nil {
					log.Error("delete exp master failed: ", err.Error(), " ", serviceID, " ", taskStatus.TaskIDString)
				}
			}
			// handle log persist and svc clean
			if autodl_base.TaskIsDone(taskStatus.TaskStatus) {
				task, err := ms.jobModel.Get(ms.dbConn, serviceID, taskStatus.TaskIDString)
				if err != nil {
					log.Error("Find task by ID fail, ", err.Error(), taskStatus.TaskIDString)
					continue
				}
				if !task.LogIsPersist {
					go ms.persistLog(serviceID, task.TaskID)
				}
				if task.HasService && !task.ServiceIsClean {
					spa, err := spawner.NewExpStopSpawner(serviceID, task.TaskID, ms.k8sManager.K8sManager)
					if err != nil {
						log.Error("delete svc fail, init Spawner fail: ", err.Error(), task.TaskID)
					} else {
						err = spa.Manager.DeleteService(template.GetServiceName(serviceID, task.TaskID))
						if err != nil {
							log.Error("delete svc fail, delete fail: ", err.Error(), task.TaskID)
						}
						err = ms.jobModel.CleanServiceFinished(ms.dbConn, serviceID, task.TaskID)
						if err != nil {
							log.Error("update service is clean failed: ", err.Error())
						}
					}
				}
			}
			//notify to service api
			if !ms.taskStatusModel.IsStatusNotifySuccess(ms.dbConn, serviceID, taskStatus.TaskIDString, taskStatus.TaskStatus) {
				service, err := ms.serviceModel.Get(ms.dbConn, serviceID)
				if err != nil {
					log.Error("handle task status of service failed (get service): ", serviceID)
					continue
				}
				res, err := grequests.Post(service.NotiAPI, &grequests.RequestOptions{
					JSON:           taskStatus,
					RequestTimeout: time.Second * 10,
				})
				if err != nil || res.StatusCode != http.StatusOK {
					// res is nil when the request itself failed; guard before
					// reading StatusCode (previously a nil-pointer dereference
					// on any transport-level error).
					statusCode := 0
					if res != nil {
						statusCode = res.StatusCode
					}
					log.Info("send failed:", err, ", status code: ", statusCode)
					// Put the status back so it is retried on the next loop.
					if errBack := ms.taskStatusPlugin.GetBackTaskStatusOfService(serviceID, taskStatus); errBack != nil {
						log.Error("!! get back task status to list failed: ", errBack.Error())
					}
					failedNotiTimes++
					libs.Delay(failedNotiTimes)
					continue
				}
				failedNotiTimes = 0
				log.Info("send status to notify api success")
				err = ms.taskStatusModel.SetStatusNotifySuccess(ms.dbConn, serviceID, taskStatus.TaskIDString, taskStatus.TaskStatus)
				if err != nil {
					log.Error("set notify success failed: ", err)
				}
			}
		}
	}
}
// persistLog copies all mongo-stored log lines of one task into a flat file
// under the service directory, then marks the job as persisted and deletes
// the mongo copy. Any failure is logged via the deferred handler and
// returned.
func (ms *Monitor) persistLog(serviceID, taskID string) (err error) {
defer func() {
if err != nil {
log.Error("!! log persist failed: ", err.Error())
}
}()
// Wait (up to ~4s) for the log collector to finish flushing this task's
// log into mongo before snapshotting it.
for i := 1; i < 5; i++ {
var job *models.Job
job, err = ms.jobModel.Get(ms.dbConn, serviceID, taskID)
if err != nil {
log.Errorf("get job from db failed: %v", err)
break
}
if job.LogIsFlushOver {
break
} else {
time.Sleep(time.Second)
}
}
var f *os.File
err = path.InitServiceDir(serviceID)
if err != nil {
err = errors.Wrapf(err, "init service [%s] dir failed", serviceID)
return
}
// if monitor have multi deployment, use this to avoid persist at same time
//_, err = os.Stat(path.GetTaskLogFilePath(serviceID, taskID))
//if !os.IsNotExist(err) {
// return nil
//}
// O_TRUNC: re-running persistence overwrites any partial earlier file.
f, err = os.OpenFile(path.GetTaskLogFilePath(serviceID, taskID), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
err = errors.Wrapf(err, "open log file [%s] failed", path.GetTaskLogFilePath(serviceID, taskID))
return
}
defer f.Close()
logChan := make(chan autodl_base.TaskLogChanStruct)
// NOTE(review): logChan is closed here while GetAllLog (below) is the
// writer; if GetAllLog can still send after this function returns, that
// is a send-on-closed-channel panic — confirm GetAllLog's contract.
defer close(logChan)
go ms.logMongoPlugin.GetAllLog(serviceID, taskID, logChan)
for {
select {
case taskLog, ok := <-logChan:
// A closed channel (ok == false) is treated as an error: the
// producer signals normal completion via taskLog.Error == io.EOF
// instead.
if !ok {
err = errors.Errorf("get log failed: %v", ok)
return
}
if taskLog.Error != nil {
if taskLog.Error == io.EOF {
// Normal end of stream: mark persisted, then best-effort
// delete the mongo copy (deletion failure is only logged).
err = ms.jobModel.LogPersistFinished(ms.dbConn, serviceID, taskID)
if err != nil {
log.Errorf("set log persist finished failed: %v", err)
}
var deleteCount int64
deleteCount, err = ms.logMongoPlugin.DeleteLog(serviceID, taskID)
log.Infof("try to delete log of %s-%s [Count: %d]", serviceID, taskID, deleteCount)
if err != nil {
log.Error("failed to delete log of ", serviceID, " ", taskID, err.Error())
}
} else {
err = errors.Errorf("get log failed 2: %v", taskLog.Error)
}
return
}
_, err = f.WriteString(taskLog.Message)
if err != nil {
return
}
}
}
}
package service
import (
"autodl-core/libs"
"autodl-core/models"
"autodl-core/redis_plugin"
log "github.com/cihub/seelog"
"github.com/jinzhu/gorm"
)
// Status encapsulates task-status persistence: writes go to the relational
// store (taskStatusModel) and, on success, are mirrored into redis
// (taskStatusPlugin) for the monitor's notification pipeline.
type Status struct {
// taskStatusModel persists status rows in the database.
taskStatusModel *models.TaskStatusModel
// dbConn is the shared gorm connection used by the model calls.
dbConn *gorm.DB
// taskStatusPlugin pushes status changes onto the service's redis list.
taskStatusPlugin *redis_plugin.TaskStatusPlugin
}
// UpdateTaskStatus records a task status transition in the database and, when
// the write succeeds, pushes the same transition onto the service's redis
// status list. The sentinel errors for terminal/duplicate transitions are
// returned unchanged (without logging); any other model error is logged and
// returned before touching redis.
func (svc *Status) UpdateTaskStatus(serviceID, taskID, status, msg string) (err error) {
	err = svc.taskStatusModel.SetTaskStatus(svc.dbConn, serviceID, taskID, status, msg)
	if err != nil {
		isSentinel := err == libs.TaskStatusExistCanNotSetStatusError ||
			err == libs.TaskIsDoneCanNotSetStatusError
		if !isSentinel {
			log.Error("set task status in model failed: ", err)
		}
		return
	}
	err = svc.taskStatusPlugin.AddTaskStatusOfService(serviceID, taskID, status, msg)
	return
}
package utils
import (
"autodl-core/mongodb_plugin"
"bytes"
"encoding/json"
"errors"
"fmt"
"gitlab.seetatech.com/autodl.com/autodl-base.git"
"io"
"io/ioutil"
"strings"
"time"
)
// decoderWrapper adapts the docker build/push JSON log stream (written via
// io.WriteCloser) into per-line task logs stored in mongo.
type decoderWrapper struct {
// buffer accumulates raw bytes until a complete JSON message can be decoded.
buffer *bytes.Buffer
// lines maps a docker layer ID to the display line it was first printed on
// (docker-CLI-style cursor bookkeeping).
lines map[string]int
// diff is the distance from the tracked line to the current bottom line.
diff int
taskID string
serviceID string
// client receives each rendered log line.
client *mongodb_plugin.LogMongoClient
}
// logStreamMessage mirrors one JSON message of the docker daemon's
// build/push progress stream; all fields are optional.
type logStreamMessage struct {
Stream string `json:"stream,omitempty"`
Status string `json:"status,omitempty"`
Progress string `json:"progress,omitempty"`
ID string `json:"id,omitempty"`
Error string `json:"error,omitempty"`
}
// DockerLogDecoderWrapper returns a WriteCloser that decodes the docker
// build/push JSON log stream written to it and stores every rendered line in
// mongo as a building-type task log for the given service/task.
func DockerLogDecoderWrapper(serviceID, taskID string, client *mongodb_plugin.LogMongoClient) io.WriteCloser {
	return &decoderWrapper{
		buffer:    &bytes.Buffer{},
		lines:     make(map[string]int),
		diff:      0,
		serviceID: serviceID,
		taskID:    taskID,
		client:    client,
	}
}
// Close appends a final "building over" marker line to the task's build log.
// It cannot fail; the error return exists only to satisfy io.WriteCloser.
func (w *decoderWrapper) Close() error {
	marker := autodl_base.TaskLog{
		ServiceID: w.serviceID,
		TaskID:    w.taskID,
		Message:   "======== your experiment building over. wait to scheduler to machine and running ========\n",
		CreatedAt: time.Now(),
		Type:      autodl_base.TaskLogTypeBuilding,
	}
	w.client.AddLog(marker)
	return nil
}
// Write buffers p and immediately tries to decode any complete JSON messages
// accumulated so far. The byte count always reflects the buffer write.
func (w *decoderWrapper) Write(p []byte) (int, error) {
	n, err := w.buffer.Write(p)
	if err == nil {
		err = w.Decode()
	}
	return n, err
}
//func (w decoderWrapper) Flush() {
//
//}
// encode renders one decoded docker stream message as a single text line
// (mirroring docker CLI formatting) and appends it to the mongo build log.
func (w *decoderWrapper) encode(m logStreamMessage) {
	// Progress/status-only messages carry a trailing carriage return so
	// terminals overwrite them in place; stream messages do not.
	endl := ""
	if m.Stream == "" && (m.Progress != "" || m.Status != "") {
		endl = "\r"
	}
	line := ""
	if m.ID != "" {
		line = m.ID + ": "
	}
	switch {
	case m.Progress != "":
		line += fmt.Sprintf("%s %s%s", m.Status, m.Progress, endl)
	case m.Stream != "":
		line += m.Stream + endl
	default:
		line += m.Status + endl
	}
	// Every stored log line is newline-terminated.
	if !strings.HasSuffix(line, "\n") {
		line += "\n"
	}
	w.client.AddLog(autodl_base.TaskLog{
		ServiceID: w.serviceID,
		TaskID:    w.taskID,
		Message:   line,
		CreatedAt: time.Now(),
		Type:      autodl_base.TaskLogTypeBuilding,
	})
}
// setCursor tracks per-layer-ID output positions, emulating the docker CLI's
// cursor bookkeeping. An empty id resets the tracking table.
func (w *decoderWrapper) setCursor(id string) {
	if id == "" {
		w.lines = make(map[string]int)
		return
	}
	pos, seen := w.lines[id]
	if seen {
		// Known layer: remember how far above the bottom its line sits.
		w.diff = len(w.lines) - pos
	} else {
		// New layer: assign it the next line and open a fresh output line.
		w.lines[id] = len(w.lines)
		fmt.Print("\n")
	}
}
// Decode drains all complete JSON messages currently in the buffer, rendering
// each via setCursor/encode. A trailing partial message is kept for the next
// Write call; a message carrying an Error field aborts with that error.
func (w *decoderWrapper) Decode() error {
decoder := json.NewDecoder(w.buffer)
for {
m := logStreamMessage{}
if err := decoder.Decode(&m); err != nil {
// io.EOF means "no complete message left", not a failure.
if err != io.EOF {
return err
}
// Recopy remaining bytes into buffer to be available again for json decoder.
b, err := ioutil.ReadAll(decoder.Buffered())
if err != nil {
return err
}
w.buffer = bytes.NewBuffer(b)
return nil
}
if m.Error != "" {
return errors.New(m.Error)
}
w.setCursor(m.ID)
w.encode(m)
}
}
package utils
import (
"fmt"
"gitlab.seetatech.com/autodl.com/autodl-k8s.git/template"
)
// GetJupyterHost returns the jupyter endpoint URL for the given host.
func GetJupyterHost(host string) (jupyterHost string) {
	jupyterHost = host + "/jupyter/"
	return jupyterHost
}
// GetTensorboardHost returns the tensorboard endpoint URL for the given host.
// (The named return keeps the original signature for compatibility.)
func GetTensorboardHost(host string) (jupyterHost string) {
	jupyterHost = host + "/tensorboard/"
	return jupyterHost
}
// GetStandardHost returns the standard-service endpoint URL for the given
// host. (The named return keeps the original signature for compatibility.)
func GetStandardHost(host string) (jupyterHost string) {
	jupyterHost = host + "/standard/"
	return jupyterHost
}
func GetServiceName(namespace, serviceID, taskID string) string {
return fmt.Sprintf("%s.%s.svc.cluster.local", template.GetServiceName(serviceID, taskID), namespace)
}
package utils
import (
"autodl-core/git_repo"
"autodl-core/libs/docker"
"autodl-core/libs/docker/render"
"autodl-core/libs/path"
"autodl-core/mongodb_plugin"
"bytes"
"encoding/json"
"fmt"
log "github.com/cihub/seelog"
"github.com/gogs/git-module"
"github.com/pkg/errors"
"github.com/spf13/viper"
"gitlab.seetatech.com/autodl.com/autodl-base.git/protoV1"
"io/ioutil"
"os"
"strings"
)
// Task wraps the protobuf Task message with build/deploy helper methods
// (image naming, temp paths, validation, build-and-push).
type Task struct {
proto_v1.Task
}
// NewTask wraps a protobuf Task in the helper type.
func NewTask(task proto_v1.Task) *Task {
	wrapped := &Task{Task: task}
	return wrapped
}
// tag returns the docker image tag applied to every task image.
func (t *Task) tag() string {
	const defaultTag = "latest"
	return defaultTag
}
// imageName builds the fully qualified image name
// "<registry_host>/<registry_group_name>/<task id>" from config.
func (t *Task) imageName() string {
	registry := viper.GetString("docker.registry_host")
	group := viper.GetString("docker.registry_group_name")
	return registry + "/" + group + "/" + t.Task.TaskId
}
// ImageNameTag returns the image reference including its tag ("name:tag").
func (t *Task) ImageNameTag() string {
	return fmt.Sprintf("%s:%s", t.imageName(), t.tag())
}
// CodeAbsPath resolves the task's (relative) code path against the
// configured mount root.
func (t *Task) CodeAbsPath() string {
	mount := viper.GetString("app.mount_path")
	relative := strings.TrimLeft(t.Task.BuildInfo.CodePath, "/")
	return mount + "/" + relative
}
/*
├── /tmp/t.ID(tempPath)
│ ├── Dockerfile
│ └── t.ID(codePath)
│ ├── code.*
│ ├── code.content
*/
// tempPath is the per-task scratch build directory (holds the Dockerfile and
// a copy of the code).
func (t *Task) tempPath() string {
	return "/tmp/" + t.Task.TaskId
}
// codePath is the code subdirectory inside tempPath ("/tmp/<id>/<id>").
func (t *Task) codePath() string {
	id := t.Task.TaskId
	return fmt.Sprintf("/tmp/%s/%s", id, id)
}
// dockerfile renders the task's Dockerfile into a buffer via the render
// helper, using the task's base image, run commands and copy-after-run list.
// copyCode controls whether the code directory is COPY'd into the image
// (to "/code" — presumably the in-image code mount point; confirm against
// render.WriteRenderDockerfile).
func (t *Task) dockerfile(copyCode bool) *bytes.Buffer {
//env := make(map[string]string)
//for _, v := range t.Task.BuildInfo.Envs {
// env[v.Key] = v.Value
//}
var buf bytes.Buffer
// nil: build-time env vars are currently not rendered (see the
// commented-out block above).
render.WriteRenderDockerfile(&buf, t.Task.BuildInfo.FromImage,
t.Task.TaskId,
"/code",
copyCode,
t.Task.BuildInfo.Runs,
nil,
t.Task.BuildInfo.CopyAfterRun)
return &buf
}
// ToString serializes the task as JSON; on marshal failure it logs the error
// and returns the empty string.
func (t *Task) ToString() string {
	data, err := json.Marshal(t)
	if err != nil {
		log.Error("Task marshal fail: ", err.Error())
		return ""
	}
	return string(data)
}
// Validate checks that all required task fields are present and returns a
// single error listing every missing parameter, or nil when the task is
// complete.
func (t *Task) Validate() (err error) {
	var errMsg string
	// miss appends one " missing [what]" fragment, matching the original
	// message format exactly.
	miss := func(what string) {
		errMsg += " missing " + "[" + what + "]"
	}
	if len(t.Task.TaskId) == 0 {
		miss("Task id")
	}
	if t.Task.PodInfo == nil {
		miss("pod info")
	} else if t.Task.PodInfo.Resource == nil {
		miss("resource")
	} else if t.Task.PodInfo.Resource.Cpu == nil || t.Task.PodInfo.Resource.Gpu == nil || t.Task.PodInfo.Resource.Memory == nil {
		miss("resource.Cpu | resource.Gpu | resource.Memory")
	}
	if t.Task.BuildInfo == nil {
		miss("build info")
	} else {
		info := t.Task.BuildInfo
		if len(info.FromImage) == 0 {
			miss("from image")
		}
		if len(info.Cmd) == 0 {
			miss("cmd")
		}
		if info.HaveCode {
			// Tasks that ship code must declare a recognized code type and a
			// code path.
			switch info.CodeType {
			case proto_v1.TaskBuildInfo_Normal, proto_v1.TaskBuildInfo_GitRepo, proto_v1.TaskBuildInfo_GitBareRepo:
				if len(info.CodePath) == 0 {
					miss("code path")
				}
			default:
				miss("code type")
			}
		}
	}
	if len(errMsg) > 0 {
		err = errors.Errorf("Parameter missing: %s", errMsg)
	}
	return
}
// BuildAndPush stages the task's code into a temp directory, renders its
// Dockerfile, builds the image via the local docker daemon, and pushes it to
// the configured registry. Build/push output is streamed into the task's
// build log via the decoder wrapper.
func (t *Task) BuildAndPush(serviceID string, logPlugin *mongodb_plugin.LogMongoClient) (err error) {
	decoder := DockerLogDecoderWrapper(serviceID, t.Task.TaskId, logPlugin)
	logStream := docker.LogStream{Decoder: decoder}
	endpoint := "unix:///var/run/docker.sock"
	var do docker.Docker
	do, err = docker.New(endpoint)
	if err != nil {
		log.Error("docker new error")
		return
	}
	path.DeletePath(t.codePath())
	// 0755, not 0644: directories need the execute bit to be traversable;
	// with 0644 nothing could be created inside the temp dir. Also surface
	// the MkdirAll error, previously ignored, so staging failures do not
	// manifest later as confusing copy/build errors.
	if err = os.MkdirAll(t.tempPath(), 0755); err != nil {
		err = errors.Wrapf(err, "create temp build dir [%s] failed", t.tempPath())
		return
	}
	//defer path.DeletePath(t.tempPath())
	var copyCode bool
	if t.Task.BuildInfo.HaveCode {
		switch t.Task.BuildInfo.CodeType {
		case proto_v1.TaskBuildInfo_Normal:
			// Plain directory: copy as-is.
			err = path.CopyFolder(t.CodeAbsPath(), t.codePath())
			if err != nil {
				err = errors.Wrap(err, "copy code to tmp code path failed")
				return
			}
			copyCode = true
		case proto_v1.TaskBuildInfo_GitRepo:
			// Working repo: copy, then strip the .git metadata.
			err = path.CopyFolder(t.CodeAbsPath(), t.codePath())
			if err != nil {
				err = errors.Wrap(err, "copy code to tmp code path failed")
				return
			}
			path.DeletePath(fmt.Sprintf("%s/.git", t.codePath()))
			copyCode = true
		case proto_v1.TaskBuildInfo_GitBareRepo:
			// Bare repo: clone the requested branch, optionally check out a
			// specific commit, then strip .git.
			err = git.Clone(t.CodeAbsPath(), t.codePath(), git.CloneRepoOptions{Branch: t.Task.BuildInfo.CodeBranch})
			if err != nil {
				err = fmt.Errorf("copy repository to tmp path error: %s", err.Error())
				return
			}
			if len(t.Task.BuildInfo.CodeCommit) != 0 {
				var repoHub *git_repo.Repo
				repoHub, err = git_repo.Open(t.codePath())
				if err != nil {
					return
				}
				err = repoHub.Checkout(t.Task.BuildInfo.CodeCommit)
				if err != nil {
					return
				}
				path.DeletePath(fmt.Sprintf("%s/.git", t.codePath()))
			}
			copyCode = true
		default:
		}
	}
	err = ioutil.WriteFile(t.tempPath()+"/Dockerfile", t.dockerfile(copyCode).Bytes(), 0644)
	if err != nil {
		return
	}
	err = do.Build(docker.BuildOptions{
		Name:      t.imageName(),
		Directory: t.tempPath(),
		Pull:      true,
		NoCache:   false,
	}, logStream)
	if err != nil {
		log.Error("docker build error")
		return
	}
	err = do.Push(docker.PushOptions{
		Repository: t.imageName(),
		Registry:   viper.GetString("docker.registry_host"),
		Tag:        t.tag(),
	}, logStream)
	// Always emit the final "building over" marker once push has been
	// attempted.
	decoder.Close()
	return
}
//+build wireinject
package service
import (
"autodl-core/k8s_plugin"
"autodl-core/models"
"autodl-core/mongodb_plugin"
"autodl-core/redis_plugin"
"github.com/google/wire"
"github.com/jinzhu/gorm"
)
// collectorProvider is the wire constructor for Collector.
// NOTE(review): the struct literal is positional — field order in Collector
// must match this argument order exactly; verify against the Collector
// definition before reordering anything.
func collectorProvider(k8sManager *k8s_plugin.K8sManager, logMongoClient *mongodb_plugin.LogMongoClient, dbConn *gorm.DB, jobModel *models.JobModel, plugin *redis_plugin.ResourcePlugin, ) *Collector {
return &Collector{k8sManager, logMongoClient, dbConn, jobModel, plugin}
}
// monitorProvider is the wire constructor for Monitor.
// NOTE(review): positional literal — Monitor's field order must match this
// argument order; confirm against the Monitor struct definition.
func monitorProvider(k8sManager *k8s_plugin.K8sManager, taskStatusModel *models.TaskStatusModel, dbConn *gorm.DB, jobModel *models.JobModel, taskStatusPlugin *redis_plugin.TaskStatusPlugin, serviceModel *models.ServiceModel, logMongoPlugin *mongodb_plugin.LogMongoClient, cs *Status) *Monitor {
return &Monitor{k8sManager, taskStatusModel, dbConn, jobModel, taskStatusPlugin, serviceModel, logMongoPlugin, cs}
}
// rpcProvider is the wire constructor for Rpc.
// NOTE(review): the literal order differs from the parameter order —
// k8sManager is placed sixth while it is the last parameter. This only works
// if Rpc's fields are declared in exactly this literal order; verify against
// the Rpc struct, or switch to named fields.
func rpcProvider(
dbConn *gorm.DB,
taskStatusPlugin *redis_plugin.TaskStatusPlugin,
taskStatusModel *models.TaskStatusModel,
jobModel *models.JobModel,
logPlugin *mongodb_plugin.LogMongoClient,
serviceModel *models.ServiceModel,
taskPlugin *redis_plugin.TaskPlugin,
taskModel *models.TaskModel,
plugin *redis_plugin.ResourcePlugin,
k8sManager *k8s_plugin.K8sManager) *Rpc {
return &Rpc{dbConn, taskStatusPlugin, taskStatusModel, jobModel, logPlugin, k8sManager, serviceModel, taskPlugin, taskModel, plugin}
}
// statusProvider is the wire constructor for the Status service.
// Positional literal: field order in Status must match this argument order.
func statusProvider(taskStatusModel *models.TaskStatusModel, dbConn *gorm.DB, taskStatusPlugin *redis_plugin.TaskStatusPlugin) *Status {
return &Status{taskStatusModel, dbConn, taskStatusPlugin}
}
// workerProvider is the wire constructor for Worker.
// NOTE(review): the literal order differs from the parameter order —
// k8sManager is placed fifth while it is the last parameter. This only works
// if Worker's fields are declared in exactly this literal order; verify
// against the Worker struct, or switch to named fields.
func workerProvider(
dbConn *gorm.DB,
taskStatusPlugin *redis_plugin.TaskStatusPlugin,
jobModel *models.JobModel,
logPlugin *mongodb_plugin.LogMongoClient,
taskPlugin *redis_plugin.TaskPlugin,
cs *Status,
taskModel *models.TaskModel,
k8sManager *k8s_plugin.K8sManager) *Worker {
return &Worker{dbConn, taskStatusPlugin, jobModel, logPlugin, k8sManager, taskPlugin, taskModel, cs}
}
// logStashProvider is the wire constructor for LogStash.
func logStashProvider(k8sManager *k8s_plugin.K8sManager) *LogStash {
return &LogStash{k8sManager}
}
// ServerFactory bundles the fully wired Rpc server.
type ServerFactory struct {
RpcServer *Rpc
}
// serverSet declares every provider needed to build a ServerFactory.
var serverSet = wire.NewSet(
rpcProvider,
models.DBProvider,
models.TaskStatusModelProvider,
models.JobModelProvider,
models.ServiceModelProvider,
models.TaskModelProvider,
redis_plugin.TaskStatusPluginProvider,
redis_plugin.RedisClientProvider,
redis_plugin.TaskPluginProvider,
redis_plugin.ResourcePluginProvider,
mongodb_plugin.MongoClientProvider,
k8s_plugin.K8sManagerProvider,
wire.Struct(new(ServerFactory), "*"),
)
// InitializeServerFactory is a wire injector stub: the returned zero value is
// never used at runtime — wire generates the real implementation.
func InitializeServerFactory() ServerFactory {
wire.Build(serverSet)
return ServerFactory{}
}
// WorkerFactory bundles the fully wired Worker.
type WorkerFactory struct {
Worker *Worker
}
// workerSet declares every provider needed to build a WorkerFactory.
var workerSet = wire.NewSet(
workerProvider,
models.DBProvider,
models.TaskStatusModelProvider,
models.JobModelProvider,
models.ServiceModelProvider,
models.TaskModelProvider,
redis_plugin.TaskStatusPluginProvider,
redis_plugin.RedisClientProvider,
redis_plugin.TaskPluginProvider,
mongodb_plugin.MongoClientProvider,
k8s_plugin.K8sManagerProvider,
statusProvider,
wire.Struct(new(WorkerFactory), "*"),
)
// InitializeWorkerFactory is a wire injector stub; wire generates the real
// implementation.
func InitializeWorkerFactory() WorkerFactory {
wire.Build(workerSet)
return WorkerFactory{}
}
// CollectorFactory bundles the fully wired Collector.
type CollectorFactory struct {
Collector *Collector
}
// collectorSet declares every provider needed to build a CollectorFactory.
var collectorSet = wire.NewSet(
collectorProvider,
mongodb_plugin.MongoClientProvider,
models.DBProvider,
models.JobModelProvider,
k8s_plugin.K8sManagerProvider,
redis_plugin.RedisClientProvider,
redis_plugin.ResourcePluginProvider,
wire.Struct(new(CollectorFactory), "*"),
)
// InitializeCollectorFactory is a wire injector stub; wire generates the
// real implementation.
func InitializeCollectorFactory() CollectorFactory {
wire.Build(collectorSet)
return CollectorFactory{}
}
// MonitorFactory bundles the fully wired Monitor.
type MonitorFactory struct {
Monitor *Monitor
}
// monitorSet declares every provider needed to build a MonitorFactory.
var monitorSet = wire.NewSet(
monitorProvider,
models.DBProvider,
models.TaskStatusModelProvider,
models.JobModelProvider,
models.ServiceModelProvider,
redis_plugin.RedisClientProvider,
redis_plugin.TaskStatusPluginProvider,
mongodb_plugin.MongoClientProvider,
k8s_plugin.K8sManagerProvider,
statusProvider,
wire.Struct(new(MonitorFactory), "*"),
)
// InitializeMonitorFactory is a wire injector stub; wire generates the real
// implementation.
func InitializeMonitorFactory() MonitorFactory {
wire.Build(monitorSet)
return MonitorFactory{}
}
// LogStashFactory bundles the fully wired LogStash.
type LogStashFactory struct {
LogStash *LogStash
}
// logStashSet declares every provider needed to build a LogStashFactory.
var logStashSet = wire.NewSet(
logStashProvider,
k8s_plugin.K8sManagerProvider,
wire.Struct(new(LogStashFactory), "*"),
)
// InitializeLogStashFactory is a wire injector stub; wire generates the real
// implementation.
func InitializeLogStashFactory() LogStashFactory {
wire.Build(logStashSet)
return LogStashFactory{}
}
This diff is collapsed. Click to expand it.
This diff is collapsed. Click to expand it.
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!