Commit 14bebfb0 by 杨艳磊

kvm_server init

0 parents
Pipeline #24195 failed
in 0 seconds
Showing with 4854 additions and 0 deletions
kvm_agent
kvm_agent*
/gpu*
gpu-*
gs
ga*
nfs_monitor
cmd/toolkit/toolkit
.idea/
.vscode/
.history/
.gradle/
/test
frpc
autopanel
frps
supervisord
_output
coverage.data
pkg/libs/captcha/*.png
.DS_Store
\ No newline at end of file
# 运行测试用例
test:
stage: test
# only:
# - develop
# - master
script:
- make PATH=$PATH:/usr/local/go/bin
\ No newline at end of file
The MIT License (MIT)
Copyright © 2021 seetaas
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
# kvm_agent_server
[![build status](https://gitlab-gpuhub.autodl.com/poyekhali/server/badges/master/build.svg)](https://gitlab.seetatech.com/poyekhali/server/commits/master)
[![coverage report](https://gitlab-gpuhub.autodl.com/poyekhali/server/badges/master/coverage.svg)](https://gitlab.seetatech.com/poyekhali/server/commits/master)
## What is kvm_agent_server
管理kvm虚拟机客户端
## What is ga
ga是 gpu agent缩写, 是运行在共享硬件的物理主机上的agent程序, 支持direct/sysctl.service/docker/docker-compose/k8s deployment等方式运行
## Contribute
### Code path of gs
1. cmd/server/cmd/agent-server.go --> entrance of agent-server
2. cmd/server/cmd/api-server.go --> entrance of api-server
3. cmd/server/cmd/worker.go --> entrance of worker
### Deploy method of gs
```
cd cmd/server
make redeploy_all // build code & docker build image & delete deployment & deploy in kubernetes
make update // build code & docker build image & apply deployment
```
### Code path of ga
1. cmd/agent/cmd/run.go --> entrance of agent
### Deploy method of ga
```
cd cmd/agent
make // build code
```
## Architecture
![architecture](/doc/pic/architecture.png)
![dataFlow](/doc/pic/basic_data_flow.png)
# Environment prefix for every go invocation: module mode, private host and proxy.
GOPROXY=export GO111MODULE=on && export GOPRIVATE=gitlab-gpuhub.autodl.com && export GOPROXY=https://goproxy.cn
GOCMD=$(GOPROXY) && go
GORUN=$(GOCMD) run
DOCKER=docker
DockerImageName=hub.kce.ksyun.com/gpuhub/agent:latest
DockerTestImageName=hub.kce.ksyun.com/gpuhub-test/agent:latest
FtDockerImageName=hub.kce.ksyun.com/gpuhub/ft-agent:latest
FtDockerTestImageName=hub.kce.ksyun.com/gpuhub-test/ft-agent:latest
now:=$(shell date +%Y%m%d%H%M%S)
formatNow=$(shell date +%Y-%m-%d/%H:%M:%S)
# Build metadata injected into kvm_server/pkg/libs at link time.
ldflags="-X 'kvm_server/pkg/libs.BuiltTime=${formatNow}' -X 'kvm_server/pkg/libs.GitCommit=`git describe --all --long`' -X 'kvm_server/pkg/libs.GoVersion=`go version`'"
# Absolute path of the project root, resolved dynamically from this Makefile's location.
ROOT_PATH:=$(abspath $(dir $(abspath $(firstword $(MAKEFILE_LIST))))../../)

# Every target here is a command, not a file; `docker` is kept for compatibility
# with the previous declaration.
.PHONY: all build do_build docker

all: build

# do_build compiles the agent binary in the current environment (run inside the
# build container by the `build` target).
do_build:
	@rm -f ./kvm_agent
	$(GOCMD) build -o kvm_agent -ldflags ${ldflags} .
	chmod 777 ./kvm_agent

# build runs do_build inside a Docker container with the repository mounted at
# /code, then tries to compress the binary with upx (failure is ignored).
# NOTE(review): the image tag says 1.9 while go.mod requires go 1.22.4 — confirm
# the image really ships a new enough toolchain.
build:
	@rm -f ./kvm_agent
	$(DOCKER) run --rm -w /code/cmd/agent -v /tmp/go_mod:/root/go/pkg/mod -v ${ROOT_PATH}:/code ningfd/golang:1.9-ubuntu20.04 sh -c "make do_build"
	-@upx -9 kvm_agent
/*
Copyright © 2021 seetaas
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
package cmd
import (
"fmt"
"github.com/spf13/cobra"
"kvm_server/pkg/libs"
_ "net/http/pprof"
"os"
)
// cfgFile and version are not referenced anywhere in the visible part of this file.
// NOTE(review): presumably leftovers from the cobra scaffold — confirm no other
// file in the package uses them before removing.
var cfgFile string
var version bool
// rootCmd is the base command of the kvm_agent CLI, used when the binary is
// called without any subcommands. It declares no Run/PreRun action of its own;
// its Version field is taken from libs.LatestVersion.
var rootCmd = &cobra.Command{
Use: "kvm_agent",
Version: libs.LatestVersion,
Short: "kvm_agent is a set of platform agent tool",
Long: `kvm_agent is a set of platform agent tool.`,
// Uncomment the following line if your bare application
// has an action associated with it:
// PreRun: func(cmd *cobra.Command, args []string) {
//
// },
}
// Execute runs the root command (and through it whichever subcommand was
// selected). It is meant to be called once from main.main(). When the command
// returns an error, the error is printed and the process exits with status 1.
func Execute() {
	err := rootCmd.Execute()
	if err == nil {
		return
	}
	fmt.Println(err)
	os.Exit(1)
}
/*
Copyright © 2021 seetaas
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cmd
import (
"context"
"github.com/spf13/cobra"
"kvm_server/entrance/agent/guard"
"kvm_server/pkg-agent/agent_constant"
"kvm_server/pkg-agent/agent_guard"
"kvm_server/pkg/libs"
"kvm_server/pkg/logger"
"kvm_server/pkg/signal_watcher"
"sync"
)
// Option values of the "run" subcommand; each one is bound to a command-line
// flag in init and read by runCmd.
var (
url string
token string
region string
verbose bool
isTestEnv bool
localRedisHost string
currentVersion string
cpuCoreNumsPerUnit int // number of CPU cores per virtualized GPU
maxCpuUnitNum int // when CPUs are used to virtualize GPUs, the maximum number of GPUs to virtualize
ossFileChecker bool // whether ossFileChecker is enabled on this agent
)
// runCmd implements the "run" subcommand. It logs the effective options,
// applies the runtime-version / verbose / test-env switches, refuses to start
// without a region, then starts guard.RunGuard in a goroutine and hands the
// error channel, cancel function and wait group to the signal watcher.
var runCmd = &cobra.Command{
	Use:   "run",
	Short: "Connect with kvm_server",
	Long:  `Connect with kvm_server, listen options from kvm_server, report machine status to kvm_server.`,
	Run: func(cmd *cobra.Command, args []string) {
		l := logger.NewLogger("Agent")
		l.Info("[Version] %s", libs.LatestVersion)

		// Only a non-default value overrides the runtime version.
		if currentVersion != "v1" {
			agent_constant.CurrentRuntimeVersion = agent_constant.RuntimeVersion(currentVersion)
		}
		l.Info("[current runtime version] %s, %s", currentVersion, agent_constant.CurrentRuntimeVersion)
		l.Info("url: %s, token: %s, region: %s", url, libs.Mask(token), region)
		l.Info("ready to running...")

		if verbose {
			agent_constant.VerboseMode = true
			l.Info("open verbose mode")
		}
		if isTestEnv {
			l.Info("use test env mode...")
			agent_guard.IsTestEnv = true
		}

		// A region is mandatory.
		if region == "" {
			l.Error("Start gpu agent failed... No region sign... exit.")
			return
		}

		// Carries fatal errors that require the service to restart. The buffer
		// is fairly large because several errors may occur at the same time and
		// the senders must not block.
		fatalErrs := make(chan error, 10)
		ctx, cancel := context.WithCancel(context.Background())

		var wg sync.WaitGroup
		wg.Add(1)
		go func() {
			guard.RunGuard(ctx, fatalErrs, url, token, region, localRedisHost, cpuCoreNumsPerUnit, maxCpuUnitNum, ossFileChecker)
			wg.Done()
		}()

		signal_watcher.NewSignalWatcher(fatalErrs, cancel, &wg).Run()
	},
}
// init registers the run subcommand on rootCmd and binds its persistent flags
// to the package-level option variables.
//
// NOTE(review): the url and token flags ship with concrete default values
// (an internal address and a token); confirm these are acceptable as defaults.
func init() {
	rootCmd.AddCommand(runCmd)

	// Persistent flags apply to this command and all of its subcommands.
	flags := runCmd.PersistentFlags()
	flags.StringVarP(&url, "url", "u", "192.168.1.126:33001", "kvm_server url like 192.168.1.126:33001")
	// Help text previously read "kvm_server url url token" (duplicated word).
	flags.StringVarP(&token, "token", "t", "seetatech666", "kvm_server auth token")
	// Some GPU machines are deliberately configured in yaml without a regional
	// network disk, so the default value must stay empty.
	flags.StringVarP(&region, "region", "r", "", "kvm_server region")
	// Print detailed output.
	flags.BoolVarP(&verbose, "verbose", "w", false, "verbose")
	// Switch the agent to test env mode.
	flags.BoolVar(&isTestEnv, "test", false, "use test env mode")
	flags.StringVarP(&localRedisHost, "local_redis", "", "", "local redis for diff cache, e.g. 127.0.0.1:6379")
	flags.StringVarP(&currentVersion, "current_version", "", "v1", "currentVersion: v1 v2 is valid, default is v1")
	flags.IntVar(&cpuCoreNumsPerUnit, "cpu_core_num_per_unit", 0, "A value greater than 0 represents the CPU machine, indicating how many CPU cores are in a rented unit. The default is 0, representing the GPU machine")
	flags.IntVar(&maxCpuUnitNum, "max_cpu_unit_num", 0, "When using a CPU to virtualize a GPU, how many gpus are virtualized at most. Valid only if the 'cpu_core_num_per_unit' is not 0")
	// The usage string was empty, which left the flag undocumented in --help.
	flags.BoolVar(&ossFileChecker, "oss-file-checker", false, "enable ossFileChecker on this agent")

	// Cobra supports local flags which will only run when this command
	// is called directly, e.g.:
	// runCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
}
/*
Copyright © 2021 seetaas
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
package main
import (
"kvm_server/cmd/agent/cmd"
)
// main is the entry point of the agent binary; it delegates to cmd.Execute.
func main() {
cmd.Execute()
}
app:
debug_log: true
# debug_api 用作进行debug的接口组, 生产环境严禁开启, 会有发生越权操作的可能
debug_api: true
# docs_api 开启swagger api 接口列表
docs_api: true
# 共享存储在服务容器内的rootfs path
storage_root_path: /mnt
mysql:
host: "0.0.0.0:9910"
user: "gas"
password: "gas"
db_name: "gas"
redis:
host: "0.0.0.0:9920"
password: ""
rabbit_mq:
host: "0.0.0.0:9930"
user: "gas"
password: "gas"
influxdb:
host: "http://192.168.1.126:8086"
token: "ftebdKMQ_pH1-UpRUlJvVTgXc409eN9sW0M3gc7WFehavdpR-tzjrpaMTDlwNQ2NvgUkiOO5rClTBTxJhtASRQ=="
package conf
import (
"crypto/ecdsa"
"strings"
"github.com/dgrijalva/jwt-go"
"github.com/spf13/viper"
)
// globalGsConfig is the package-wide configuration value. It is filled by
// SetGlobalGsConfig, partially updated by HotReloadSet and read through
// GetGlobalGsConfig and GetProxyInfo.
var globalGsConfig GsConfig
func SetGlobalGsConfig() {
globalGsConfig.App.DebugLog = viper.GetBool("app.debug_log")
globalGsConfig.App.DebugApi = viper.GetBool("app.debug_api")
globalGsConfig.App.DocsApi = viper.GetBool("app.docs_api")
globalGsConfig.App.StorageRootPath = viper.GetString("app.storage_root_path")
globalGsConfig.App.CoreHost = viper.GetString("app.core_host")
globalGsConfig.App.Platform = viper.GetString("app.platform")
globalGsConfig.App.HostingcloudPublicKeyFile = viper.GetString("app.hostingcloud_public_key_file")
globalGsConfig.App.PublicApiToken = viper.GetString("app.public_api_token")
globalGsConfig.App.CodeWithGpuDomain = viper.GetString("app.code_with_gpu_domain")
globalGsConfig.App.BusinessCallbackDomain = viper.GetString("app.business_callback_domain")
globalGsConfig.MySQL.Host = viper.GetString("mysql.host")
globalGsConfig.MySQL.HostRO = viper.GetString("mysql.host_ro")
globalGsConfig.MySQL.User = viper.GetString("mysql.user")
globalGsConfig.MySQL.Password = viper.GetString("mysql.password")
globalGsConfig.MySQL.DBName = viper.GetString("mysql.db_name")
globalGsConfig.Redis.Host = viper.GetString("redis.host")
globalGsConfig.Redis.Password = viper.GetString("redis.password")
globalGsConfig.RabbitMQ.Host = viper.GetString("rabbit_mq.host")
globalGsConfig.RabbitMQ.User = viper.GetString("rabbit_mq.user")
globalGsConfig.RabbitMQ.Password = viper.GetString("rabbit_mq.password")
globalGsConfig.InfluxDB.Host = viper.GetString("influxdb.host")
globalGsConfig.InfluxDB.Token = viper.GetString("influxdb.token")
globalGsConfig.Frps.ExposeURL = viper.GetString("frps.expose_url")
globalGsConfig.Frps.AllowPorts = viper.GetString("frps.allow_ports")
globalGsConfig.Frps.Token = viper.GetString("frps.token")
globalGsConfig.Core.MySQL.Host = viper.GetString("core.mysql.host")
globalGsConfig.Core.MySQL.User = viper.GetString("core.mysql.user")
globalGsConfig.Core.MySQL.Password = viper.GetString("core.mysql.password")
globalGsConfig.Core.MySQL.DBName = viper.GetString("core.mysql.db_name")
globalGsConfig.Key.PrivateKey = nil
globalGsConfig.Key.PublicKey = nil
globalGsConfig.TencentOss.Host = viper.GetString("tencent_oss.host")
globalGsConfig.TencentOss.BucketHost = viper.GetString("tencent_oss.bucket_host")
globalGsConfig.TencentOss.AccessID = viper.GetString("tencent_oss.access_id")
globalGsConfig.TencentOss.AccessSecret = viper.GetString("tencent_oss.access_secret")
globalGsConfig.CmbBank.UID = viper.GetString("cmb_bank.uid")
globalGsConfig.CmbBank.Sm2 = viper.GetString("cmb_bank.sm2")
globalGsConfig.CmbBank.Sm4 = viper.GetString("cmb_bank.sm4")
globalGsConfig.CmbBank.CardNbr = viper.GetString("cmb_bank.card_nbr")
globalGsConfig.Payment.AliPayV2.AppID = viper.GetString("payment.alipay_v2.app_id")
initPem()
}
// GetGlobalGsConfig returns the package-level configuration by value, so the
// caller receives a copy of the GsConfig struct (the Key pointers inside it
// still refer to the same key objects).
func GetGlobalGsConfig() GsConfig {
return globalGsConfig
}
// GetProxyInfo returns the frps host, port and token from the global config.
// Host and port are taken from Frps.ExposeURL only when it splits on ":" into
// exactly two parts; otherwise both are returned empty. The token is always
// Frps.Token.
func GetProxyInfo() (proxyHost string, proxyPort string, proxyToken string) {
	frps := globalGsConfig.Frps
	if parts := strings.Split(frps.ExposeURL, ":"); len(parts) == 2 {
		proxyHost, proxyPort = parts[0], parts[1]
	}
	return proxyHost, proxyPort, frps.Token
}
// GsConfig is the in-memory form of the service configuration. Each nested
// struct mirrors one configuration section that SetGlobalGsConfig reads from
// viper, except Key, which is filled by initPem.
type GsConfig struct {
// App holds the values of the app.* keys.
App struct {
DebugLog bool
DebugApi bool
DocsApi bool
StorageRootPath string
CoreHost string
Platform string `json:"platform"` // platform currently running on: autodl, gpuhub, gpufree
HostingcloudPublicKeyFile string `json:"hostingcloud_public_key_file"`
PublicApiToken string
CodeWithGpuDomain string
BusinessCallbackDomain string
}
// MySQL holds the values of the mysql.* keys; HostRO comes from mysql.host_ro.
MySQL struct {
Host string
HostRO string
User string
Password string
DBName string
}
// Redis holds the values of the redis.* keys.
Redis struct {
Host string
Password string
}
// RabbitMQ holds the values of the rabbit_mq.* keys.
RabbitMQ struct {
Host string
User string
Password string
}
// InfluxDB holds the values of the influxdb.* keys.
InfluxDB struct {
Host string
Token string
}
// Frps holds the values of the frps.* keys; ExposeURL is split into host and
// port by GetProxyInfo.
Frps struct {
ExposeURL string
AllowPorts string
Token string
}
// Key holds the ECDSA key pair parsed by initPem.
Key struct {
PrivateKey *ecdsa.PrivateKey
PublicKey *ecdsa.PublicKey
}
// Core holds the values of the core.mysql.* keys.
Core struct {
MySQL struct {
Host string
User string
Password string
DBName string
}
}
// TencentOss holds the values of the tencent_oss.* keys.
TencentOss struct {
Host string
BucketHost string
AccessID string
AccessSecret string
}
// CmbBank holds the values of the cmb_bank.* keys.
CmbBank struct {
UID string
Sm2 string
Sm4 string
CardNbr string
}
// Payment holds the values of the payment.* keys (currently only
// payment.alipay_v2.app_id).
Payment struct {
AliPayV2 struct {
AppID string
}
}
}
// HotReloadSet overwrites App.DebugLog in the global configuration with
// debugLog. The assignment is not guarded by any lock in this function.
func HotReloadSet(debugLog bool) {
globalGsConfig.App.DebugLog = debugLog
}
// initPem parses the embedded PEM constants into the ECDSA key pair stored in
// globalGsConfig.Key. It panics when either key cannot be parsed, because the
// keys are compiled in and a parse failure cannot be recovered at runtime.
func initPem() {
	privateKey, err := jwt.ParseECPrivateKeyFromPEM([]byte(privateKeyContent))
	if err != nil {
		// The message used to contain a literal "%v": the string was built by
		// concatenation, so the verb was never formatted.
		panic("Unable to parse ECDSA private key: " + err.Error())
	}
	globalGsConfig.Key.PrivateKey = privateKey

	publicKey, err := jwt.ParseECPublicKeyFromPEM([]byte(publicKeyContent))
	if err != nil {
		panic("Unable to parse ECDSA public key: " + err.Error())
	}
	globalGsConfig.Key.PublicKey = publicKey
}
// privateKeyContent is the PEM-encoded EC private key parsed by initPem.
// SECURITY NOTE(review): a private key is committed in source here; anyone with
// repository access can read it. Consider loading it from configuration or a
// secret store and rotating this key.
var privateKeyContent = `-----BEGIN EC PRIVATE KEY-----
MHcCAQEEIP4J0SIl3WzaCRzyW/e19JtDAR9m3/p7HxKiquSwMInPoAoGCCqGSM49
AwEHoUQDQgAEPaUUBFYyG8jRqxPzVV0PAiUmyQ4EpzlrskMWTnRr9Zsd4a+I5f1s
Q4fTO8841mVoAwL6u9+Kr0MD6yZrCNzWAw==
-----END EC PRIVATE KEY-----`
// publicKeyContent is the PEM-encoded public key parsed by initPem.
var publicKeyContent = `-----BEGIN PUBLIC KEY-----
MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEPaUUBFYyG8jRqxPzVV0PAiUmyQ4E
pzlrskMWTnRr9Zsd4a+I5f1sQ4fTO8841mVoAwL6u9+Kr0MD6yZrCNzWAw==
-----END PUBLIC KEY-----
`
app:
debug_log: false
# debug_api 用作进行debug的接口组, 生产环境严禁开启, 会有发生越权操作的可能
debug_api: true
# docs_api 开启swagger api 接口列表
docs_api: true
# 共享存储在服务容器内的rootfs path
storage_root_path: /mnt
code_with_gpu_domain: "https://test.codewithgpu.com:33443"
business_callback_domain: "https://test.autodl.com"
mysql:
host: "test.autodl.com:33306"
host_ro: "test.autodl.com:33306"
user: "root"
password: "seetatech"
db_name: "gpuhub_v21"
redis:
host: "192.168.1.126:6379"
password: "seetatech"
rabbit_mq:
host: "192.168.1.126:35673"
user: "admin"
password: "seetatech123!!!"
influxdb:
host: "http://192.168.1.126:8086"
token: "ftebdKMQ_pH1-UpRUlJvVTgXc409eN9sW0M3gc7WFehavdpR-tzjrpaMTDlwNQ2NvgUkiOO5rClTBTxJhtASRQ=="
frps:
# expose_url 指的是frps的访问地址, 用于agent访问, 如果agent都运行在k8s中, 可以使用svc命名url
# 如果agent可能运行在公网环境下, 那么需要使用frps在公网环境下的url
expose_url: 192.168.1.126:7000
# 定义frps的端口范围, 注意, 这项修改只会对之后运行的实例产生影响, 已经建立连接的不会重新申请
# "35000-45000" 指 35000 到 45000共计10001个端口
allow_ports: "35000-45000"
token: "seetatech666"
minio_credentials:
endpoint: oss.autodl.com:9000
access_key_id: minioadmin
secret_access_key: minioadmin
core:
mysql:
host: "192.168.1.126:3306"
user: "root"
password: "seetatech"
db_name: "gpuhub"
tencent_oss:
host: "https://ap-beijing.myqcloud.com"
bucket_host: "https://autodl-test-1310972338.cos.ap-beijing.myqcloud.com"
access_id: "AKIDc9av9CnAJExUt41YsxmOmvwyoRvCMJ1U"
access_secret: "84mL82qTs33qOiqmbGG5pPoX4RcZJprj"
cmb_bank:
uid: "N002461179"
sm2: "NBtl7WnuUtA2v5FaebEkU0/Jj1IodLGT6lQqwkzmd2E="
sm4: "VuAzSWQhsoNqzn0K"
card_nbr: "755915671510811"
version: '3.9'
services:
mysql:
image: 'mysql:5.7'
ports:
- 9910:3306
environment:
- MYSQL_DATABASE=kvm
- MYSQL_USER=kvm
- MYSQL_PASSWORD=kvm
- MYSQL_RANDOM_ROOT_PASSWORD="yes"
command: ["mysqld", "--character-set-server=utf8mb4", "--collation-server=utf8mb4_unicode_ci"]
redis:
image: 'redis:6.2-alpine'
ports:
- 9920:6379
rabbitmq:
image: 'rabbitmq:3-management'
ports:
- 9930:5672
- 9932:15672
environment:
- RABBITMQ_DEFAULT_USER=kvm
- RABBITMQ_DEFAULT_PASS=kvm
\ No newline at end of file
package test
import (
"fmt"
"github.com/spf13/viper"
"kvm_server/conf"
"os"
"path/filepath"
"strings"
"sync"
)
// once guards do so the test configuration is initialised a single time.
var once sync.Once
// InitConfigForTest runs do through once.Do, so repeated calls (for example
// from several test cases) only perform the initialisation the first time.
func InitConfigForTest() {
once.Do(do)
}
// do initialises the configuration for test cases.
//
// It scans os.Args for an argument containing "initTest=" and treats the text
// after that marker as the project root. When no such argument exists it does
// nothing. Otherwise it loads <root>/conf/kvm.yaml through viper, overrides the
// MySQL / Redis / RabbitMQ / core MySQL settings with the local test-service
// values, disables debug logging and publishes the result with
// conf.SetGlobalGsConfig. If the config file cannot be read the process exits
// with status 1.
//
// NOTE(review): the overrides below use user/db "gas", while the docker-compose
// definition shown with this change creates user/db "kvm" on the same ports —
// confirm which credentials are intended.
func do() {
	var (
		isTest   bool
		rootPath string
	)
	for _, arg := range os.Args {
		// strings.Cut keeps the original "argument contains initTest=" matching
		// and returns the remainder verbatim. The previous strings.TrimLeft call
		// treated "initTest=" as a character set, so it could also strip leading
		// characters that belong to the path itself (e.g. "initTest=test").
		if _, after, found := strings.Cut(arg, "initTest="); found {
			isTest, rootPath = true, after
			break
		}
	}
	if !isTest {
		return
	}

	viper.SetConfigFile(filepath.Join(rootPath, "conf/kvm.yaml"))
	if err := viper.ReadInConfig(); err != nil {
		// The old message claimed a default config would be used, but the
		// process exits here.
		fmt.Println("Config file could not be read, exit. err: ", err.Error())
		os.Exit(1)
	}

	// Point every backing service at the local test instances.
	viper.Set("mysql.host", "0.0.0.0:9910")
	viper.Set("mysql.user", "gas")
	viper.Set("mysql.password", "gas")
	viper.Set("mysql.db_name", "gas")
	viper.Set("redis.host", "0.0.0.0:9920")
	viper.Set("redis.password", "")
	viper.Set("rabbit_mq.host", "0.0.0.0:9930")
	viper.Set("rabbit_mq.user", "gas")
	viper.Set("rabbit_mq.password", "gas")
	viper.Set("core.mysql.host", "0.0.0.0:9910")
	viper.Set("core.mysql.user", "gas")
	viper.Set("core.mysql.password", "gas")
	viper.Set("core.mysql.db_name", "gas")
	viper.Set("app.debug_log", false)

	conf.SetGlobalGsConfig()
	fmt.Println("Config file found at ", viper.ConfigFileUsed())
}
package test
import (
"kvm_server/conf"
"testing"
)
// TestInitConfig runs the shared test initialisation and logs the resulting
// global configuration. It makes no assertions; it only fails if
// InitConfigForTest terminates the process or panics.
func TestInitConfig(t *testing.T) {
InitConfigForTest()
t.Log("=======================")
t.Log("config for test: ", conf.GetGlobalGsConfig())
t.Log("=======================")
}
# Environment prefix for every go invocation: module mode, private host and proxy.
GOPROXY=export GO111MODULE=on && export GOPRIVATE=gitlab-gpuhub.autodl.com && export GOPROXY=https://goproxy.cn
GOCMD=$(GOPROXY) && go
GODIR := $(shell go list ./...)

# All targets are commands, not files. `tools.install` is kept from the previous
# declaration even though no such target is defined in this fragment.
.PHONY: tools.install test.run start.docker-compose stop.docker-compose test.go.run

# test.run starts the backing services, runs the unit tests, then stops the services.
test.run: start.docker-compose test.go.run stop.docker-compose

start.docker-compose:
	@echo 'Run daemon service at '$(PWD)
	docker-compose -f conf/test/docker-compose.yml up -d

# Tear-down is currently disabled; the command is kept commented out.
stop.docker-compose:
	#docker-compose -f conf/test/docker-compose.yml down

# test.go.run runs the tests with the race detector and coverage, passing the
# project root to the test binaries via the initTest= argument.
test.go.run:
	@echo "===========> Run unit test"
	@mkdir -p _output
	$(GOCMD) test -race -cover -coverprofile=coverage.data $(GODIR) -args initTest=$(PWD)
	@echo "===========> Show unit test coverage statements"
	# Hide only functions with exactly 0.0% coverage. The previous pattern `0.0`
	# also matched 10.0%, 50.0%, 100.0% and similar lines.
	@$(GOCMD) tool cover -func=coverage.data | grep -v -E '[[:space:]]0\.0%$$'
\ No newline at end of file
package guard
import (
"context"
"github.com/pkg/errors"
"kvm_server/pkg-agent/agent_booter"
constant "kvm_server/pkg-agent/agent_constant"
"kvm_server/pkg-agent/agent_container"
"kvm_server/pkg-agent/agent_container_monitor"
"kvm_server/pkg-agent/agent_files"
"kvm_server/pkg-agent/agent_guard"
"kvm_server/pkg-agent/agent_machine_monitor"
"kvm_server/pkg-agent/diff_cache"
"kvm_server/pkg-agent/hardware/resource/cpu_set"
"kvm_server/pkg-agent/messenger"
storageConstant "kvm_server/pkg-storage-agent/storage_agent_constant"
"kvm_server/pkg/logger"
"net/http"
"time"
)
// RunGuard wires up the agent and then runs the guard.
//
// In order: parse the server URL, create the docker client, verify the boot
// files, initialise the local redis diff cache, build the booter and container
// monitor, initialise the cpu_set hardware driver, read the machine hardware
// info, connect to the server, and finally run the guard with ctx. Every
// failure on the way is sent to panicChan and ends the function early.
func RunGuard(ctx context.Context, panicChan chan<- error, serverURL, token, region, localRedisHost string, cpuCoreNumsPerUnit, maxCpuUnitNum int, ossFileChecker bool) {
	l := logger.NewLogger("RunGuard")
	l.Info("ready to init guard...")
	defer l.Info("guard quit...")

	url := constant.ParseUrl(serverURL)
	l.WithField("host", url.GetHost()).Info("kvm_server host parsed")

	dockerClient, err := constant.NewDockerClient()
	if err != nil {
		panicChan <- errors.Wrap(err, "failed to new docker client")
		return
	}

	if filesErr := agent_files.CheckBootFilesIsPrepared(); filesErr != nil {
		panicChan <- filesErr
		return
	}

	diff_cache.InitLocalRedis(ctx, l, localRedisHost, ossFileChecker)

	booter := agent_booter.NewBooter(dockerClient)
	containerMonitor := agent_container_monitor.NewContainerFinder(
		dockerClient,
		agent_container.NewContainer(&constant.NewContainerParam{}, dockerClient),
	)

	// Per the original note: no error is reported when cpuCoreNumsPerUnit is 0.
	if err = cpu_set.CpuSetHardwareDriverInit(l, cpuCoreNumsPerUnit, maxCpuUnitNum); err != nil {
		panicChan <- errors.Wrap(err, "CpuSetHardwareDriverInit failed")
		return
	}

	machineMonitor := agent_machine_monitor.NewMachineMonitor(dockerClient, region)
	machineHardWareInfo, err := machineMonitor.GetMachineHardwareInfo()
	if err != nil {
		panicChan <- errors.Wrap(err, "failed to get machine hardware info")
		return
	}
	l.WithField("machine id", machineHardWareInfo.MachineID).Info("machine id detected")

	//err = logger.InitHookToSendLogToAPI(ctx, machineHardWareInfo.MachineID, token, url.GetSaveLogUrls())
	//if err != nil {
	//	l.WarnE(err, "can not connect to log server")
	//	err = nil
	//}

	m, err := tryConnectToServer(url, token, machineHardWareInfo.MachineID, region)
	if err != nil {
		err = errors.Wrap(err, "failed to exec messenger dial")
		l.E(err)
		panicChan <- err
		return
	}

	agent_guard.NewGuard(
		machineHardWareInfo.MachineID,
		machineHardWareInfo.MachineName,
		region,
		m,
		booter,
		containerMonitor,
		machineMonitor,
	).Run(ctx, panicChan)
}
// tryConnectToServer dials the server's websocket URLs in order and returns the
// first messenger that connects.
//
// The request header carries the token and machine ID, plus the region when it
// is non-empty. On success constant.ServerConnectedURL is set to url. On
// failure the returned messenger is nil and err is the last dial error; when
// the URL yields no candidates at all, an explicit error is returned instead of
// the previous (nil, nil) result that let the caller continue with a nil
// messenger.
func tryConnectToServer(url *constant.URL, token string, machineID string, region string) (m *messenger.Messenger, err error) {
	header := &http.Header{}
	header.Set(constant.AgentAuthorizationHeaderKey, token)
	header.Set(constant.AgentMachineIDHeaderKey, machineID)
	if len(region) != 0 {
		header.Set(storageConstant.AgentRegionSignHeaderKey, region)
	}

	for _, u := range url.GetWebsocketConnectUrls() {
		m, err = connectToServer(u, header)
		if err == nil {
			constant.ServerConnectedURL = url
			return m, nil
		}
	}

	// Success returns inside the loop, so a nil err here means nothing was tried.
	if err == nil {
		err = errors.New("no websocket connect url available")
	}
	return nil, err
}
// connectToServer builds a client messenger for url with the given header and
// dials it, giving up after three seconds. The messenger is returned even when
// the dial fails, together with the dial error.
func connectToServer(url string, h *http.Header) (m *messenger.Messenger, err error) {
	m = messenger.NewMessengerForClient(url, *h)

	dialCtx, stop := context.WithTimeout(context.Background(), 3*time.Second)
	defer stop()

	return m, m.Dial(dialCtx)
}
package agent_constant
import (
"encoding/json"
"github.com/pkg/errors"
"kvm_server/pkg-agent/agent_constant"
"strings"
)
// MachineStatusParam is a message payload that embeds
// agent_constant.MachineHealthInfo; it is decoded from JSON by ParseFromString.
type MachineStatusParam struct {
agent_constant.MachineHealthInfo
}
// MachineRegisterParam is a message payload that embeds
// agent_constant.MachineHardwareInfo; it is decoded from JSON by ParseFromString.
type MachineRegisterParam struct {
agent_constant.MachineHardwareInfo
}
var (
// EchoMessageSkippedError is returned by the ParseFromString methods when the
// input contains "response_from_agent_flag", i.e. the message is an echo that
// should not be processed.
EchoMessageSkippedError = errors.New("this msg contain 'response_from_agent_flag', do not write response into socket pipe")
)
// ResponseCode is a string-based response code type.
type ResponseCode string
// NOTE(review): CodeOK and CodeErr are declared as untyped string constants,
// not as ResponseCode values — confirm whether they were meant to be typed.
const (
CodeOK = ""
CodeErr = "Err"
)
// ParseFromString decodes the JSON text in into c.
// It returns EchoMessageSkippedError without decoding when in contains
// "response_from_agent_flag" (an echoed socket message), and otherwise the
// result of json.Unmarshal.
func (c *MachineStatusParam) ParseFromString(in string) error {
	// Check whether the socket message is an echo; if so, skip it.
	if strings.Contains(in, "response_from_agent_flag") {
		return EchoMessageSkippedError
	}
	return json.Unmarshal([]byte(in), c)
}
// ParseFromString decodes the JSON text in into c.
// It returns EchoMessageSkippedError without decoding when in contains
// "response_from_agent_flag" (an echoed socket message), and otherwise the
// result of json.Unmarshal.
func (c *MachineRegisterParam) ParseFromString(in string) error {
	// Check whether the socket message is an echo; if so, skip it.
	if strings.Contains(in, "response_from_agent_flag") {
		return EchoMessageSkippedError
	}
	return json.Unmarshal([]byte(in), c)
}
module kvm_server
go 1.22.4
require (
github.com/aliyun/alibaba-cloud-sdk-go v1.63.88
github.com/c9s/goprocinfo v0.0.0-20210130143923-c95fcf8c64a8
github.com/containerd/containerd v1.7.25
github.com/deckarep/golang-set/v2 v2.7.0
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/docker/docker v23.0.3+incompatible
github.com/docker/go-connections v0.5.0
github.com/docker/go-units v0.5.0
github.com/dustin/go-humanize v1.0.1
github.com/g0rbe/go-chattr v1.0.1
github.com/gin-contrib/cors v1.7.3
github.com/gin-gonic/gin v1.10.0
github.com/go-redis/redis v6.15.9+incompatible
github.com/go-sql-driver/mysql v1.8.1
github.com/google/go-querystring v1.1.0
github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.3
github.com/hashicorp/go-version v1.7.0
github.com/influxdata/influxdb-client-go/v2 v2.14.0
github.com/jordan-wright/email v4.0.1-0.20210109023952-943e75fe5223+incompatible
github.com/levigross/grequests v0.0.0-20231203190023-9c307ef1f48d
github.com/mindprince/gonvml v0.0.0-20211002210717-ac0b66419a41
github.com/minio/minio-go/v7 v7.0.85
github.com/mojocn/base64Captcha v1.3.8
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.36.2
github.com/opencontainers/image-spec v1.1.0
github.com/pkg/errors v0.9.1
github.com/robfig/cron/v3 v3.0.1
github.com/satori/go.uuid v1.2.0
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/shopspring/decimal v1.4.0
github.com/sirupsen/logrus v1.9.3
github.com/smartwalle/alipay/v3 v3.2.24
github.com/spf13/cast v1.6.0
github.com/spf13/cobra v1.8.1
github.com/spf13/viper v1.19.0
github.com/streadway/amqp v1.1.0
github.com/swaggo/files v1.0.1
github.com/swaggo/gin-swagger v1.6.0
github.com/swaggo/swag v1.16.4
github.com/tencentyun/cos-go-sdk-v5 v0.7.61
github.com/tjfoc/gmsm v1.4.1
github.com/vonwenm/go-crypt v0.0.0-20150101072519-0000e689301e
github.com/wechatpay-apiv3/wechatpay-go v0.2.20
github.com/zRedShift/mimemagic v1.2.0
golang.org/x/exp v0.0.0-20250210185358-939b2ce775ac
golang.org/x/net v0.35.0
golang.org/x/sys v0.30.0
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2
gopkg.in/ini.v1 v1.67.0
gorm.io/datatypes v1.2.5
gorm.io/driver/mysql v1.5.7
gorm.io/gorm v1.25.12
libvirt.org/go/libvirt v1.11001.0
libvirt.org/go/libvirtxml v1.11000.1
)
require (
filippo.io/edwards25519 v1.1.0 // indirect
github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c // indirect
github.com/KyleBanks/depth v1.2.1 // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/PuerkitoBio/purell v1.1.1 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
github.com/bytedance/sonic v1.12.6 // indirect
github.com/bytedance/sonic/loader v0.2.1 // indirect
github.com/clbanning/mxj v1.8.4 // indirect
github.com/cloudwego/base64x v0.1.4 // indirect
github.com/cloudwego/iasm v0.2.0 // indirect
github.com/containerd/continuity v0.4.5 // indirect
github.com/distribution/reference v0.6.0 // indirect
github.com/docker/distribution v2.8.3+incompatible // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.7 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-ini/ini v1.67.0 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.19.6 // indirect
github.com/go-openapi/spec v0.20.4 // indirect
github.com/go-openapi/swag v0.19.15 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.23.0 // indirect
github.com/goccy/go-json v0.10.4 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/klauspost/cpuid/v2 v2.2.9 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/moby/sys/sequential v0.6.0 // indirect
github.com/moby/sys/user v0.3.0 // indirect
github.com/moby/sys/userns v0.1.0 // indirect
github.com/moby/term v0.5.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/mozillazg/go-httpheader v0.2.1 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/oapi-codegen/runtime v1.0.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/runc v1.2.4 // indirect
github.com/opencontainers/selinux v1.11.1 // indirect
github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b // indirect
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
github.com/rogpeppe/go-internal v1.13.1 // indirect
github.com/rs/xid v1.6.0 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/smartwalle/ncrypto v1.0.4 // indirect
github.com/smartwalle/ngx v1.0.9 // indirect
github.com/smartwalle/nsign v1.0.9 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/testify v1.10.0 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/tklauser/go-sysconf v0.3.14 // indirect
github.com/tklauser/numcpus v0.8.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.12 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/arch v0.12.0 // indirect
golang.org/x/crypto v0.33.0 // indirect
golang.org/x/image v0.23.0 // indirect
golang.org/x/text v0.22.0 // indirect
golang.org/x/tools v0.30.0 // indirect
google.golang.org/protobuf v1.36.3 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gotest.tools/v3 v3.5.2 // indirect
)
This diff is collapsed. Click to expand it.
package agent_booter
import (
"context"
"fmt"
constant "kvm_server/pkg-agent/agent_constant"
"kvm_server/pkg-agent/agent_container"
"kvm_server/pkg-agent/diff_cache"
"kvm_server/pkg-agent/messenger"
"kvm_server/pkg/libs"
"os"
"path/filepath"
)
// CreateContainer builds a container handle from param.NewContainerParam and
// the booter's docker client and asks it to create the container.
// It returns the UUID reported by Create; a failure is logged and returned.
func (b *Booter) CreateContainer(ctx context.Context, param constant.ContainerCreateParam, writer chan<- messenger.Message) (dockerContainerUUID string, err error) {
	handle := agent_container.NewContainer(&param.NewContainerParam, b.dockerClient)
	if dockerContainerUUID, err = handle.Create(ctx, param, writer); err != nil {
		b.logger.ErrorE(err, "create container failed")
	}
	return dockerContainerUUID, err
}
// StartContainer builds a container handle from param.NewContainerParam and
// starts it. A failure is logged and returned to the caller.
func (b *Booter) StartContainer(ctx context.Context, param constant.ContainerCreateParam) (err error) {
	handle := agent_container.NewContainer(&param.NewContainerParam, b.dockerClient)
	if err = handle.Start(ctx); err != nil {
		b.logger.ErrorE(err, "run container")
	}
	return err
}
// StopContainer builds a runtime handle from param.NewContainerRuntimeParam and
// stops the container. A failure is logged and returned to the caller.
func (b *Booter) StopContainer(ctx context.Context, param constant.ContainerRuntimeParam) (err error) {
	cd := agent_container.NewContainerRuntime(&param.NewContainerRuntimeParam, b.dockerClient)
	err = cd.Stop(ctx)
	if err != nil {
		// The message used to read "run container" (copied from StartContainer),
		// which made stop failures look like start failures in the logs.
		b.logger.ErrorE(err, "stop container")
		return
	}
	return
}
// StopADFSContainer builds a runtime handle from param.NewContainerRuntimeParam
// and calls its StopADFSContainer. Nothing is captured from that call, so this
// method always returns nil.
func (b *Booter) StopADFSContainer(ctx context.Context, param constant.ContainerRuntimeParam) (err error) {
	runtime := agent_container.NewContainerRuntime(&param.NewContainerRuntimeParam, b.dockerClient)
	runtime.StopADFSContainer(ctx)
	return nil
}
// RemoveContainer builds a runtime handle from param.NewContainerRuntimeParam
// and removes the container. A failure is logged and returned to the caller.
func (b *Booter) RemoveContainer(ctx context.Context, param constant.ContainerRuntimeParam) (err error) {
	cd := agent_container.NewContainerRuntime(&param.NewContainerRuntimeParam, b.dockerClient)
	err = cd.Remove(ctx)
	if err != nil {
		// The message used to read "run container" (copied from StartContainer),
		// which made remove failures look like start failures in the logs.
		b.logger.ErrorE(err, "remove container")
		return
	}
	return
}
// SaveContainer packs a container's diff directory into a tar file under
// constant.MigrationOssTmpPath, uploads it to the bucket described by
// param.MinioBucketInfo, and finally reports 100% progress through
// param.ProgressHook.
//
// rootFSUsedSize and upperFilePath are forwarded to CompressDiffFile; imageID
// is used to look up the image whose VirtualSize feeds the progress and size
// figures. Every failure is logged and returned through the named result err,
// which the deferred cleanup also inspects.
func (b *Booter) SaveContainer(ctx context.Context, param constant.ContainerSaveParam, rootFSUsedSize int64, upperFilePath, imageID string) (err error) {
	imageInspect, err := agent_container.GetImageInspect(b.dockerClient, imageID)
	if err != nil {
		b.logger.WarnE(err, "handler GetImageInspect failed")
		return
	}
	err = libs.MkdirAll(constant.MigrationOssTmpPath, os.ModePerm)
	if err != nil {
		b.logger.WarnE(err, "create oss tmp file failed: mkdir for '%s' failed.", constant.MigrationOssTmpPath)
		err = fmt.Errorf("mkdir for oss dir failed: %+v", err)
		return
	}
	var tmpTarPath = filepath.Join(constant.MigrationOssTmpPath, param.MinioBucketInfo.ObjectName) // e.g. /tmp/xx.tar
	// Guard against the upload command being delivered twice: a tar left over
	// from an earlier attempt is deleted before it is rebuilt.
	exist := libs.ExistPathWithCtx(context.Background(), tmpTarPath)
	if exist {
		b.logger.Info("Upload diff file: exist tar file in nas '%s'", tmpTarPath)
		err = os.Remove(tmpTarPath)
		if err != nil {
			b.logger.WarnE(err, "remove tmp tar file failed")
			return err
		}
	}
	// On exit: when diff caching is not wanted, or the save failed, delete the
	// temporary tar; otherwise keep it and register it as a cache record.
	defer func() {
		if !diff_cache.DiffNeedCache(b.logger) || err != nil {
			// Remove the temporary file.
			tmpErr := os.Remove(tmpTarPath)
			if tmpErr != nil {
				b.logger.WarnE(tmpErr, "remove tmp tar file failed")
			}
		} else {
			diff_cache.AddCacheRecord(b.logger, param.MinioBucketInfo.ObjectName)
		}
	}()
	// Step 1: compress the diff directory into the temporary tar.
	err = agent_container.CompressDiffFile(ctx, b.logger, imageInspect.VirtualSize, rootFSUsedSize, upperFilePath, tmpTarPath, param.ImageUUID, param.ProgressHook)
	if err != nil {
		b.logger.WarnE(err, "compress diff failed: run tar cmd failed.")
		err = fmt.Errorf("compress by tar failed: %+v", err)
		return
	}
	// Step 2: upload the tar to object storage.
	// Initialize minio client object.
	minioClient, err := constant.NewMinioClient(param.MinioCredentials)
	if err != nil {
		b.logger.WarnE(err, "initialize minio client failed")
		return
	}
	fileSize, err := agent_container.UploadFileToOss(ctx, minioClient, b.logger, imageInspect.VirtualSize, param.ImageUUID, tmpTarPath, param.MinioBucketInfo.BucketName, param.MinioBucketInfo.ObjectName, param.ProgressHook)
	if err != nil {
		b.logger.ErrorE(err, "handler uploadFileToOss failed")
		return
	}
	uploadOssInfo := &constant.UploadOssInfo{
		ImageUUID: param.ImageUUID,
		ObjectSize: fileSize,
		ImageSize: fileSize + imageInspect.VirtualSize, // uploaded object size plus the base image's virtual size
		Progress: 100,
	}
	// NOTE(review): ProgressHook is called unconditionally here — assumes callers always set it.
	param.ProgressHook(uploadOssInfo)
	return
}
// ReInitContainer recreates a container while keeping its volumes.
//
// Step 1 removes the container named by runtimeParam via
// RemoveButRetainVolumes. Step 2 creates a new container from createParam.
// When createParam.DownloadPrivateImageInfo is set, the new container's upper
// directory is emptied and the diff tar found under
// constant.MigrationOssTmpPath is unpacked into it.
// The first failing step is logged and its error returned.
func (b *Booter) ReInitContainer(
	ctx context.Context,
	createParam constant.ContainerCreateParam,
	runtimeParam constant.ContainerRuntimeParam,
	writer chan<- messenger.Message,
) (err error) {
	b.logger.WithField("container_id", runtimeParam.ContainerID).Info("re-init step 1: remove container but retain vols.")
	cd := agent_container.NewContainerRuntime(&runtimeParam.NewContainerRuntimeParam, b.dockerClient)
	err = cd.RemoveButRetainVolumes(ctx)
	if err != nil {
		b.logger.ErrorE(err, "re-init step 1: remove container failed.")
		return
	}
	b.logger.WithField("container_id", createParam.ContainerID).Info("re-init step 2: create container.")
	cd2 := agent_container.NewContainer(&createParam.NewContainerParam, b.dockerClient)
	_, err = cd2.Create(ctx, createParam, writer)
	if err != nil {
		b.logger.ErrorE(err, "re-init step 2: create container failed.")
		return
	}
	if createParam.DownloadPrivateImageInfo != nil {
		// err is shadowed inside this branch, so every failure below returns it explicitly.
		data, _, err := agent_container.GetGraphDriverDataAndImageID(b.dockerClient, createParam.ContainerID)
		if err != nil {
			b.logger.WarnE(err, "Merge diff failed: cannot get GraphDriverData.")
			return err
		}
		err = libs.RemoveAllUnderTheDir(data.UpperDir)
		if err != nil {
			b.logger.WarnE(err, "Merge diff failed: cannot clean docker upper dir '%s'.", data.UpperDir)
			err = fmt.Errorf("clean docker diff dir failed: %+v", err)
			return err
		}
		var tmpTarName = createParam.DownloadPrivateImageInfo.MinioBucketInfo.ObjectName
		var netDiskPath = filepath.Join(constant.MigrationOssTmpPath, tmpTarName)
		// If the plain tar path cannot be stat'ed, fall back to the
		// per-container temporary name derived from it.
		_, statErr := os.Stat(netDiskPath)
		if statErr != nil {
			netDiskPath = constant.NewDiffTemporaryName(netDiskPath, createParam.ContainerID)
		}
		b.logger.Info("use downloaded image: %s", netDiskPath)
		// NOTE(review): the upper dir was already emptied above; this second clean looks redundant.
		err = libs.RemoveAllUnderTheDir(data.UpperDir)
		if err != nil {
			b.logger.WarnE(err, "Merge diff failed: cannot clean docker upper dir '%s'.", data.UpperDir)
			err = fmt.Errorf("clean docker diff dir failed: %+v", err)
			return err
		}
		err = libs.DecompressTarToPath(ctx, netDiskPath, data.UpperDir)
		if err != nil {
			b.logger.WarnE(err, "Merge diff failed: run tar cmd failed.")
			err = fmt.Errorf("decompress by tar cmd failed: %+v", err)
			return err
		}
		// Removal of the tar after unpacking is currently disabled:
		//err = os.Remove(netDiskPath)
		//if err != nil {
		// b.logger.ErrorE(err, "Clean diff failed. Please rm the tar manually. path: %s", netDiskPath)
		// return err
		//}
	}
	return
}
package agent_booter
import (
"kvm_server/pkg-agent/agent_constant"
"kvm_server/pkg/logger"
)
// Booter drives container lifecycle operations through a docker client and
// logs under the "Booter" logger name.
type Booter struct {
	dockerClient agent_constant.DockerClient
	logger       *logger.Logger
}

// NewBooter returns a Booter bound to dockerClient with a fresh "Booter" logger.
func NewBooter(dockerClient agent_constant.DockerClient) (booter *Booter) {
	return &Booter{
		dockerClient: dockerClient,
		logger:       logger.NewLogger("Booter"),
	}
}
package agent_cg_tool
import (
"fmt"
"github.com/gin-gonic/gin"
"golang.org/x/net/context"
"kvm_server/pkg-agent/agent_constant"
"kvm_server/pkg-agent/diff_cache"
"kvm_server/pkg/logger"
"net/http"
"os"
"path/filepath"
"strconv"
)
// ModelFileDownloadByAgentRequest is the JSON body accepted by the model
// download handlers in this package.
type ModelFileDownloadByAgentRequest struct {
	Token string `json:"token"` // used to download from storage-agent when the file is not found locally
	Host string `json:"host"`
	ModelName string `json:"model_name"`
	FileName string `json:"file_name"`
	FileSize int64 `json:"file_size"`
	MD5 string `json:"md5"`
	ContainerID string `json:"container_id"`
}
// CGDownloadModel is the gin handler that serves a model file identified by
// the request's MD5.
//
// Lookup order:
//  1. a file already present at MigrationOssTmpPath/cg-<md5> whose size equals
//     req.FileSize is sent directly (a size mismatch deletes it);
//  2. otherwise intranet caches reported by diff_cache.DiffCacheGet are tried
//     one after another;
//  3. otherwise the file is fetched from req.Host using req.Token.
//
// Downloads are written to "<md5 path>.<container id>.downloading" and renamed
// to the final path on success. Requests without a valid cg token get 401 and
// malformed bodies get 400; later failures are only logged.
func CGDownloadModel(c *gin.Context) {
	// There is only this one handler, so the middleware-style token check lives here.
	if ok, _ := IsCgTokenOK(c.GetHeader(CgTokenKey)); !ok {
		c.String(401, "cgToken incorrect")
		return
	}
	l := logger.NewLogger("CGDownloadModel")
	var req ModelFileDownloadByAgentRequest
	if err := c.ShouldBindJSON(&req); err != nil {
		l.ErrorE(err, "request params bind failed")
		c.String(400, "request params bind failed")
		return
	}
	l.WithField("payload", req).Info("get a msg")
	l = l.WithFields(map[string]interface{}{"fileName": req.FileName, "container_id": req.ContainerID})
	/*
		Flow:
		  local file exists      -> send it
		  local file missing     ->
		      intranet cache hit -> download from the cache
		      no cache           -> download from oss
		      then rename and send
		Each exit path is handled on its own for readability and low coupling,
		which is why some code is repeated.
	*/
	cacheFileName := fmt.Sprintf("cg-%s", req.MD5)
	md5FilePath := filepath.Join(agent_constant.MigrationOssTmpPath, cacheFileName)
	tmpFilePath := fmt.Sprintf("%s.%s.downloading", md5FilePath, req.ContainerID)
	// A local file already exists.
	stat, err := os.Stat(md5FilePath)
	if err == nil {
		l.Info("find local file [%s], try to use it.", md5FilePath)
		// Keep it when the size matches; otherwise delete it and download again.
		if stat.Size() == req.FileSize {
			c.File(md5FilePath)
			return
		}
		l.Info("file [%s] local size does not match the remote size, [%d!=%d], remove and download it now.", md5FilePath, stat.Size(), req.FileSize)
		err = os.Remove(md5FilePath)
		if err != nil {
			l.Info("remove file [%s] failed", md5FilePath)
		}
	}
	// No usable local file: prepare the .downloading file for a fresh download.
	var file *os.File
	defer os.Remove(tmpFilePath) // always try to clear the .downloading file before leaving
	file, err = os.OpenFile(tmpFilePath, os.O_CREATE|os.O_WRONLY, 0666)
	if err != nil {
		l.ErrorE(err, "open file [%s] failed", tmpFilePath)
		return
	}
	defer file.Close()
	// No local file: look at the intranet caches.
	var needCache bool
	diffCacheGetParams := &diff_cache.DiffCacheGetParams{
		FileName:        cacheFileName,
		NotAvailableMap: map[string]struct{}{},
	}
	ctx := context.Background()
	for {
		// Rewind so a retry overwrites whatever a failed attempt wrote.
		file.Seek(0, 0)
		// The lookup logic lives inside diff_cache; here we only need to know
		// whether a usable cache exists.
		cacheFileOK, cacheFileReader, _, cacheFromIP, needNewCache := diff_cache.DiffCacheGet(l, diffCacheGetParams)
		needCache = needNewCache
		if !cacheFileOK {
			l.Info("file [%s] have no cache, will get from oss...", cacheFileName)
			break
		}
		if needCache {
			// Found a usable intranet copy: download it.
			_, err = copyWithProgress(ctx, file, cacheFileReader)
			if err != nil {
				l.ErrorE(err, "download [%s] from [%s] failed, try next one...", tmpFilePath, cacheFromIP)
				// Do not give up here; mark this source as bad and try the next cache.
				diffCacheGetParams.NotAvailableMap[cacheFromIP] = struct{}{}
				continue
			}
			l.Info("download file [%s] from [%s] success", tmpFilePath, cacheFromIP)
			// Download finished: rename and send the file.
			err = os.Rename(tmpFilePath, md5FilePath)
			if err != nil {
				l.ErrorE(err, "Internal error, rename file [%s] to [%s] failed", tmpFilePath, md5FilePath)
				return
			}
			diff_cache.AddCacheRecord(l, cacheFileName)
			c.File(md5FilePath)
			return
		}
		// No local cache copy is needed: stream the reader straight to the client.
		c.DataFromReader(200, req.FileSize, "application/octet-stream", cacheFileReader, nil)
		return
	}
	// Every cache attempt failed; download from oss. A local copy is always kept in this case.
	l.Info("no cache found, download file from cg.")
	// Fetch from oss.
	downloadUrl := fmt.Sprintf("%s/api/v1/file/%s/download", req.Host, req.Token)
	resp, err := http.Get(downloadUrl)
	if err != nil {
		// Previously dropped silently. The URL is not logged because it embeds the token.
		l.ErrorE(err, "request file [%s] from cg failed", tmpFilePath)
		return
	}
	defer resp.Body.Close()
	_, err = copyWithProgress(ctx, file, resp.Body)
	if err != nil {
		l.ErrorE(err, "download file [%s] from cg failed", tmpFilePath)
		return
	}
	tmpStat, err := os.Lstat(tmpFilePath)
	if err != nil {
		// The format string used to carry a stray "%s" with no matching argument.
		l.ErrorE(err, "download file [%s] finished, but stat failed", tmpFilePath)
		return
	}
	if tmpStat.Size() != req.FileSize {
		// Must report tmpStat: stat comes from the earlier os.Stat, which failed
		// on this path, so calling stat.Size() here panicked on a nil FileInfo.
		l.Error("Failed to download file [%s], file size exception [%d!=%d]", tmpFilePath, tmpStat.Size(), req.FileSize)
		return
	}
	err = os.Rename(tmpFilePath, md5FilePath)
	if err != nil {
		l.ErrorE(err, "Internal error, rename file [%s] to [%s] failed", tmpFilePath, md5FilePath)
		return
	}
	diff_cache.AddCacheRecord(l, cacheFileName)
	c.File(md5FilePath)
	return
}
// CGDownloadModelTmpFileProgress reports how many bytes of an in-flight
// download have been written so far. It answers 401 for a bad cg token, 400
// for an unparseable body, 404 when the ".downloading" file does not exist,
// and otherwise 200 with the file size as a decimal string.
func CGDownloadModelTmpFileProgress(c *gin.Context) {
	// There is only this one handler, so the token check is done inline.
	tokenOK, _ := IsCgTokenOK(c.GetHeader(CgTokenKey))
	if !tokenOK {
		c.String(401, "cgToken incorrect")
		return
	}

	lg := logger.NewLogger("CGDownloadModel")
	var req ModelFileDownloadByAgentRequest
	if bindErr := c.ShouldBindJSON(&req); bindErr != nil {
		lg.ErrorE(bindErr, "request params bind failed")
		c.String(400, "request params bind failed")
		return
	}

	cachePath := filepath.Join(agent_constant.MigrationOssTmpPath, fmt.Sprintf("cg-%s", req.MD5))
	downloadingPath := fmt.Sprintf("%s.%s.downloading", cachePath, req.ContainerID)

	info, statErr := os.Stat(downloadingPath)
	if statErr != nil {
		c.String(404, "have no tmp file")
		return
	}
	c.String(200, strconv.FormatInt(info.Size(), 10))
}
package agent_cg_tool
import (
"context"
"fmt"
"io"
"kvm_server/pkg-agent/agent_constant"
"strings"
"time"
)
// Constants for cg-token handling.
const (
	// CgTokenKey is the request header the cg token is read from.
	CgTokenKey = "Authorization"
	// cgTokenPrefix is the prefix a decoded token must carry to be accepted by IsCgTokenOK.
	cgTokenPrefix = "cg_token_"
	// DefaultAgentPort — NOTE(review): presumably the agent's default listen port; not used in the visible code.
	DefaultAgentPort = 22022
)
// IsCgTokenOK decodes token with agent_constant.AesDecode and reports whether
// the plaintext starts with cgTokenPrefix. The second result is the plaintext
// with that prefix trimmed (the whole plaintext when the prefix is absent).
// An empty token yields (false, "") without decoding.
func IsCgTokenOK(token string) (bool, string) {
	if len(token) == 0 {
		return false, ""
	}
	decoded := string(agent_constant.AesDecode(token))
	matched := strings.HasPrefix(decoded, cgTokenPrefix)
	remainder := strings.TrimPrefix(decoded, cgTokenPrefix)
	return matched, remainder
}
// DecodeBody passes content, converted to a string, through
// agent_constant.AesDecode and returns the decoded bytes.
func DecodeBody(content []byte) []byte {
	encoded := string(content)
	return agent_constant.AesDecode(encoded)
}
// copyWithProgress copies object into dst in 32 KiB chunks until EOF and
// returns the number of bytes written.
//
// ctx is checked before every read. When it is done the copy stops and the
// returned error wraps ctx.Err(), so callers can use errors.Is with
// context.Canceled or context.DeadlineExceeded. A write that stores fewer
// bytes than were read yields io.ErrShortWrite. io.EOF from the reader is
// treated as success.
func copyWithProgress(ctx context.Context, dst io.Writer, object io.Reader) (int64, error) {
	var written int64
	buf := make([]byte, 32*1024)
	// NOTE(review): a 2-second window is tracked below but nothing is reported
	// when it elapses; it looks like a leftover hook for progress reporting.
	begin := time.Now()
	for {
		// Non-blocking cancellation check before each read.
		select {
		case <-ctx.Done():
			return written, fmt.Errorf("ctx.Done called, copy canceled, quit now: %w", ctx.Err())
		default:
		}

		nr, er := object.Read(buf)
		if nr > 0 {
			nw, ew := dst.Write(buf[:nr])
			if ew != nil {
				return written, ew
			}
			if nw > 0 {
				written += int64(nw)
			}
			if time.Since(begin) > 2*time.Second {
				begin = time.Now()
			}
			if nr != nw {
				return written, io.ErrShortWrite
			}
		}
		if er != nil {
			if er == io.EOF {
				return written, nil
			}
			return written, er
		}
	}
}
package agent_constant_test
import (
"testing"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
// TestAgentConstant is the `go test` entry point for this package's Ginkgo
// specs: it registers Fail as the Gomega fail handler and runs the suite under
// the description "AgentConstant Suite".
func TestAgentConstant(t *testing.T) {
	RegisterFailHandler(Fail)
	RunSpecs(t, "AgentConstant Suite")
}
package agent_constant
// HTTP header names used by the agent.
const (
	// AgentAuthorizationHeaderKey is the header name "Authorization".
	AgentAuthorizationHeaderKey = "Authorization"
	// AgentMachineIDHeaderKey is the header name "MachineID".
	AgentMachineIDHeaderKey = "MachineID"
)
package agent_constant
import (
"fmt"
uuid "github.com/satori/go.uuid"
"path/filepath"
"strings"
"syscall"
"time"
)
// GPUType is the resource key that identifies a kind of accelerator.
type GPUType string

// NotFound reports whether g is the empty GPUType.
func (g GPUType) NotFound() bool {
	return g == ""
}
// Known GPUType values.
const (
	NvidiaGPUKey GPUType = "nvidia.com/gpu"
	CambriconGPUKey GPUType = "cambricon.com/mlu"
	CpuSetGPUKey GPUType = "cpu_set"
)
// GetStorageRootPath returns the host directory where docker data is stored.
func GetStorageRootPath() string {
	const storageRoot = "/data"
	return storageRoot
}
func GetMachineUUIDPath() string {
return filepath.Join(GetStorageRootPath(), CurrentRuntimeVersion.GetMachineUUIDLacation())
}
// GetMachineIntranetIPPath returns the path of the ".private_ip" file under
// the storage root.
func GetMachineIntranetIPPath() string {
	const fileName = ".private_ip"
	root := GetStorageRootPath()
	return filepath.Join(root, fileName)
}
// GetCpuMachineCpuSetInfoPath returns the path of the cpu-set info file under
// the storage root. The file name carries cpuCoreNumsPerUnit and, when
// maxCpuUnitNum is non-zero, maxCpuUnitNum as a second suffix.
func GetCpuMachineCpuSetInfoPath(cpuCoreNumsPerUnit, maxCpuUnitNum int) string {
	var fileName string
	switch maxCpuUnitNum {
	case 0:
		fileName = fmt.Sprintf(".cpu_machine_cpu_set_%d", cpuCoreNumsPerUnit)
	default:
		fileName = fmt.Sprintf(".cpu_machine_cpu_set_%d_%d", cpuCoreNumsPerUnit, maxCpuUnitNum)
	}
	return filepath.Join(GetStorageRootPath(), fileName)
}
// GenerateCpuSetDeviceUUID returns a new device identifier of the form
// "CpuSet-<uuid v4>_<begin>-<end>", for example
// CpuSet-0a5c8099-be77-04d5-b474-779a2ac1948d_0-31.
func GenerateCpuSetDeviceUUID(begin, end int) string {
	id := uuid.NewV4()
	return fmt.Sprintf("CpuSet-%s_%d-%d", id.String(), begin, end)
}
// ParseCpuSetFromUUID returns the text between the first and second "_" of
// uuid (everything after the first "_" when there is only one). For
// identifiers built by GenerateCpuSetDeviceUUID that is the "<begin>-<end>"
// range. It returns "" when uuid contains no "_".
func ParseCpuSetFromUUID(uuid string) string {
	fields := strings.SplitN(uuid, "_", 3)
	if len(fields) >= 2 {
		return fields[1]
	}
	return ""
}
// DiskThreshold1 and DiskThreshold2 are disk-usage ratios (0.9 and 0.95).
// NOTE(review): presumably the mild and serious storage-exception thresholds; confirm against callers.
const DiskThreshold1 = 0.9
const DiskThreshold2 = 0.95
// MachineHealthStatus is the health state reported for a machine.
type MachineHealthStatus int

// Health states of a machine.
const (
	Normal                     MachineHealthStatus = 0
	StorageMildException       MachineHealthStatus = 1
	NetworkConnectionException MachineHealthStatus = 2
	StorageSeriousException    MachineHealthStatus = 3
)

// CanRunInstance reports whether instances may run in state m: only Normal and
// StorageMildException allow it.
func (m MachineHealthStatus) CanRunInstance() bool {
	return m == Normal || m == StorageMildException
}
const (
	// QuotaMountPath is the path "/data/docker".
	// NOTE(review): presumably the mount point that disk quota is applied to; confirm against callers.
	QuotaMountPath = "/data/docker"
)
// MachineHeartStatus is the heartbeat state of a machine.
type MachineHeartStatus int
// Heartbeat states.
const (
	HeartNormal MachineHeartStatus = 0
	HeartException MachineHeartStatus = 1
)
// DockerdFeature lists the optional capabilities of one dockerd build.
type DockerdFeature struct {
	// Supports updating shm size and ports.
	SupportUpdateShmAndPort bool
	// Supports mounting /dev/* devices, e.g. Cambricon devices.
	SupportDevDeviceUpdate bool
	SupportMountPathPropagationShared bool
	// Supports mount propagation, but when the adfs container starts before the
	// autodl container the mount point becomes read-only, so chattr is needed
	// as a workaround.
	SupportMountPathPropagationSharedButLimit bool
}
// DockerdVersionMapping maps each accepted dockerd version string to the
// features that build supports. A version missing from this map is treated as
// invalid by DockerVersionIsValid. Fields left out of an entry are false.
var DockerdVersionMapping = map[string]DockerdFeature{
	"20.10.7-deviceu": {
		SupportUpdateShmAndPort: false,
		SupportDevDeviceUpdate: false,
	},
	"20.10.7-umd7": {
		SupportUpdateShmAndPort: false,
		SupportDevDeviceUpdate: false,
	},
	"20.10.7-umd15": {
		SupportUpdateShmAndPort: true,
		SupportDevDeviceUpdate: false,
	},
	"20.10.7-umd16": {
		SupportUpdateShmAndPort: true,
		SupportDevDeviceUpdate: true,
	},
	"20.10.17-umd17": {
		SupportUpdateShmAndPort: true,
		SupportDevDeviceUpdate: true,
	},
	"20.10.17-umd18": {
		SupportUpdateShmAndPort: true,
		SupportDevDeviceUpdate: true,
		SupportMountPathPropagationShared: true,
	},
	"20.10.17-umd19": {
		SupportUpdateShmAndPort: true,
		SupportDevDeviceUpdate: true,
		SupportMountPathPropagationShared: true,
	},
	"27.2.0-umd18": {
		SupportUpdateShmAndPort: true,
		SupportDevDeviceUpdate: true,
		SupportMountPathPropagationShared: true,
		SupportMountPathPropagationSharedButLimit: true,
	},
}
// ChoseAnAvailableDockerdVersion returns one key of DockerdVersionMapping, or
// "" when the map is empty. The key is whichever one the map range yields
// first; Go does not define map iteration order, so the result can differ
// between calls.
func ChoseAnAvailableDockerdVersion() string {
	for k := range DockerdVersionMapping {
		return k
	}
	return ""
}
// GetAllAvailableDockerdVersionList returns every version in
// DockerdVersionMapping joined with ", ". The order follows map iteration and
// is therefore unspecified.
func GetAllAvailableDockerdVersionList() string {
	versions := make([]string, 0, len(DockerdVersionMapping))
	for version := range DockerdVersionMapping {
		versions = append(versions, version)
	}
	return strings.Join(versions, ", ")
}
// DockerVersionIsValid reports whether dockerdVersion is a key of
// DockerdVersionMapping.
func DockerVersionIsValid(dockerdVersion string) bool {
	_, known := DockerdVersionMapping[dockerdVersion]
	return known
}
// DockerVersionSupportUpdateShmAndPort reports whether dockerdVersion supports
// updating shm and ports (false for unknown versions). supportVersionTips
// lists every version that does, joined with ", " in unspecified order.
func DockerVersionSupportUpdateShmAndPort(dockerdVersion string) (support bool, supportVersionTips string) {
	var capable []string
	for version, feature := range DockerdVersionMapping {
		if !feature.SupportUpdateShmAndPort {
			continue
		}
		capable = append(capable, version)
	}
	// A missing key yields the zero DockerdFeature, whose flags are all false.
	current := DockerdVersionMapping[dockerdVersion]
	return current.SupportUpdateShmAndPort, strings.Join(capable, ", ")
}
// DockerVersionSupportDevDeviceUpdate reports whether dockerdVersion supports
// /dev device updates (false for unknown versions). supportVersionTips lists
// every version that does, joined with ", " in unspecified order.
func DockerVersionSupportDevDeviceUpdate(dockerdVersion string) (support bool, supportVersionTips string) {
	var capable []string
	for version, feature := range DockerdVersionMapping {
		if !feature.SupportDevDeviceUpdate {
			continue
		}
		capable = append(capable, version)
	}
	// A missing key yields the zero DockerdFeature, whose flags are all false.
	current := DockerdVersionMapping[dockerdVersion]
	return current.SupportDevDeviceUpdate, strings.Join(capable, ", ")
}
// DockerVersionSupportMountPathPropagationShared reports, for dockerdVersion,
// whether shared mount propagation is supported and whether it is supported
// only with the documented limitation (both false for unknown versions).
// supportVersionTips lists every version with SupportMountPathPropagationShared
// set, joined with ", " in unspecified order.
func DockerVersionSupportMountPathPropagationShared(dockerdVersion string) (support, supportButLimit bool, supportVersionTips string) {
	var capable []string
	for version, feature := range DockerdVersionMapping {
		if !feature.SupportMountPathPropagationShared {
			continue
		}
		capable = append(capable, version)
	}
	// A missing key yields the zero DockerdFeature, whose flags are all false.
	current := DockerdVersionMapping[dockerdVersion]
	return current.SupportMountPathPropagationShared,
		current.SupportMountPathPropagationSharedButLimit,
		strings.Join(capable, ", ")
}
// MachineRestartTime returns the moment the machine last booted, computed as
// the current Unix time minus the uptime reported by syscall.Sysinfo, with
// second resolution. It returns the zero time.Time when Sysinfo fails.
func MachineRestartTime() (restartTime time.Time) {
	var info syscall.Sysinfo_t
	if sysErr := syscall.Sysinfo(&info); sysErr != nil {
		return time.Time{}
	}
	bootUnix := time.Now().Unix() - info.Uptime
	return time.Unix(bootUnix, 0)
}
// NvidiaDocker is the string "nvidia".
// NOTE(review): presumably the docker runtime name used for NVIDIA GPUs; confirm against callers.
const NvidiaDocker = "nvidia"
const DefaultMemory = 200 // unit: MB
const DefaultStorage = 50 // unit: MB
// Intervals used by the machine's periodic tasks.
const MachineHeartBeatTime = 30 * time.Second
const MachineRegisterTime = 60 * time.Second
const MachineDiskInfoTime = 50 * time.Second
const MachineStatusConnectTime = 60 * time.Second
const MachineHeartStatusIntervalTime = 10 * time.Second // machine heartbeat interval
const MachineIDLength = 10
const MaxTryNum = 3
// VerboseMode toggles the agent's verbose mode.
var VerboseMode bool
package agent_constant
import (
"context"
"fmt"
"github.com/docker/docker/client"
"time"
)
// ContainerSimpleInfoList is a list of ContainerSimpleInfo values.
type ContainerSimpleInfoList []ContainerSimpleInfo

// ToMapping indexes the list by container ID. When several entries share an
// ID, the last one wins. The result is never nil.
func (c ContainerSimpleInfoList) ToMapping() map[ContainerID]ContainerSimpleInfo {
	byID := make(map[ContainerID]ContainerSimpleInfo)
	for _, info := range c {
		byID[info.ID] = info
	}
	return byID
}
// ContainerSimpleInfo is a compact description of one container: its ID,
// image, state phase and creation time.
type ContainerSimpleInfo struct {
	ID ContainerID
	Image string
	StatePhase ContainerStatePhase
	CreatedAt time.Time
}
// Labels that mark containers created by the agent.
const (
	ContainerBuiltinLabelKey   = "origin.seetaas"
	ContainerBuiltinLabelValue = "true"
	ContainerV2BuiltinLabelKey = "origin.autodlv2"
)

// RuntimeVersion selects between the agent's "v2" settings and the default
// settings used for every other value.
type RuntimeVersion string

// GetMachineUUIDLacation returns the machine-UUID file name:
// ".machine_uuid_v2" for "v2", ".machine_uuid" otherwise.
func (v RuntimeVersion) GetMachineUUIDLacation() string {
	switch v {
	case "v2":
		return ".machine_uuid_v2"
	default:
		return ".machine_uuid"
	}
}

// GetContainerBuiltinLabelKey returns ContainerV2BuiltinLabelKey for "v2" and
// ContainerBuiltinLabelKey otherwise.
func (v RuntimeVersion) GetContainerBuiltinLabelKey() string {
	switch v {
	case "v2":
		return ContainerV2BuiltinLabelKey
	default:
		return ContainerBuiltinLabelKey
	}
}

// GetApiPort returns "8091" for "v2" and "8090" otherwise.
func (v RuntimeVersion) GetApiPort() string {
	switch v {
	case "v2":
		return "8091"
	default:
		return "8090"
	}
}

// CurrentRuntimeVersion is the runtime version in effect; it starts as "v1".
var CurrentRuntimeVersion RuntimeVersion = "v1"
// NewDockerClient creates a docker client for unix:///var/run/docker.sock
// pinned to API version 1.41, then pings the daemon with a one-second timeout.
// It returns the client together with the construction error or the ping error.
func NewDockerClient() (dockerClient DockerClient, err error) {
	const socket = "unix:///var/run/docker.sock"
	dockerClient, err = client.NewClientWithOpts(
		client.WithHost(socket),
		//client.WithTimeout(time.Second*5),
		client.WithVersion("1.41"),
	)
	if err != nil {
		return dockerClient, err
	}

	pingCtx, stop := context.WithTimeout(context.Background(), time.Second)
	defer stop()
	_, err = dockerClient.Ping(pingCtx)
	return dockerClient, err
}
const MigrationNetDisk = "/storage/nas/migrate" // for test
const MigrationOssTmpPath = "/data/oss" // temporary home for files downloaded from or uploaded to oss
const DockerVolumePath = "/data/docker/volumes"
// MigrateConcurrentLimit, MigrationLimitOfUploadingDiff and
// MigrationLimitOfMergingDiff cap how many migrations may run at the same time.
const MigrateConcurrentLimit = 1
const MigrationLimitOfUploadingDiff = 2
const MigrationLimitOfMergingDiff = 1
// NewDiffTemporaryName returns "<sourcePath>-temporary-<containerUUID>".
// containerUUID is formatted with %s, so fmt renders it through
// ContainerID.String.
func NewDiffTemporaryName(sourcePath string, containerUUID ContainerID) string {
	return fmt.Sprintf("%s-temporary-%s", sourcePath, containerUUID)
}
package agent_constant
import (
"context"
"fmt"
"github.com/docker/docker/api/types/container"
"strings"
"sync"
)
// ContainerID is a container id or name.
type ContainerID string

// NewContainerID builds a ContainerID from id with leading and trailing "/"
// removed.
func NewContainerID(id string) ContainerID {
	trimmed := strings.Trim(id, "/")
	return ContainerID(trimmed)
}

// String returns the ID with leading and trailing "/" removed, e.g.
// "/container_xxx" -> "container_xxx".
func (c ContainerID) String() string {
	raw := string(c)
	return strings.Trim(raw, "/")
}

// AdfsContainerName returns the name of the companion adfs container:
// "adfs-" followed by the slash-trimmed ID.
func (c ContainerID) AdfsContainerName() string {
	base := c.String()
	return "adfs-" + base
}

// IsAdfsContainerName reports whether the raw, untrimmed ID starts with "adfs-".
func (c ContainerID) IsAdfsContainerName() bool {
	raw := string(c)
	return strings.HasPrefix(raw, "adfs-")
}
// NewContainerParam carries everything needed to create and start a container.
type NewContainerParam struct {
	// Basic
	ContainerID ContainerID `json:"container_id"`
	Image string `json:"image"`
	PreCmd []string `json:"pre_cmd"`
	// OnlyPreCmd marks a container that only needs to run the pre cmd, e.g.
	// tasks that need no root password, ports and the like.
	OnlyPreCmd bool `json:"only_pre_cmd"`
	WorkingDir string `json:"working_dir"`
	Labels map[string]string `json:"labels"`
	Env []string `json:"env"`
	// (byte) the amount of data (on disk) that is used for the writable layer of each container.
	MaxWritableSizeInByte int64 `json:"max_writable_size_in_byte"`
	MaxLocalDiskSizeInByte int64 `json:"max_local_disk_size_in_byte"`
	// Timeout (in seconds) to stop a container
	StopTimeout *int `json:"stop_timeout"`
	// Both in create and start
	// (byte) ShmSize
	ShmSize int64 `json:"shm_size"`
	// Ports ["8080:9999"] means export the port 9999 in container to the host port 8080
	Ports []string `json:"ports"`
	// root password
	RootPassword string `json:"root_password"`
	// Proxy server info: ProxyHost is used by the container, ProxyHostPublic by the frontend.
	ProxyHosts []string `json:"proxy_hosts"` // every supported proxy host; at most two today (the second is a hot standby), kept as a slice for future growth
	ProxyHost string `json:"proxy_host"` // already resolved: the intranet address when the region has one, otherwise the public one
	ProxyHostPublic string `json:"proxy_host_public"` // when non-empty this is always a public address
	ProxyPort string `json:"proxy_port"`
	ProxyToken string `json:"proxy_token"`
	// service expose ports
	JupyterToken string `json:"jupyter_token"`
	JupyterPort int `json:"jupyter_port"`
	TensorboardPort int `json:"tensorboard_port"`
	JupyterDomain string `json:"jupyter_domain"`
	TensorboardDomain string `json:"tensorboard_domain"`
	SSHPort int `json:"ssh_port"`
	// Start required
	// (Must be set when the container uses GPUs.) The GPU count or indexes may
	// have changed at runtime; this controls the GPU device list mounted into
	// the container.
	GPUUUIDList []string `json:"gpu_uuid_list"`
	GPUCapabilities []string `json:"gpu_capabilities"`
	// CpuLimit 1 means 1cpu core, 0.1 means 0.1cpu core
	CpuLimit float64 `json:"cpu_limit"`
	// MemLimitInByte
	MemLimitInByte int64 `json:"mem_limit_in_byte"`
	// ExtraBindingPathList lets the upper layer mount additional host
	// directories into the container.
	// []string{"/path/in/host:/path/in/container:rw,nocopy"}
	ExtraBindingPathList []string `json:"extra_binding_path_list"`
	// note: pure business logic. The upper layer lists which Target paths in
	// ExtraBindingPathList must not use mount propagation.
	// Background: when a sub-directory is mounted, adfs cannot mount that
	// sub-directory onto the host directly through the mounting container. It
	// can only mount the root directory onto the host and then bind the
	// matching host sub-directory into the container.
	// The agent uses mount propagation by default whenever dockerd supports it,
	// adjusting the source and target directories accordingly.
	// With normal propagation the host directory is mounted as
	// /storage/uid:/autodl-fs and, inside the user container,
	// /autodl-fs/data -> /root/autodl-fs.
	// A sub-directory has no such "data" level below it, so mount propagation
	// cannot be used for it.
	NotAllowMountShareList []string `json:"not_allow_mount_share_list"`
	// SSHPublicKey is the user public key supplied by the upper layer; its
	// content is written to /init/ssh/authorized_keys2.
	// The agent validates neither the count nor the correctness — the server
	// guarantees both — so a plain string is enough here.
	SSHPublicKey string `json:"ssh_public_key"`
	AutopanelToken string `json:"autopanel_token"`
	PreStart []PreStartFunc `json:"pre_start"`
	ServicePortProtocol string `json:"service_port_protocol"` // protocol: http, tcp
}
// cleanEnv removes duplicate variables from p.Env, keeping the first entry for
// each key and preserving order. The key is the text before the first "="; an
// entry without "=" is its own key.
//
// The previous version built the deduplicated slice but never stored it, so
// the method had no effect. The result is written to a new slice so callers
// that share the old backing array are not affected.
func (p *NewContainerParam) cleanEnv() {
	if len(p.Env) == 0 {
		return
	}
	seen := make(map[string]struct{}, len(p.Env))
	cleaned := make([]string, 0, len(p.Env))
	for _, entry := range p.Env {
		key := strings.SplitN(entry, "=", 2)[0]
		if _, dup := seen[key]; dup {
			continue
		}
		seen[key] = struct{}{}
		cleaned = append(cleaned, entry)
	}
	p.Env = cleaned
}
// SetEnv sets the environment variable key to value in p.Env. After p.cleanEnv
// has run, the first entry for key is replaced in place; when there is none,
// "key=value" is appended.
//
// An entry belongs to key when it is exactly key (a bare name without "=") or
// starts with key followed by "=". The earlier plain prefix match also hit
// unrelated variables that merely share the prefix (PATH vs PATH_EXTRA) and
// overwrote them.
func (p *NewContainerParam) SetEnv(key, value string) {
	p.cleanEnv()
	payload := fmt.Sprintf("%s=%s", key, value)
	assignPrefix := key + "="
	for i, v := range p.Env {
		if v == key || strings.HasPrefix(v, assignPrefix) {
			p.Env[i] = payload
			return
		}
	}
	p.Env = append(p.Env, payload)
}
// AdditionalPortReflect describes one extra port mapping from the host to a
// container.
type AdditionalPortReflect struct {
	HostIP        string `json:"host_ip"`
	HostPort      string `json:"host_port"`
	ContainerPort int    `json:"container_port"`
	Proto         string `json:"proto"` // tcp/udp, default: tcp
}

// Compare reports whether n and p have the same host IP, host port, container
// port and protocol.
func (p AdditionalPortReflect) Compare(n AdditionalPortReflect) bool {
	return n == p
}

// String renders the mapping as "ip:public:private", or "public:private" when
// HostIP is empty. The protocol is not included.
func (p AdditionalPortReflect) String() string {
	if p.HostIP == "" {
		return fmt.Sprintf("%s:%d", p.HostPort, p.ContainerPort)
	}
	return fmt.Sprintf("%s:%s:%d", p.HostIP, p.HostPort, p.ContainerPort)
}

// StringWithProto renders the mapping as "ip:public:private/proto", or
// "public:private/proto" when HostIP is empty. An empty Proto is rendered as
// "tcp"; the receiver is not modified.
func (p AdditionalPortReflect) StringWithProto() string {
	proto := p.Proto
	if proto == "" {
		proto = "tcp" // DEFAULT
	}
	if p.HostIP == "" {
		return fmt.Sprintf("%s:%d/%s", p.HostPort, p.ContainerPort, proto)
	}
	return fmt.Sprintf("%s:%s:%d/%s", p.HostIP, p.HostPort, p.ContainerPort, proto)
}
// NewContainerRuntimeParam identifies an existing container and carries the
// options for operations on it.
type NewContainerRuntimeParam struct {
	// Basic required; either the container id or the container name may be used.
	ContainerID ContainerID `json:"container_id"`
	// If the timeout is nil, the container's StopTimeout value is used, if set,
	// otherwise the engine default. A negative timeout value can be specified,
	// meaning no timeout, i.e. no forceful termination is performed.
	StopTimeout *int `json:"stop_timeout"`
	// Command to execute, e.g. echo "123"
	Cmd string `json:"cmd"`
}
// ContainerExclusiveOption groups per-container mutexes with the message UUIDs
// and cancel functions tracked for that container.
// It embeds sync.Mutex values, so it must not be copied after first use.
// NOTE(review): presumably used to serialize operations on one container; confirm against callers.
type ContainerExclusiveOption struct {
	ContainerID ContainerID
	PreRunningMutex sync.Mutex
	RunningMutex sync.Mutex
	MsgUUID []string
	CancelFunc []context.CancelFunc
}
// PreStartCreateADFSContainer describes the adfs container to create as a
// pre-start step.
type PreStartCreateADFSContainer struct {
	Image string `json:"image"`
	Command string `json:"command"`
	Privileged bool `json:"privileged"`
	Network container.NetworkMode `json:"network"`
	// []string{"/path/in/host:/path/in/container:rw,nocopy"}
	BindingPathList []string `json:"binding_path_list"`
	WorkingDir string `json:"working_dir"`
	Env []string `json:"env"`
}
// PreStartFunc is one pre-start action; CreateContainer is the only kind
// defined here.
type PreStartFunc struct {
	CreateContainer *PreStartCreateADFSContainer `json:"create_container"`
}
package agent_constant
import (
"encoding/json"
"time"
)
// ContainerStatus is a snapshot of one container's state.
// Fields tagged `json:""` are encoded under their Go field names, because
// encoding/json falls back to the field name when the tag name is empty.
type ContainerStatus struct {
	// Basic
	ContainerID ContainerID `json:"container_id"`
	Status ContainerState `json:""`
	// Time at which this record was produced.
	ValidAt time.Time `json:""`
	RestartCount int `json:""`
	CreatedAt time.Time `json:""`
	StartedAt time.Time `json:""`
	FinishedAt time.Time `json:""`
	Device []ContainerDevice `json:""`
	// sentTimes counts how many times this message has been sent to the server.
	// It exists for redundant, reliable delivery; the original note says the
	// message is sent only while the count is <= 3.
	sentTimes int
	sentAt time.Time
}
// ContainerDevice names a device driver and the device IDs attached through it.
type ContainerDevice struct {
	Driver string `json:"driver"`
	DeviceIDs []string `json:"device_ids"`
}
// String returns c encoded as JSON together with any marshalling error.
func (c *ContainerStatus) String() (string, error) {
	encoded, marshalErr := json.Marshal(c)
	return string(encoded), marshalErr
}
// Abstruct returns a digest for comparing two statuses: the JSON encoding of a
// copy that keeps only ContainerID, Status, RestartCount, CreatedAt, StartedAt
// and FinishedAt. A marshalling error is ignored and yields "".
func (c *ContainerStatus) Abstruct() string {
	digest := ContainerStatus{
		ContainerID:  c.ContainerID,
		Status:       c.Status,
		RestartCount: c.RestartCount,
		CreatedAt:    c.CreatedAt,
		StartedAt:    c.StartedAt,
		FinishedAt:   c.FinishedAt,
	}
	encoded, _ := json.Marshal(&digest)
	return string(encoded)
}
// CannotSend reports whether the status has already been sent at least once.
// Original note: the send interval grows linearly, with sends at second 0 and
// again after 30s.
// NOTE(review): the code blocks every send after the first, which does not
// match that note or the "<= 3" remark on sentTimes — confirm the intent.
func (c *ContainerStatus) CannotSend() bool {
	return c.sentTimes > 0
}
// BeforeDoSend increments the send counter; call it before each send.
func (c *ContainerStatus) BeforeDoSend() {
	c.sentTimes++
}
// IsFirstSend reports whether the status has never been sent.
func (c *ContainerStatus) IsFirstSend() bool {
	return c.sentTimes == 0
}
// ContainerState holds a container's phase, OOM flag, pid, exit code and
// error text.
type ContainerState struct {
	Status ContainerStatePhase `json:"status"`
	OOMKilled bool `json:"oom_killed"`
	Pid int `json:"pid"`
	ExitCode int `json:"exit_code"`
	Error string `json:"error"`
}
// UploadOssInfo reports the progress and result of uploading an image diff to
// object storage.
type UploadOssInfo struct {
	ContainerID ContainerID `json:"container_id"`
	ImageUUID   string      `json:"image_uuid"`
	Finished    bool        `json:"finished"`
	ObjectSize  int64       `json:"object_size"`
	ImageSize   int64       `json:"image_size"`
	Progress    float64     `json:"progress"`
	Error       string      `json:"error"`
	IsUploadNew bool        `json:"is_upload_new"`
}

// String returns u encoded as JSON together with any marshalling error.
func (u *UploadOssInfo) String() (string, error) {
	encoded, marshalErr := json.Marshal(u)
	return string(encoded), marshalErr
}

// ParseFromString decodes the JSON text in into u.
func (u *UploadOssInfo) ParseFromString(in string) error {
	return json.Unmarshal([]byte(in), u)
}
// CancelUploadImageInfo reports the outcome of cancelling an image upload.
type CancelUploadImageInfo struct {
	ContainerID ContainerID `json:"container_id"`
	ImageUUID   string      `json:"image_uuid"`
	Finished    bool        `json:"finished"`
	Error       string      `json:"error"`
}

// String returns u encoded as JSON together with any marshalling error.
func (u *CancelUploadImageInfo) String() (string, error) {
	encoded, marshalErr := json.Marshal(u)
	return string(encoded), marshalErr
}

// ParseFromString decodes the JSON text in into u.
func (u *CancelUploadImageInfo) ParseFromString(in string) error {
	return json.Unmarshal([]byte(in), u)
}
// BasicUsageInfo holds the CPU, memory and disk usage figures of a container.
type BasicUsageInfo struct {
	// Percentages: 91.56 means 91.56%, kept to two decimal places.
	CPUUsagePercent float64 `json:"cpu_usage_percent"`
	MemUsagePercent float64 `json:"mem_usage_percent"`
	// MemUsage (byte)
	MemUsage int64 `json:"mem_usage"`
	// MemLimit (byte)
	MemLimit int64 `json:"mem_limit"`
	// RootFSUsedSize (byte): space the container has written to its rootfs;
	// the upper bound is rootfs.size_limit, see
	// server/pkg-agent/agent_constant/container_control.go:20 MaxWritableSizeInByte
	// The size of files that have been created or changed by this container.
	RootFSUsedSize int64 `json:"root_fs_used_size"`
	// RootFSTotalSize (byte): size of the container's whole rootfs, i.e. image
	// content plus content created by the container.
	RootFSTotalSize int64 `json:"root_fs_total_size"`
	// Data disk size.
	DataDiskTotalSize uint64 `json:"data_disk_total_size"`
	DataDiskUsedSize uint64 `json:"data_disk_used_size"`
	StorageFSUsage string `json:"storage_fs_usage"`
}
// ContainerUsage is a usage report for one container: the embedded
// BasicUsageInfo figures plus the progress of image and file transfers.
type ContainerUsage struct {
	// Basic
	ContainerID ContainerID `json:"container_id"`
	// Time at which this record was produced.
	ValidAt time.Time `json:"valid_at"`
	BasicUsageInfo
	// Progress of the image pull, in [0, 1]; 0.95 means 95%.
	PullImageProgress float64 `json:"pull_image_progress"`
	DownloadImageProgress float64 `json:"download_image_progress"`
	DownloadOssFileProgress float64 `json:"download_oss_file_progress"`
	// Marks a message that needs to be sent.
	IsUsageNew bool `json:"is_new"`
}
// String returns c encoded as JSON together with any marshalling error.
func (c *ContainerUsage) String() (string, error) {
	encoded, marshalErr := json.Marshal(c)
	return string(encoded), marshalErr
}
// Abstract returns a digest for comparing two usage reports: the JSON encoding
// of a copy that keeps only ContainerID, the usage figures and
// DownloadImageProgress. A marshalling error is ignored and yields "".
func (c *ContainerUsage) Abstract() string {
	usage := BasicUsageInfo{
		CPUUsagePercent:   c.CPUUsagePercent,
		MemUsagePercent:   c.MemUsagePercent,
		MemUsage:          c.MemUsage,
		MemLimit:          c.MemLimit,
		RootFSUsedSize:    c.RootFSUsedSize,
		RootFSTotalSize:   c.RootFSTotalSize,
		DataDiskUsedSize:  c.DataDiskUsedSize,
		DataDiskTotalSize: c.DataDiskTotalSize,
		StorageFSUsage:    c.StorageFSUsage,
	}
	digest := ContainerUsage{
		ContainerID:           c.ContainerID,
		BasicUsageInfo:        usage,
		DownloadImageProgress: c.DownloadImageProgress,
	}
	encoded, _ := json.Marshal(&digest)
	return string(encoded)
}
// ContainerPrepareMigrate names the source container of a migration and the
// path its data is uploaded to.
type ContainerPrepareMigrate struct {
	SourceContainerID ContainerID `json:"source_container_id"`
	UploadPath        string      `json:"upload_path"`
}

// String returns c encoded as JSON together with any marshalling error.
func (c *ContainerPrepareMigrate) String() (string, error) {
	encoded, marshalErr := json.Marshal(c)
	return string(encoded), marshalErr
}
// ContainerPerformMigrate describes the execution of a migration from a source
// container to a target container.
type ContainerPerformMigrate struct {
	// Basic
	SourceContainerID ContainerID `json:"source_container_id"`
	TargetContainerID ContainerID `json:"target_container_id"`
	DownloadPath      string      `json:"download_path"`
	MinioBucketInfo   *MinioBucketInfo
	// Time at which this record was produced.
	ValidAt time.Time `json:"valid_at"`
	Msg     string    `json:"msg"`
}

// String returns c encoded as JSON together with any marshalling error.
func (c *ContainerPerformMigrate) String() (string, error) {
	encoded, marshalErr := json.Marshal(c)
	return string(encoded), marshalErr
}
// ContainerStatePhase is the lifecycle phase of a container.
type ContainerStatePhase string
// Container phases. Pulling and Pulled are agent-defined; per the original
// notes, the others are docker container states.
const (
	Pulling ContainerStatePhase = "pulling" // agent-defined, sent manually by the agent to show the image is being pulled
	Pulled ContainerStatePhase = "pulled" // agent-defined, sent manually by the agent to show the image pull has finished
	Created ContainerStatePhase = "created"
	Running ContainerStatePhase = "running"
	Paused ContainerStatePhase = "paused"
	Restarting ContainerStatePhase = "restarting"
	Removing ContainerStatePhase = "removing"
	Exited ContainerStatePhase = "exited"
	Dead ContainerStatePhase = "dead"
	NotFound ContainerStatePhase = "not_found"
)
package agent_constant
// Header names and defaults.
// NOTE(review): presumably used by the diff-cache file transfer between agents; confirm against callers.
const (
	HeaderAuthName = "Authorization"
	HeaderFileSize = "FileSize"
	// NOTE(review): this looks like a shared secret hard-coded in source;
	// consider loading it from configuration instead.
	HeaderAuthContent = "!!!autodl666!!!"
	DefaultCopyNumbers = 3
	FakeIntranetIP = "fakeIntranetIp1111111111"
)
package agent_constant
import (
"math"
"syscall"
)
// DiskFree packages the same kind of information one gets from the GNU/Linux
// "df" command. All byte counts are stored as float64.
type DiskFree struct {
	total       float64 // total size in bytes (Blocks * Bsize)
	used        float64 // used size in bytes ((Blocks - Bfree) * Bsize)
	avail       float64 // total - used, in bytes
	percentUsed int     // used/total as a percentage, rounded up
}

// DiskInfo is the JSON representation of disk usage for one mount.
type DiskInfo struct {
	NFSMount      string `json:"nfs_mount"`
	DiskTotal     uint64 `json:"disk_total"`     // in bytes
	DiskUsed      uint64 `json:"disk_used"`      // in bytes
	DiskAvailable uint64 `json:"disk_available"` // in bytes
}

// size provides a specific type for data size category conversion.
type size int64

// Exported constants for file size calculations. Each step is a factor of 1024.
const (
	B  size = 1
	KB      = 1024 * B // Kilobyte
	MB      = KB * KB  // Megabyte
	GB      = MB * KB  // Gigabyte
	TB      = GB * KB  // Terabyte
	PB      = TB * KB  // Petabyte
)

// NewDf creates a DiskFree struct for the specified mount point by calling
// statfs(2) on it. One can then use the struct methods to get the values in
// whatever size increment is wanted. Values are stored as float64s in order to
// allow for a reasonable degree of accuracy when dealing with large numbers,
// i.e. 1.7GB, 3.4TB etc.
//
// When the statfs call fails the zero DiskFree and the error are returned.
// A filesystem that reports zero blocks yields percentUsed == 0; dividing by
// the zero total would produce NaN, whose conversion to int is
// implementation-dependent.
func NewDf(mountPoint string) (f DiskFree, err error) {
	var s syscall.Statfs_t
	if err = syscall.Statfs(mountPoint, &s); err != nil {
		return f, err
	}
	blockSize := float64(s.Bsize)
	f = DiskFree{
		total: float64(s.Blocks) * blockSize,
		used:  float64(s.Blocks-s.Bfree) * blockSize,
	}
	f.avail = f.total - f.used
	if f.total > 0 {
		f.percentUsed = int(math.Ceil(f.used / f.total * 100))
	}
	return f, nil
}
// Total reports the total space divided by the unit s (for example GB).
//
// Example:
//
//	d, _ := NewDf("/mnt/fs")
//	fmt.Println(d.Total(GB))
func (df DiskFree) Total(s size) float64 {
	unit := float64(s)
	return df.total / unit
}

// Used reports the used space divided by the unit s (for example GB).
//
// Example:
//
//	d, _ := NewDf("/mnt/fs")
//	fmt.Println(d.Used(GB))
func (df DiskFree) Used(s size) float64 {
	unit := float64(s)
	return df.used / unit
}

// Avail reports the available space divided by the unit s (for example GB).
//
// Example:
//
//	d, _ := NewDf("/mnt/fs")
//	fmt.Println(d.Avail(GB))
func (df DiskFree) Avail(s size) float64 {
	unit := float64(s)
	return df.avail / unit
}

// PercentUsed returns the stored percentage of space used as an int.
func (df DiskFree) PercentUsed() int {
	pct := df.percentUsed
	return pct
}
// Unit is the textual name of a size unit accepted by ToSize and ToByte.
type Unit string
// Supported unit names.
const (
TypeB Unit = "B"
TypeKB Unit = "KB"
TypeMB Unit = "MB"
TypeGB Unit = "GB"
TypeTB Unit = "TB"
TypePB Unit = "PB"
)
// IntegerType selects how ToSize rounds a fractional result.
type IntegerType string
// FloorType makes ToSize round down.
const FloorType IntegerType = "floor"
// CeilType names rounding up; ToSize rounds up for any value other than FloorType.
const CeilType IntegerType = "ceil"
// ToSize converts byteValue (bytes) into the given unit, rounds the result and
// clamps it from below at minValue.
//
// Rounding is math.Floor when intType is FloorType and math.Ceil for every
// other intType. For an unrecognised unit the value being rounded is minValue
// itself, so minValue is returned.
func ToSize(byteValue int64, unit Unit, minValue int64, intType IntegerType) (value int64) {
	// A zero divisor marks an unrecognised unit.
	var divisor size
	switch unit {
	case TypeB:
		divisor = B
	case TypeKB:
		divisor = KB
	case TypeMB:
		divisor = MB
	case TypeGB:
		divisor = GB
	case TypeTB:
		divisor = TB
	case TypePB:
		divisor = PB
	}

	scaled := float64(minValue)
	if divisor != 0 {
		scaled = float64(byteValue) / float64(divisor)
	}

	var rounded float64
	if intType == FloorType {
		rounded = math.Floor(scaled)
	} else {
		rounded = math.Ceil(scaled)
	}

	value = int64(rounded)
	if value < minValue {
		value = minValue
	}
	return value
}
// ToByte converts value expressed in unit into bytes by multiplying with the
// matching size constant. TypeB and any unrecognised unit return value
// unchanged. The multiplication is plain int64 arithmetic with no overflow check.
func ToByte(value int64, unit Unit) int64 {
	factor := B
	switch unit {
	case TypeKB:
		factor = KB
	case TypeMB:
		factor = MB
	case TypeGB:
		factor = GB
	case TypeTB:
		factor = TB
	case TypePB:
		factor = PB
	}
	return value * int64(factor)
}
package agent_constant
import (
"context"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/network"
volumetypes "github.com/docker/docker/api/types/volume"
specs "github.com/opencontainers/image-spec/specs-go/v1"
"io"
"time"
)
// DockerClient lists the Docker operations this package relies on, expressed
// with the Docker API types imported by this file.
// NOTE(review): presumably satisfied by the Docker SDK client and used as a
// seam for test doubles — confirm against the callers.
type DockerClient interface {
// Daemon information and liveness.
Info(ctx context.Context) (types.Info, error)
Ping(ctx context.Context) (types.Ping, error)
// Container inspection and statistics.
ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error)
ContainerInspectWithRaw(ctx context.Context, containerID string, getSize bool) (types.ContainerJSON, []byte, error)
ContainerStatsOneShot(ctx context.Context, containerID string) (types.ContainerStats, error)
// Container lifecycle.
ContainerCreate(ctx context.Context, config *container.Config, hostConfig *container.HostConfig, networkingConfig *network.NetworkingConfig, platform *specs.Platform, containerName string) (container.ContainerCreateCreatedBody, error)
ContainerUpdate(ctx context.Context, containerID string, updateConfig container.UpdateConfig) (container.ContainerUpdateOKBody, error)
ContainerStart(ctx context.Context, containerID string, options types.ContainerStartOptions) error
ContainerStop(ctx context.Context, containerID string, timeout *time.Duration) error
ContainerRemove(ctx context.Context, containerID string, options types.ContainerRemoveOptions) error
ContainerExport(ctx context.Context, containerID string) (io.ReadCloser, error)
ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error)
// Exec inside a container.
ContainerExecCreate(ctx context.Context, containerID string, config types.ExecConfig) (types.IDResponse, error)
ContainerExecStart(ctx context.Context, execID string, config types.ExecStartCheck) error
// Images.
ImagePull(ctx context.Context, refStr string, options types.ImagePullOptions) (io.ReadCloser, error)
ImageList(ctx context.Context, options types.ImageListOptions) ([]types.ImageSummary, error)
ImageInspectWithRaw(ctx context.Context, imageID string) (types.ImageInspect, []byte, error)
// Volumes.
VolumeRemove(ctx context.Context, volumeID string, force bool) error
VolumeCreate(ctx context.Context, options volumetypes.VolumeCreateBody) (types.Volume, error)
VolumeInspect(ctx context.Context, volumeID string) (types.Volume, error)
}
// DockerSpecifiedDeviceDriverType is a string-typed driver name.
type DockerSpecifiedDeviceDriverType string

// String returns the driver name as a plain string.
func (t DockerSpecifiedDeviceDriverType) String() string {
	name := string(t)
	return name
}

// Driver names defined by this package.
const (
	DriverTypeOfShmSize DockerSpecifiedDeviceDriverType = "autodl-shm-size"
	DriverTypeOfPorts   DockerSpecifiedDeviceDriverType = "autodl-ports"
)
package agent_constant
import (
"bytes"
"crypto/aes"
"crypto/cipher"
"crypto/sha1"
"encoding/base64"
"encoding/json"
"fmt"
"github.com/pkg/errors"
"io"
"strings"
)
// encryptKey is the passphrase handed to getAesBlk by AesEncode and AesDecode.
// NOTE(review): this is a secret hard-coded in source — confirm that is intended.
var encryptKey string = "!!!---AutoDL666---!!!"
// getEncoding builds the package's custom base64 encoding. The 64-character
// alphabet is "autodl", then "AUTODL", then the 52 remaining characters below;
// padding is disabled.
func getEncoding() *base64.Encoding {
	const (
		salt = "autodl"
		rest = "+BCEFGHIJK-NPQRSVXWYZ0123456789bcefghijk*npqrsvxwyz/"
	)
	alphabet := fmt.Sprintf("%s%s%s", salt, strings.ToUpper(salt), rest)
	return base64.NewEncoding(alphabet).WithPadding(base64.NoPadding)
}
// Encode returns input encoded with the package's custom base64 encoding.
// A write failure is wrapped as "encode failed" and returned with an empty
// string; an error from closing the encoder is returned alongside whatever was
// written to the buffer.
func Encode(input string) (eStr string, err error) {
	var out bytes.Buffer
	w := base64.NewEncoder(getEncoding(), &out)
	if _, err = w.Write([]byte(input)); err != nil {
		return "", errors.Wrap(err, "encode failed")
	}
	err = w.Close()
	return out.String(), err
}
// Decode reverses Encode. It returns whatever was decoded before an error
// occurred, together with that error.
func Decode(input string) (dStr string, err error) {
	var out bytes.Buffer
	src := base64.NewDecoder(getEncoding(), strings.NewReader(input))
	_, err = io.Copy(&out, src)
	return out.String(), err
}
// PKCS5Padding returns ciphertext followed by N bytes of value N, where N is
// the number of bytes needed to reach the next multiple of blockSize (a full
// extra block when the input is already aligned).
//
// The result is always a freshly allocated slice: appending directly to
// ciphertext could write into spare capacity of the caller's backing array.
//
// blockSize must be in 1..255; zero panics with a division by zero and larger
// values do not fit into the single padding byte.
func PKCS5Padding(ciphertext []byte, blockSize int) []byte {
	padding := blockSize - len(ciphertext)%blockSize
	padded := make([]byte, 0, len(ciphertext)+padding)
	padded = append(padded, ciphertext...)
	return append(padded, bytes.Repeat([]byte{byte(padding)}, padding)...)
}
// ZeroUnPadding strips padding by reading the last byte as the pad length N
// and dropping the final N bytes. Despite the name, this is the inverse of
// PKCS5Padding.
//
// Empty input is returned unchanged, and a pad length larger than the input
// yields an empty slice; both cases would otherwise panic on slice bounds.
func ZeroUnPadding(origData []byte) []byte {
	length := len(origData)
	if length == 0 {
		return origData
	}
	// The last byte states how many trailing bytes are padding.
	unpadding := int(origData[length-1])
	if unpadding > length {
		return origData[:0]
	}
	return origData[:length-unpadding]
}
// getAesBlk derives an AES cipher and an IV from key.
//
// The key is wrapped as "autodlBlk<key>AutoDLBlk" and hashed with SHA-1; the
// 20-byte digest followed by its first 12 bytes forms the 32-byte AES key. The
// returned IV is the first 16 bytes of that same key, so it is fixed per key.
// The function panics if aes.NewCipher rejects the key.
func getAesBlk(key string) (cipher.Block, []byte) {
	key = fmt.Sprintf("autodlBlk%sAutoDLBlk", key)
	digest := sha1.Sum([]byte(key))

	aesKey := make([]byte, 0, len(digest)+12)
	aesKey = append(aesKey, digest[:]...)
	aesKey = append(aesKey, digest[:12]...)

	block, err := aes.NewCipher(aesKey)
	if err != nil {
		panic(fmt.Sprintf("failed to get DES blk of %s, len: %d, %s", key, len(key), err.Error()))
	}
	return block, aesKey[:16]
}
// AesEncode pads val with PKCS5Padding, encrypts it with AES-CBC using the
// cipher and IV from getAesBlk(encryptKey), and returns the ciphertext in the
// package's custom base64 encoding. Empty input returns nil.
func AesEncode(val []byte) (res []byte) {
	if len(val) == 0 {
		return res
	}
	block, iv := getAesBlk(encryptKey)
	padded := PKCS5Padding(val, block.BlockSize())
	sealed := make([]byte, len(padded))
	cipher.NewCBCEncrypter(block, iv).CryptBlocks(sealed, padded)
	encoded := getEncoding().EncodeToString(sealed)
	return []byte(encoded)
}
// AesDecode reverses AesEncode: it decodes val with the custom base64
// encoding, decrypts it with AES-CBC and strips the padding.
//
// An undecodable or empty input returns an empty slice. Any panic raised while
// decrypting or unpadding is recovered, in which case the result is nil.
func AesDecode(val string) []byte {
	defer func() {
		// Swallow the panic; the unnamed result then stays nil.
		_ = recover()
	}()

	raw, err := getEncoding().DecodeString(val)
	if err != nil || len(raw) == 0 {
		return []byte{}
	}

	block, iv := getAesBlk(encryptKey)
	plain := make([]byte, len(raw))
	cipher.NewCBCDecrypter(block, iv).CryptBlocks(plain, raw)
	return ZeroUnPadding(plain)
}
// CommonInfo is the JSON payload that ProxyConfigEncryptCommon encrypts and
// ProxyConfigDecodeCommon decrypts.
type CommonInfo struct {
ServerAddr string `json:"server_addr"`
ServerPort string `json:"server_port"`
Token string `json:"token"`
}
// serviceInfo describes one entry of the service map built by
// ProxyConfigEncryptServiceList.
type serviceInfo struct {
// Name string `json:"name"`
// Fields used by tcp entries.
Type string `json:"type"`
LocalIP string `json:"local_ip"`
LocalPort int `json:"local_port"`
RemotePort int `json:"remote_port"`
BandwidthLimit string `json:"bandwidth_limit"`
// Fields used by http/https entries.
CustomerDomains string `json:"customer_domains"`
Plugin string `json:"plugin"`
PluginLocalAddr string `json:"plugin_local_addr"`
PluginHostHeaderRewrite string `json:"plugin_host_header_rewrite"`
PluginHeaderXFromWhere string `json:"plugin_header_x_from_where"`
}
// ProxyConfigEncryptCommon serializes the [common] section (server address,
// port and token) as JSON and returns it encrypted with AesEncode. The
// json.Marshal error is discarded.
func ProxyConfigEncryptCommon(addr, port, token string) string {
	payload, _ := json.Marshal(&CommonInfo{
		ServerAddr: addr,
		ServerPort: port,
		Token:      token,
	})
	return string(AesEncode(payload))
}
// ProxyConfigDecodeCommon decrypts and parses the [common] section produced by
// ProxyConfigEncryptCommon.
//
// It returns an error when the input is empty, when AesDecode yields nothing,
// or when the plaintext is not valid JSON. In the last case the returned
// CommonInfo is whatever json.Unmarshal left behind.
func ProxyConfigDecodeCommon(encodeStr string) (common *CommonInfo, err error) {
	if encodeStr == "" {
		return nil, errors.New("input is empty")
	}

	plain := AesDecode(encodeStr)
	if len(plain) == 0 {
		return nil, errors.New("decode str is empty")
	}

	common = &CommonInfo{}
	if err = json.Unmarshal(plain, &common); err != nil {
		err = errors.Wrap(err, fmt.Sprintf("unmarshal common info failed,string:%s", string(plain)))
	}
	return common, err
}
// ProxyConfigEncryptServiceList builds the service map, serializes it as JSON
// and returns it encrypted with AesEncode. The json.Marshal error is discarded.
//
// The map always contains an "ssh-<sshPort>" tcp entry for local port 22 plus
// "check-host" and "check-cpu" entries whose Type carries hostName and cpuName.
// For local port 2022 it adds an http entry keyed by the first label of
// jupyterDomain when that is set, otherwise a tcp entry on jupyterPort. Local
// port 6006 gets the same treatment with tensorboardDomain / tensorboardPort.
func ProxyConfigEncryptServiceList(sshPort int, jupyterPort int, jupyterDomain string, tensorboardPort int, tensorboardDomain string, hostName, cpuName string) string {
	const (
		loopback  = "127.0.0.1"
		bandwidth = "50MB"
	)

	services := make(map[string]serviceInfo)
	services[fmt.Sprintf("ssh-%d", sshPort)] = serviceInfo{
		Type:           "tcp",
		LocalIP:        loopback,
		LocalPort:      22,
		RemotePort:     sshPort,
		BandwidthLimit: bandwidth,
	}
	services["check-host"] = serviceInfo{Type: hostName}
	services["check-cpu"] = serviceInfo{Type: cpuName}

	// Panel on local port 2022: by domain when one is given, else by port.
	if jupyterDomain == "" {
		services[fmt.Sprintf("autopanel-%d", jupyterPort)] = serviceInfo{
			Type:           "tcp",
			LocalIP:        loopback,
			LocalPort:      2022,
			RemotePort:     jupyterPort,
			BandwidthLimit: bandwidth,
		}
	} else {
		label := strings.Split(jupyterDomain, ".")[0]
		services[fmt.Sprintf("autopanel-https-%s", label)] = serviceInfo{
			Type:            "http",
			CustomerDomains: jupyterDomain,
			LocalPort:       2022,
			BandwidthLimit:  bandwidth,
		}
	}

	// User-defined service on local port 6006: same rule.
	if tensorboardDomain == "" {
		services[fmt.Sprintf("user-define-%d", tensorboardPort)] = serviceInfo{
			Type:           "tcp",
			LocalIP:        loopback,
			LocalPort:      6006,
			RemotePort:     tensorboardPort,
			BandwidthLimit: bandwidth,
		}
	} else {
		label := strings.Split(tensorboardDomain, ".")[0]
		services[fmt.Sprintf("user-define-https-%s", label)] = serviceInfo{
			Type:            "http",
			LocalPort:       6006,
			CustomerDomains: tensorboardDomain,
			BandwidthLimit:  bandwidth,
		}
	}

	payload, _ := json.Marshal(services)
	return string(AesEncode(payload))
}
// Decrypt [autodl-services] (no implementation follows this comment in the file)
package agent_constant
import "testing"
// TestCommon logs the encrypted [common] section for a sample address, port
// and token. It makes no assertions.
func TestCommon(t *testing.T) {
	encrypted := ProxyConfigEncryptCommon("192.168.65.48", "10086", "qwertyuiop")
	t.Log(encrypted)
	// A similar logging call for ProxyConfigEncryptServiceList was left disabled:
	//list := ProxyConfigEncryptServiceList(10087, 10088, 10089, "hostname", "cpu_name", "", "")
	//t.Log(list)
}
// TestProxyConfigDecodeCommon decodes a fixed sample and logs either the
// parsed result or the error. A decode error is only logged, never reported as
// a test failure.
func TestProxyConfigDecodeCommon(t *testing.T) {
	const sample = "IVnT5DQRhT04810OxrVPh*FECuKPF0NlH15rBZObxF-Jq0z47sI6/L-Kpv45d0LR9Qc6yD3b+xusz3o7FjbQL9hHkpVI1-Qf+Lk0tvrGJoF"
	info, err := ProxyConfigDecodeCommon(sample)
	if err == nil {
		t.Log(info)
		return
	}
	t.Log(err)
}
package agent_constant
import (
"fmt"
"golang.org/x/exp/slices"
)
// Paths, mount-point formats and container image names used by the agent.
const (
// RuntimeEnv names the runtime environment of ga; one of ["direct", "docker-compose", "k8s"].
RuntimeEnv = "GA_RUNTIME"
InitDirectoryLocation = "/init"
StorageDirectoryLocation = "/root/autodl-tmp"
// TODO: path still to be confirmed
NetDiskInDockerMount = "/root/autodl-nas" // path as seen by the user inside the container
NetDiskQuotaWarningOnMachineDir = "/storage/nas/1410065407-nfs-warning" // the maximum project id is 1410065407
NetDiskOnMachineMountDir = "/storage/nas"
NetDiskOnMachineMountFmt = NetDiskOnMachineMountDir + "/%d" // local mount path on the GPU machine; expands to /storage/nas/%d, e.g. /storage/nas/2
CommonDataInContainerMount = "/root/autodl-pub"
CommonDataOnMachineMountDir = "/storage/nas/1"
/*
The ADFS public dataset lives on the host at /data/adfs/pub/data and is mounted by operations staff with:
docker run -d --name=adfs-pub --privileged --network host -v /tmp/weed_cache:/cache -v /data/adfs/pub:/storage:shared -w /root registry.cn-beijing.aliyuncs.com/codewithgpu/seaweedfs:3.14sr bash -c "bash /root/mount.sh quota-1 100.78.114.13:8888,100.78.114.14:8888,100.78.114.15:8888 6 10TiB 3000000"
ADFS user directories live on the host at /data/adfs/{uid}/data; the agent starts the adfs mount container and performs a shared mount.
*/
ADFSOnMachineMountDir = "/data/adfs"
CommonADFSDataOnMachineMountDir = ADFSOnMachineMountDir + "/pub/data"
ADFSOnMachineMountFmt = ADFSOnMachineMountDir + "/%d-%s"
ADFSInDockerMount = "/root/autodl-fs"
ADFSOnMachineCachePath = "/storage/weed_cache"
ADFSOnMachineCacheFmt = ADFSOnMachineCachePath + "/%d-%s"
ExclusiveNfsOnMachineMountFmt = "/data/adfs/nfs/%d-%s"
// What is actually needed is /var/tmp/OptixCache_root, but mounting fails when that directory does not exist, so /var/tmp is mounted instead.
RenderOptixCacheOnMachineDir = "/var/tmp"
RenderOptixCacheInContainerMount = "/var/tmp"
// Image names for nfs, adfs and autofs.
Exclusivenfs = "registry.cn-beijing.aliyuncs.com/codewithgpu2/exclusivenfs:1.0"
ExclusivenfsV2 = "registry.cn-beijing.aliyuncs.com/codewithgpu2/exclusivenfs:2.0" // newer image
Seaweedfs = "registry.cn-beijing.aliyuncs.com/codewithgpu/seaweedfs:3.34ss"
Autofs = "registry.cn-beijing.aliyuncs.com/codewithgpu/autofs:1.04"
)
// GetUIDNetDiskMountInfo returns the host-side network-disk directory for uid.
//
// The default is NetDiskOnMachineMountFmt expanded with uid. For the listed
// region signs combined with the listed uids the directory is
// "<NetDiskOnMachineMountDir>-2/<uid>" instead.
func GetUIDNetDiskMountInfo(regionSign string, uid int) string {
	specialRegions := []string{
		"neimeng-A",
		"neimeng-C",
		"neimeng-D",
	}
	specialUIDs := []int{
		14,
		44609,
	}
	if !slices.Contains(specialRegions, regionSign) || !slices.Contains(specialUIDs, uid) {
		return fmt.Sprintf(NetDiskOnMachineMountFmt, uid)
	}
	return fmt.Sprintf("%s-2/%d", NetDiskOnMachineMountDir, uid)
}
package agent_constant
import "github.com/pkg/errors"
var (
// EchoMessageSkippedError is the sentinel returned by the ParseFromString
// methods in this package when the input contains "response_from_agent_flag".
EchoMessageSkippedError = errors.New("this msg contain 'response_from_agent_flag', do not write response into socket pipe")
)
package agent_constant
import (
"fmt"
"github.com/pkg/errors"
"libvirt.org/go/libvirt"
libvirtxml "libvirt.org/go/libvirtxml"
"strings"
)
// QemuClient is the set of virtual-machine operations implemented by Client.
type QemuClient interface {
CreateVM() error
StartVM() error
StopVM() error
DestroyVM() error
GetAllDomainStats() ([]libvirt.DomainStats,error)
ListAllDomains(vName string)
}
// Hard-coded parameters of the single virtual machine managed by Client.
const (
vmName = "windows-gaming-vm" // virtual machine name
vcpu = 8 // number of virtual CPU cores
memoryMB = 16384 // memory size (MB)
diskImagePath = "/path/to/disk.qcow2" // path of the virtual machine disk image
gpuPCIAddress = "0000:01:00.0" // PCI address of the GPU (see the `lspci` command)
)
// Client implements QemuClient on top of a libvirt connection.
type Client struct {
connect *libvirt.Connect
}
// NewClient connects to the libvirt daemon at "qemu:///system" and returns a
// Client wrapping that connection. A connection failure is printed to stdout
// and returned.
func NewClient() (QemuClient, error) {
	conn, err := libvirt.NewConnect("qemu:///system")
	if err == nil {
		return &Client{connect: conn}, nil
	}
	fmt.Printf("Failed to connect to libvirt: %v \n", err)
	return nil, err
}
// 生成虚拟机 XML 配置(含 GPU 直通)
func generateVMXML() string {
domainCfg := &libvirtxml.Domain{
Type: "kvm",
Name: vmName,
Description: "My test VM",
Memory: &libvirtxml.DomainMemory{
Unit: "MiB",
Value: memoryMB,
},
VCPU: &libvirtxml.DomainVCPU{
Value: vcpu,
},
OS: &libvirtxml.DomainOS{
Type: &libvirtxml.DomainOSType{
Type: "hvm",
Arch: "x86_64",
Machine: "pc",
},
BootDevices: []libvirtxml.DomainBootDevice{
{Dev: "hd"},
},
},
Devices: &libvirtxml.DomainDeviceList{
Disks: []libvirtxml.DomainDisk{
{
Device: "disk",
Driver: &libvirtxml.DomainDiskDriver{Type: "qcow2"},
Source: &libvirtxml.DomainDiskSource{
File: &libvirtxml.DomainDiskSourceFile{File: diskImagePath},
},
Target: &libvirtxml.DomainDiskTarget{Dev: "vda", Bus: "virtio"},
},
},
Interfaces: []libvirtxml.DomainInterface{
{
Model: &libvirtxml.DomainInterfaceModel{Type: "virtio"},
Source: &libvirtxml.DomainInterfaceSource{Network: &libvirtxml.DomainInterfaceSourceNetwork{Network: "default"}},
},
},
Hostdevs: []libvirtxml.DomainHostdev{
{
Mode: "subsystem",
Type: "pci",
Managed: "yes",
Source: &libvirtxml.DomainHostdevSource{
Address: &libvirtxml.DomainAddress{
PCI: &libvirtxml.DomainAddressPCI{
Domain: parsePCIComponent(gpuPCIAddress, 0),
Bus: parsePCIComponent(gpuPCIAddress, 1),
Slot: parsePCIComponent(gpuPCIAddress, 2),
Function: parsePCIComponent(gpuPCIAddress, 3),
},
},
},
SubsysPCI: &libvirtxml.DomainHostdevSubsysPCI{
Add
},
},
},
},
}
xml, _ := domainCfg.Marshal()
return xml
}
// parsePCIComponent extracts one part of a PCI address written as
// "domain:bus:slot.function" (for example "0000:01:00.0") and returns it
// prefixed with "0x".
//
// index selects the part: 0 domain, 1 bus, 2 slot, 3 function. "0x00" is
// returned when the address has fewer than three colon-separated parts, when
// the function part is requested but missing, or when index is out of range.
func parsePCIComponent(addr string, index int) string {
	const fallback = "0x00"

	parts := strings.Split(addr, ":")
	if len(parts) < 3 {
		return fallback
	}
	slotAndFunc := strings.Split(parts[2], ".")

	var digits string
	switch {
	case index == 0:
		digits = parts[0]
	case index == 1:
		digits = parts[1]
	case index == 2:
		digits = slotAndFunc[0]
	case index == 3 && len(slotAndFunc) > 1:
		digits = slotAndFunc[1]
	default:
		return fallback
	}
	return "0x" + digits
}
// CreateVM defines the virtual machine from the XML produced by generateVMXML
// and then adjusts its persistent configuration to 16 vCPUs and 32768 MiB of
// memory (SetMemoryFlags takes KiB).
//
// The domain handle returned by DomainDefineXML is released before returning,
// matching the other methods of Client.
func (c Client) CreateVM() error {
	xml := generateVMXML()
	dom, err := c.connect.DomainDefineXML(xml)
	if err != nil {
		return errors.Wrap(err, "Failed to define domain")
	}
	defer dom.Free()

	// Change the number of CPU cores.
	if err = dom.SetVcpusFlags(16, libvirt.DOMAIN_VCPU_CONFIG); err != nil {
		return err
	}
	// Change the memory size (unit: KiB).
	if err = dom.SetMemoryFlags(32768*1024, libvirt.DOMAIN_MEM_CONFIG); err != nil {
		return err
	}
	fmt.Printf("Virtual machine %s created\n", vmName)
	return nil
}
// StartVM looks up the domain named vmName and starts it with Create. The
// domain handle is freed before returning.
func (c Client) StartVM() error {
	domain, lookupErr := c.connect.LookupDomainByName(vmName)
	if lookupErr != nil {
		return errors.Wrap(lookupErr, "Failed to find domain")
	}
	defer domain.Free()

	startErr := domain.Create()
	if startErr != nil {
		return errors.Wrap(startErr, "Failed to start domain")
	}
	fmt.Printf("Virtual machine %s started\n", vmName)
	return nil
}
// StopVM looks up the domain named vmName and calls Shutdown on it. The
// "stopped" message is printed as soon as that call returns without error.
// The domain handle is freed before returning.
func (c Client) StopVM() error {
	domain, lookupErr := c.connect.LookupDomainByName(vmName)
	if lookupErr != nil {
		return errors.Wrap(lookupErr, "Failed to find domain")
	}
	defer domain.Free()

	stopErr := domain.Shutdown()
	if stopErr != nil {
		return errors.Wrap(stopErr, "Failed to stop domain")
	}
	fmt.Printf("Virtual machine %s stopped\n", vmName)
	return nil
}
// DestroyVM removes the configuration of the domain named vmName by calling
// Undefine on it. The domain handle is freed before returning.
func (c Client) DestroyVM() error {
	domain, lookupErr := c.connect.LookupDomainByName(vmName)
	if lookupErr != nil {
		return errors.Wrap(lookupErr, "Failed to find domain")
	}
	defer domain.Free()

	undefineErr := domain.Undefine()
	if undefineErr != nil {
		return errors.Wrap(undefineErr, "Failed to destroy domain")
	}
	fmt.Printf("Virtual machine %s destroyed\n", vmName)
	return nil
}
// GetAllDomainStats verifies that the domain named vmName exists and then
// asks the connection for domain statistics using stats mask 0x3FE and the
// "active domains" flag.
//
// NOTE(review): the looked-up domain is only freed, not passed to the stats
// call (the domain list given is empty), so the result is presumably not
// limited to vmName — confirm whether that is intended.
func (c Client) GetAllDomainStats() ([]libvirt.DomainStats, error) {
	domain, err := c.connect.LookupDomainByName(vmName)
	if err != nil {
		return nil, errors.Wrap(err, "Failed to find domain")
	}
	defer func() {
		if freeErr := domain.Free(); freeErr != nil {
			fmt.Printf("err:%v \n", freeErr)
		}
	}()

	filter := make([]*libvirt.Domain, 0)
	stats, err := c.connect.GetAllDomainStats(filter, 0x3FE, libvirt.CONNECT_GET_ALL_DOMAINS_STATS_ACTIVE)
	if err == nil {
		return stats, nil
	}
	fmt.Println("err", err)
	return nil, err
}
// ListAllDomains prints the names of all active domains and, for the one
// called vName, prints its general info, memory stats, block info and block
// stats for device "hda", and guest disk info.
//
// Every domain handle returned by the listing is freed, including those whose
// name does not match vName. If the listing itself fails, the error is printed
// and the method returns without printing a domain count.
func (c Client) ListAllDomains(vName string) {
	doms, err := c.connect.ListAllDomains(libvirt.CONNECT_LIST_DOMAINS_ACTIVE)
	if err != nil {
		fmt.Println("err", err)
		return
	}
	fmt.Printf("%d running domains:\n", len(doms))
	for i := range doms {
		func(dom *libvirt.Domain) {
			// Release the handle on every path out of this iteration.
			defer dom.Free()

			name, err := dom.GetName()
			if err == nil {
				fmt.Printf(" %s\n", name)
			}
			if name != vName {
				return
			}
			info, err := dom.GetInfo()
			fmt.Println("GetInfo", info, err)
			// Memory utilisation.
			// Per the original note: tag 4 is the remaining amount, tag 5 the total.
			meminfo, err := dom.MemoryStats(10, 0)
			fmt.Println("MemoryStats", meminfo, err)
			// Disk utilisation — original note: wrong values (all at their maximum).
			diskinfo, err := dom.GetBlockInfo("hda", 0)
			fmt.Println("DiskStats", diskinfo, err)
			blockinfo, err := dom.BlockStats("hda")
			fmt.Println("BlockStats", blockinfo, err)
			// Original note: not supported by the Go version.
			guestinfo, err := dom.GetGuestInfo(libvirt.DOMAIN_GUEST_INFO_DISKS, 0)
			fmt.Println("GetGuestInfo", guestinfo, err)
			//dom.GetCPUStats()
		}(&doms[i])
	}
}
package agent_constant
import (
"encoding/json"
"fmt"
"strings"
"time"
)
// MachineHardwareInfo is the JSON description of a machine: identity, agent
// and software versions, GPU/CPU/memory/disk figures, health, region, address
// and per-GPU occupation.
type MachineHardwareInfo struct {
MachineName string `json:"machine_name"`
MachineID string `json:"machine_id"`
AgentVersion string `json:"agent_version"`
AgentFeature AgentFeature `json:"agent_feature"`
KernelVersion string `json:"kernel_version"`
DockerVersion string `json:"docker_version"`
NvidiaDocker bool `json:"nvidia_docker"`
GPUType string `json:"gpu_type"`
GPUName string `json:"gpu_name"`
GPUCount int64 `json:"gpu_count"`
GpuMemory int64 `json:"gpu_memory"`
GpuUsed int64 `json:"gpu_used"`
MEMCountInMB int64 `json:"mem_count_in_mb"`
CpuNum int64 `json:"cpu_num"`
CpuName string `json:"cpu_name"`
CpuBasicFrequency string `json:"cpu_basic_frequency"`
Memory int64 `json:"memory"`
DiskSize int64 `json:"disk_size"` // in bytes
HealthStatus MachineHealthStatus `json:"health_status"`
RegionName string `json:"region_name"`
IpAddress string `json:"ip_address"`
OsName string `json:"os_name"`
RestartTime time.Time `json:"restart_time"`
GpuOccupation []*GpuOccupationInfo `json:"gpu_occupation"`
// Time at which this record was produced.
ValidAt time.Time `json:"valid_at"`
RegionSign string `json:"region_sign"`
}
// MachineHealthInfo is the JSON health report of a machine: its ID, health
// status, optional disk information and a timestamp.
type MachineHealthInfo struct {
MachineID string `json:"machine_id"`
HealthStatus MachineHealthStatus `json:"health_status"`
DiskInfo *DiskInfo `json:"disk_info"`
// Time at which this record was produced.
ValidAt time.Time `json:"valid_at"`
}
// String encodes the health report as JSON and returns it with any
// json.Marshal error.
func (c *MachineHealthInfo) String() (string, error) {
	encoded, err := json.Marshal(c)
	if err != nil {
		return string(encoded), err
	}
	return string(encoded), nil
}

// String encodes the hardware description as JSON and returns it with any
// json.Marshal error.
func (c *MachineHardwareInfo) String() (string, error) {
	encoded, err := json.Marshal(c)
	if err != nil {
		return string(encoded), err
	}
	return string(encoded), nil
}
// GpuOccupationInfo is the JSON record for one GPU: name, UUID, index and a
// Used value.
// NOTE(review): the meaning and unit of Used are not visible here — confirm with the producer.
type GpuOccupationInfo struct {
GpuName string `json:"gpu_name"`
GpuUUID string `json:"gpu_uuid"`
Index int `json:"index"`
Used int `json:"used"`
}
// AgentFeature is a comma-separated list of feature entries. An entry is
// either a bare name ("autopanel") or a "key=value" pair
// ("cuda_version=11.8").
type AgentFeature string

var featureOfAutoPanel = "autopanel"

// features is the package-level entry list that NewFeatureDesc joins. It is
// mutated by UpdateOrAppendFeature and is not guarded against concurrent use.
var features = []string{
	featureOfAutoPanel,
}

// NewFeatureDesc records the non-empty CUDA and driver versions in the
// package-level entry list and returns that list joined with commas. Versions
// recorded by earlier calls stay in the list.
func NewFeatureDesc(cudaV, driverV string) AgentFeature {
	for _, kv := range [][2]string{
		{"cuda_version", cudaV},
		{"driver_version", driverV},
	} {
		if kv[1] == "" {
			continue
		}
		UpdateOrAppendFeature(fmt.Sprintf("%s=%s", kv[0], kv[1]))
	}
	return AgentFeature(strings.Join(features, ","))
}

// HasAutoPanel reports whether the description contains "autopanel".
func (a AgentFeature) HasAutoPanel() bool {
	return strings.Contains(string(a), featureOfAutoPanel)
}

// HasCudaVersion reports whether the description contains "cuda_version="
// and GetCudaVersion returns a non-empty value.
func (a AgentFeature) HasCudaVersion() bool {
	if !strings.Contains(string(a), "cuda_version=") {
		return false
	}
	return a.GetCudaVersion() != ""
}

// GetCudaVersion returns the value of the first entry starting with
// "cuda_version=". When there is no such entry it returns "99" (per the
// original note, this is usually a CPU-only machine).
func (a AgentFeature) GetCudaVersion() string {
	version, found := a.featureValue("cuda_version=")
	if !found {
		return "99"
	}
	return version
}

// GetDriverVersion returns the value of the first entry starting with
// "driver_version=", or "" when there is none.
func (a AgentFeature) GetDriverVersion() string {
	version, _ := a.featureValue("driver_version=")
	return version
}

// featureValue returns the text following prefix in the first entry that
// starts with prefix, and whether such an entry exists.
func (a AgentFeature) featureValue(prefix string) (string, bool) {
	for _, entry := range strings.Split(string(a), ",") {
		if strings.HasPrefix(entry, prefix) {
			return entry[len(prefix):], true
		}
	}
	return "", false
}

// UpdateOrAppendFeature replaces the first entry of the package-level list
// that starts with feature's key (the text before the first "="), or appends
// feature when no entry matches.
func UpdateOrAppendFeature(feature string) {
	key := strings.Split(feature, "=")[0]
	for i := range features {
		if strings.HasPrefix(features[i], key) {
			features[i] = feature
			return
		}
	}
	features = append(features, feature)
}
package agent_constant
import (
"encoding/json"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/pkg/errors"
"strings"
)
// ContainerCreateParam is the container-creation message. ProgressHook is
// excluded from JSON by its tag.
type ContainerCreateParam struct {
NewContainerParam
ProgressHook func(pullProgress, downloadProgress float64) `json:"-"`
DownloadPrivateImageInfo *DownloadPrivateImageInfo `json:"download_private_image_info"`
}
// ContainerRuntimeParam wraps NewContainerRuntimeParam.
type ContainerRuntimeParam struct {
NewContainerRuntimeParam
}
// ContainerCreateAndRuntimeParam bundles a creation and a runtime parameter set.
// Neither field has a json tag, so the Go field names are used as keys.
type ContainerCreateAndRuntimeParam struct {
Param ContainerCreateParam
RuntimeParam ContainerRuntimeParam
}
// ContainerOperateResponse is the reply to a container operation.
type ContainerOperateResponse struct {
ContainerID ContainerID `json:"container_id"`
Code string `json:"code"`
ErrMsg string `json:"err_msg"`
// After a container is created, docker_container_uuid is returned as the container's globally unique identifier, for debugging and similar uses.
DockerContainerUUID string `json:"docker_container_uuid"`
// ResponseFromAgentFlag marks this reply as sent by the agent, so that the agent does not process it as an incoming message by mistake.
ResponseFromAgentFlag bool `json:"response_from_agent_flag"`
}
// ResponseCode is a string type for response codes. The constants below are
// untyped string constants, not ResponseCode values.
type ResponseCode string
const (
CodeOK = ""
CodeErr = "Err"
)
// ParseFromString decodes a JSON message into c and validates it.
// Input containing "response_from_agent_flag" is treated as an echoed agent
// reply and rejected with EchoMessageSkippedError before any decoding.
func (c *ContainerCreateParam) ParseFromString(in string) error {
	if strings.Contains(in, "response_from_agent_flag") {
		return EchoMessageSkippedError
	}
	if err := json.Unmarshal([]byte(in), c); err != nil {
		return err
	}
	return c.valid()
}

// valid reports an error when Image is empty.
func (c *ContainerCreateParam) valid() error {
	if len(c.Image) != 0 {
		return nil
	}
	return errors.New("image is nil")
}
// ParseFromString decodes a JSON message into c and validates it.
// Input containing "response_from_agent_flag" is treated as an echoed agent
// reply and rejected with EchoMessageSkippedError before any decoding.
func (c *ContainerRuntimeParam) ParseFromString(in string) error {
	if strings.Contains(in, "response_from_agent_flag") {
		return EchoMessageSkippedError
	}
	if err := json.Unmarshal([]byte(in), c); err != nil {
		return err
	}
	return c.valid()
}

// valid currently accepts every value.
func (c *ContainerRuntimeParam) valid() error {
	var noErr error
	return noErr
}
// ParseFromString decodes a JSON message into c and validates it.
// Input containing "response_from_agent_flag" is treated as an echoed agent
// reply and rejected with EchoMessageSkippedError before any decoding.
func (c *ContainerCreateAndRuntimeParam) ParseFromString(in string) error {
	if strings.Contains(in, "response_from_agent_flag") {
		return EchoMessageSkippedError
	}
	if err := json.Unmarshal([]byte(in), c); err != nil {
		return err
	}
	return c.valid()
}

// valid checks Param first and RuntimeParam second, returning the first error.
func (c *ContainerCreateAndRuntimeParam) valid() error {
	if err := c.Param.valid(); err != nil {
		return err
	}
	return c.RuntimeParam.valid()
}
// ParseFromString decodes a JSON reply into c. Unlike the request types it
// performs no echo check.
func (c *ContainerOperateResponse) ParseFromString(in string) error {
	raw := []byte(in)
	return json.Unmarshal(raw, c)
}

// String encodes the reply as JSON. As a side effect it first shortens
// c.ErrMsg in place with CutErrMessage.
func (c *ContainerOperateResponse) String() (string, error) {
	c.ErrMsg = CutErrMessage(c.ErrMsg)
	encoded, err := json.Marshal(c)
	if err != nil {
		return string(encoded), err
	}
	return string(encoded), nil
}
// CutErrMessage limits errMsg to at most 300 bytes. Messages of 300 bytes or
// fewer are returned unchanged.
//
// The cut is moved back to the nearest UTF-8 rune boundary, so a multi-byte
// character straddling byte 300 is dropped whole instead of being split into
// an invalid sequence. The result is therefore 297 to 300 bytes long.
func CutErrMessage(errMsg string) string {
	const maxErrMsgBytes = 300
	if len(errMsg) <= maxErrMsgBytes {
		return errMsg
	}
	cut := maxErrMsgBytes
	// A byte of the form 10xxxxxx continues a rune that started earlier. A rune
	// is at most 4 bytes, so at most 3 steps back reach its first byte.
	for back := 0; back < 3 && cut > 0 && errMsg[cut]&0xC0 == 0x80; back++ {
		cut--
	}
	return errMsg[:cut]
}
// MachineStatusParam wraps MachineHealthInfo.
type MachineStatusParam struct {
MachineHealthInfo
}
// MachineRegionParam carries a machine ID and a region sign.
type MachineRegionParam struct {
MachineID string `json:"machine_id"`
RegionSign string `json:"region_sign"`
}
// MachineRegisterParam wraps MachineHardwareInfo.
type MachineRegisterParam struct {
MachineHardwareInfo
}
// ContainerStatusParam wraps ContainerStatus.
type ContainerStatusParam struct {
ContainerStatus
}
// ContainerUsageParam wraps ContainerUsage.
type ContainerUsageParam struct {
ContainerUsage
}
// ContainerMigrateParam wraps ContainerPerformMigrate.
type ContainerMigrateParam struct {
ContainerPerformMigrate
}
// ContainerCheckRequest is a container check request sent proactively by the server. It can be extended to proactively check status, usage, etc.
type ContainerCheckRequest struct {
Containers []ContainerID `json:"containers"`
}
// ContainerSkippedResponse is the reply to the server's proactive check command.
type ContainerSkippedResponse struct {
}
// parseAgentMessage is the decoding step shared by the ParseFromString
// methods below. Input containing "response_from_agent_flag" is treated as an
// echoed agent reply and rejected with EchoMessageSkippedError; anything else
// is decoded as JSON into out.
func parseAgentMessage(in string, out interface{}) error {
	if strings.Contains(in, "response_from_agent_flag") {
		return EchoMessageSkippedError
	}
	return json.Unmarshal([]byte(in), out)
}

// ParseFromString decodes a JSON message into c, skipping echoed agent replies.
func (c *MachineStatusParam) ParseFromString(in string) error {
	return parseAgentMessage(in, c)
}

// ParseFromString decodes a JSON message into c, skipping echoed agent replies.
func (c *MachineRegionParam) ParseFromString(in string) error {
	return parseAgentMessage(in, c)
}

// ParseFromString decodes a JSON message into c, skipping echoed agent replies.
func (c *MachineRegisterParam) ParseFromString(in string) error {
	return parseAgentMessage(in, c)
}

// ParseFromString decodes a JSON message into c, skipping echoed agent replies.
func (c *ContainerStatusParam) ParseFromString(in string) error {
	return parseAgentMessage(in, c)
}

// ParseFromString decodes a JSON message into c, skipping echoed agent replies.
func (c *ContainerUsageParam) ParseFromString(in string) error {
	return parseAgentMessage(in, c)
}

// ParseFromString decodes a JSON message into c, skipping echoed agent replies.
func (c *ContainerCheckRequest) ParseFromString(in string) error {
	return parseAgentMessage(in, c)
}

// String encodes the request as JSON and returns it with any json.Marshal error.
func (c *ContainerCheckRequest) String() (string, error) {
	encoded, err := json.Marshal(c)
	return string(encoded), err
}

// ParseFromString decodes a JSON message into c, skipping echoed agent replies.
func (c *ContainerSkippedResponse) ParseFromString(in string) error {
	return parseAgentMessage(in, c)
}

// String encodes the reply as JSON and returns it with any json.Marshal error.
func (c *ContainerSkippedResponse) String() (string, error) {
	encoded, err := json.Marshal(c)
	return string(encoded), err
}
// NewMinioClient creates a MinIO client from cred using static V4 credentials.
//
// cred.Endpoint may be a bare "host[:port]" or carry an "http://" or
// "https://" scheme. Surrounding whitespace and the scheme are removed before
// the endpoint is handed to minio.New, and TLS is enabled only for "https://".
func NewMinioClient(cred MinioCredentials) (*minio.Client, error) {
	const (
		httpsPrefix = "https://"
		httpPrefix  = "http://"
	)
	endpoint := strings.TrimSpace(cred.Endpoint)
	secure := false
	switch {
	case strings.HasPrefix(endpoint, httpsPrefix):
		secure = true
		endpoint = strings.TrimPrefix(endpoint, httpsPrefix)
	case strings.HasPrefix(endpoint, httpPrefix):
		endpoint = strings.TrimPrefix(endpoint, httpPrefix)
	}
	return minio.New(endpoint, &minio.Options{
		Creds:  credentials.NewStaticV4(cred.AccessKeyID, cred.SecretAccessKey, ""),
		Secure: secure,
	})
}
// ContainerSaveParam is the container-save message. ProgressHook is excluded
// from JSON by its tag; NewContainerRuntimeParam has no json tag, so its Go
// field name is used as the key.
type ContainerSaveParam struct {
ImageUUID string `json:"image_uuid"`
MinioBucketInfo MinioBucketInfo `json:"minio_bucket_info"`
NewContainerRuntimeParam NewContainerRuntimeParam
ProgressHook func(uploadOssInfo *UploadOssInfo) `json:"-"`
MinioCredentials MinioCredentials `json:"minio_credentials"`
}
// DownloadPrivateImageInfo carries an image name together with the bucket
// information and credentials that go with it.
type DownloadPrivateImageInfo struct {
ReadLayerImageName string `json:"read_layer_image_name"`
MinioBucketInfo MinioBucketInfo `json:"minio_bucket_info"`
MinioCredentials MinioCredentials `json:"minio_credentials"`
}
// String encodes d as JSON. The json.Marshal error is discarded, so a failed
// encoding yields "".
func (d *DownloadPrivateImageInfo) String() string {
	encoded, _ := json.Marshal(d)
	text := string(encoded)
	return text
}

// ParseFromContent decodes JSON content into d.
func (d *DownloadPrivateImageInfo) ParseFromContent(content string) error {
	raw := []byte(content)
	return json.Unmarshal(raw, d)
}
// MinioBucketInfo identifies an object by bucket name and object name and
// carries its size.
// NOTE(review): ObjectSize is presumably in bytes — confirm with the producer.
type MinioBucketInfo struct {
BucketName string `json:"bucket_name"`
ObjectName string `json:"object_name"`
ObjectSize int64 `json:"object_size"`
}
// StorageOSSSignType is the unique identifier of an OSS.
type StorageOSSSignType string

// String returns the identifier as a plain string.
func (rs StorageOSSSignType) String() string {
	sign := string(rs)
	return sign
}
// MinioCredentials holds the endpoint, access keys, default bucket and OSS
// region sign; NewMinioClient reads Endpoint, AccessKeyID and SecretAccessKey.
type MinioCredentials struct {
StorageOSSRegionSign StorageOSSSignType `json:"storage_oss_region_sign"`
DefaultBucketName string `json:"default_bucket_name"`
Endpoint string `json:"endpoint"`
AccessKeyID string `json:"access_key_id"`
SecretAccessKey string `json:"secret_access_key"`
}
// TmpDiffFileParam carries a container ID, a merge tag name and an object name.
// Note that ContainerID is serialized under the key "source_container_id".
type TmpDiffFileParam struct {
ContainerID ContainerID `json:"source_container_id"`
MergeTagName string `json:"merge_tag_name"`
ObjectName string `json:"object_name"`
}
// ParseFromString decodes a JSON message into c and validates it.
// Input containing "response_from_agent_flag" is treated as an echoed agent
// reply and rejected with EchoMessageSkippedError before any decoding.
func (c *ContainerSaveParam) ParseFromString(in string) error {
	if strings.Contains(in, "response_from_agent_flag") {
		return EchoMessageSkippedError
	}
	if err := json.Unmarshal([]byte(in), c); err != nil {
		return err
	}
	return c.valid()
}

// valid reports an error unless both the bucket name and the object name are set.
func (c *ContainerSaveParam) valid() error {
	bucket := c.MinioBucketInfo
	if bucket.BucketName != "" && bucket.ObjectName != "" {
		return nil
	}
	return errors.New("bucket info is nil")
}
// ParseFromString decodes a JSON message into c.
// Input containing "response_from_agent_flag" is treated as an echoed agent
// reply and rejected with EchoMessageSkippedError before any decoding.
func (c *ContainerMigrateParam) ParseFromString(in string) error {
	if strings.Contains(in, "response_from_agent_flag") {
		return EchoMessageSkippedError
	}
	return json.Unmarshal([]byte(in), c)
}

// String encodes the message as JSON and returns it with any json.Marshal error.
func (c *ContainerMigrateParam) String() (string, error) {
	encoded, err := json.Marshal(c)
	if err != nil {
		return string(encoded), err
	}
	return string(encoded), nil
}
// ContainerCancelSaveParam is the cancel-save message: an image UUID and the
// runtime parameters of the container. NewContainerRuntimeParam has no json
// tag, so its Go field name is used as the key.
type ContainerCancelSaveParam struct {
ImageUUID string `json:"image_uuid"`
NewContainerRuntimeParam NewContainerRuntimeParam
}
// String encodes the message as JSON and returns it with any json.Marshal error.
func (c *ContainerCancelSaveParam) String() (string, error) {
	encoded, err := json.Marshal(c)
	if err != nil {
		return string(encoded), err
	}
	return string(encoded), nil
}

// ParseFromString decodes a JSON message into c and validates it.
// Input containing "response_from_agent_flag" is treated as an echoed agent
// reply and rejected with EchoMessageSkippedError before any decoding.
func (c *ContainerCancelSaveParam) ParseFromString(in string) error {
	if strings.Contains(in, "response_from_agent_flag") {
		return EchoMessageSkippedError
	}
	if err := json.Unmarshal([]byte(in), c); err != nil {
		return err
	}
	return c.valid()
}

// valid reports an error unless both the container ID and the image UUID are set.
func (c *ContainerCancelSaveParam) valid() error {
	missing := c.NewContainerRuntimeParam.ContainerID == "" || c.ImageUUID == ""
	if !missing {
		return nil
	}
	return errors.New("container_id or image_uuid info is nil")
}
package agent_constant
import (
"fmt"
"net/url"
)
// URL is a server address reduced to its host part (optionally "host:port")
// plus a flag recording whether the address was given with a secure scheme.
type URL struct {
	host   string
	secure bool
}

// ParseUrl builds a URL from input.
//
// input may be a bare address such as "192.168.1.126:33001" or a full URL such
// as "https://example.com:443". When input cannot be parsed, or parsing yields
// no host component, the whole input is used as the host and secure is false.
// Otherwise the host comes from the parsed URL and secure is true only for the
// "https" and "wss" schemes.
func ParseUrl(input string) (u *URL) {
	parsed, err := url.Parse(input)
	if err != nil || len(parsed.Host) == 0 {
		return &URL{host: input}
	}
	return &URL{
		host:   parsed.Host,
		secure: parsed.Scheme == "https" || parsed.Scheme == "wss",
	}
}

// GetHost returns the host part, including the port when one was given.
func (u *URL) GetHost() (host string) {
	host = u.host
	return host
}

// GetSecure reports whether the address was given with "https" or "wss".
func (u *URL) GetSecure() bool {
	isSecure := u.secure
	return isSecure
}
/*
* GPU agent
*/
// GetWebsocketConnectUrls returns the candidate websocket connect URLs for the
// GPU agent. A secure address yields only the "wss" URL; otherwise the "ws"
// URL is listed first, followed by the "wss" URL.
func (u *URL) GetWebsocketConnectUrls() (urls []string) {
	secureURL := fmt.Sprintf("wss://%s/%s", u.host, WebsocketApiPrefix)
	if u.secure {
		return []string{secureURL}
	}
	plainURL := fmt.Sprintf("ws://%s/%s", u.host, WebsocketApiPrefix)
	return []string{plainURL, secureURL}
}
// GetSaveLogUrls returns the candidate log upload URLs for the GPU agent. A
// secure address yields only the "https" URL; otherwise the "http" URL is
// listed first, followed by the "https" URL.
func (u *URL) GetSaveLogUrls() (urls []string) {
	secureURL := fmt.Sprintf("https://%s/%s", u.host, SaveLogApiPrefix)
	if u.secure {
		return []string{secureURL}
	}
	plainURL := fmt.Sprintf("http://%s/%s", u.host, SaveLogApiPrefix)
	return []string{plainURL, secureURL}
}
/*
* Storage agent
*/
// GetStorageAgentWebsocketConnectUrls returns the candidate websocket connect
// URLs for the storage agent. A secure address yields only the "wss" URL;
// otherwise the "ws" URL is listed first, followed by the "wss" URL.
func (u *URL) GetStorageAgentWebsocketConnectUrls() (urls []string) {
	secureURL := fmt.Sprintf("wss://%s/%s", u.host, storageAgentWebsocketApiRoute)
	if u.secure {
		return []string{secureURL}
	}
	plainURL := fmt.Sprintf("ws://%s/%s", u.host, storageAgentWebsocketApiRoute)
	return []string{plainURL, secureURL}
}
// GetStorageAgentSaveLogUrls returns the candidate log upload URLs for the
// storage agent. A secure address yields only the "https" URL; otherwise the
// "http" URL is listed first, followed by the "https" URL.
func (u *URL) GetStorageAgentSaveLogUrls() (urls []string) {
	secureURL := fmt.Sprintf("https://%s/%s", u.host, storageAgentSaveLogApiRoute)
	if u.secure {
		return []string{secureURL}
	}
	plainURL := fmt.Sprintf("http://%s/%s", u.host, storageAgentSaveLogApiRoute)
	return []string{plainURL, secureURL}
}
// Route paths appended after "<scheme>://<host>/" when building agent URLs.
// None of them carries a leading slash; the URL builders insert it.
const (
// WebsocketApiPrefix is the GPU agent websocket route (see GetWebsocketConnectUrls).
WebsocketApiPrefix string = "agent/v1/connect"
// SaveLogApiPrefix is the GPU agent log upload route (see GetSaveLogUrls).
SaveLogApiPrefix string = "agent/v1/logs"
// storageAgentWebsocketApiRoute is the storage agent websocket route.
storageAgentWebsocketApiRoute string = "agent/v1/storage/connect"
// storageAgentSaveLogApiRoute is the storage agent log upload route.
storageAgentSaveLogApiRoute string = "agent/v1/storage/logs"
// StorageAgentAuthApiRoute is presumably the storage agent auth route; it is
// not referenced in this part of the file — confirm against callers.
StorageAgentAuthApiRoute string = "agent/v1/storage/auth"
)
// ServerConnectedURL is the URL that was successfully connected to.
// It is nil until set; GetGlobalKVServerURL and GetGlobalFTServerURL fall back
// to a fixed address while it is nil.
var ServerConnectedURL *URL
// GetGlobalKVServerURL returns the base URL of the internal key-value API.
//
// When ServerConnectedURL is set, the URL is built from its host, using
// "https" if the connection is secure and "http" otherwise. When it is nil, a
// fixed fallback address is used.
func GetGlobalKVServerURL() string {
	const route = "api/v1/internal/kv/key/"
	if ServerConnectedURL == nil {
		// The "/" separator is required: without it the route is glued onto
		// the port ("...:33443api/...") and the URL is invalid.
		return "https://test.autodl.com:33443/" + route
	}
	scheme := "http"
	if ServerConnectedURL.GetSecure() {
		scheme = "https"
	}
	return fmt.Sprintf("%s://%s/%s", scheme, ServerConnectedURL.GetHost(), route)
}
// GetGlobalFTServerURL returns the base URL of the internal "ft" status API.
//
// When ServerConnectedURL is set, the URL is built from its host, using
// "https" if the connection is secure and "http" otherwise. When it is nil, a
// fixed fallback address is used.
func GetGlobalFTServerURL() string {
	const route = "api/v1/internal/ft/status/"
	if ServerConnectedURL == nil {
		// The "/" separator is required: without it the route is glued onto
		// the port ("...:33443api/...") and the URL is invalid.
		return "https://test.autodl.com:33443/" + route
	}
	scheme := "http"
	if ServerConnectedURL.GetSecure() {
		scheme = "https"
	}
	return fmt.Sprintf("%s://%s/%s", scheme, ServerConnectedURL.GetHost(), route)
}
package agent_constant_test
import (
"context"
"github.com/minio/minio-go/v7"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
. "kvm_server/pkg-agent/agent_constant"
"kvm_server/pkg/libs"
"testing"
)
// testCases maps an input address accepted by ParseUrl to the expected result:
// Len is the expected number of websocket URLs, WsUrls and SaveLogUrls are the
// URLs that must be contained in the generated lists. Inputs with an "https"
// or "wss" scheme expect a single secure URL; all others expect both the
// secure and the plain variant.
var testCases = map[string]struct {
Len int
WsUrls []string
SaveLogUrls []string
}{
// Host name without port.
"example.gpuhub.com": {
Len: 2,
WsUrls: []string{"wss://example.gpuhub.com/" + WebsocketApiPrefix, "ws://example.gpuhub.com/" + WebsocketApiPrefix},
SaveLogUrls: []string{"https://example.gpuhub.com/" + SaveLogApiPrefix, "http://example.gpuhub.com/" + SaveLogApiPrefix},
},
"http://example.gpuhub.com": {
Len: 2,
WsUrls: []string{"wss://example.gpuhub.com/" + WebsocketApiPrefix, "ws://example.gpuhub.com/" + WebsocketApiPrefix},
SaveLogUrls: []string{"https://example.gpuhub.com/" + SaveLogApiPrefix, "http://example.gpuhub.com/" + SaveLogApiPrefix},
},
"https://example.gpuhub.com": {
Len: 1,
WsUrls: []string{"wss://example.gpuhub.com/" + WebsocketApiPrefix},
SaveLogUrls: []string{"https://example.gpuhub.com/" + SaveLogApiPrefix},
},
"ws://example.gpuhub.com": {
Len: 2,
WsUrls: []string{"wss://example.gpuhub.com/" + WebsocketApiPrefix, "ws://example.gpuhub.com/" + WebsocketApiPrefix},
SaveLogUrls: []string{"https://example.gpuhub.com/" + SaveLogApiPrefix, "http://example.gpuhub.com/" + SaveLogApiPrefix},
},
"wss://example.gpuhub.com": {
Len: 1,
WsUrls: []string{"wss://example.gpuhub.com/" + WebsocketApiPrefix},
SaveLogUrls: []string{"https://example.gpuhub.com/" + SaveLogApiPrefix},
},
// Host name with port.
"example.gpuhub.com:5231": {
Len: 2,
WsUrls: []string{"wss://example.gpuhub.com:5231/" + WebsocketApiPrefix, "ws://example.gpuhub.com:5231/" + WebsocketApiPrefix},
SaveLogUrls: []string{"https://example.gpuhub.com:5231/" + SaveLogApiPrefix, "http://example.gpuhub.com:5231/" + SaveLogApiPrefix},
},
"http://example.gpuhub.com:5231": {
Len: 2,
WsUrls: []string{"wss://example.gpuhub.com:5231/" + WebsocketApiPrefix, "ws://example.gpuhub.com:5231/" + WebsocketApiPrefix},
SaveLogUrls: []string{"https://example.gpuhub.com:5231/" + SaveLogApiPrefix, "http://example.gpuhub.com:5231/" + SaveLogApiPrefix},
},
"https://example.gpuhub.com:5231": {
Len: 1,
WsUrls: []string{"wss://example.gpuhub.com:5231/" + WebsocketApiPrefix},
SaveLogUrls: []string{"https://example.gpuhub.com:5231/" + SaveLogApiPrefix},
},
"ws://example.gpuhub.com:5231": {
Len: 2,
WsUrls: []string{"wss://example.gpuhub.com:5231/" + WebsocketApiPrefix, "ws://example.gpuhub.com:5231/" + WebsocketApiPrefix},
SaveLogUrls: []string{"https://example.gpuhub.com:5231/" + SaveLogApiPrefix, "http://example.gpuhub.com:5231/" + SaveLogApiPrefix},
},
"wss://example.gpuhub.com:5231": {
Len: 1,
WsUrls: []string{"wss://example.gpuhub.com:5231/" + WebsocketApiPrefix},
SaveLogUrls: []string{"https://example.gpuhub.com:5231/" + SaveLogApiPrefix},
},
// IP address with port.
"192.168.1.126:43001": {
Len: 2,
WsUrls: []string{"wss://192.168.1.126:43001/" + WebsocketApiPrefix, "ws://192.168.1.126:43001/" + WebsocketApiPrefix},
SaveLogUrls: []string{"https://192.168.1.126:43001/" + SaveLogApiPrefix, "http://192.168.1.126:43001/" + SaveLogApiPrefix},
},
"http://192.168.1.126:43001": {
Len: 2,
WsUrls: []string{"wss://192.168.1.126:43001/" + WebsocketApiPrefix, "ws://192.168.1.126:43001/" + WebsocketApiPrefix},
SaveLogUrls: []string{"https://192.168.1.126:43001/" + SaveLogApiPrefix, "http://192.168.1.126:43001/" + SaveLogApiPrefix},
},
"https://192.168.1.126:43001": {
Len: 1,
WsUrls: []string{"wss://192.168.1.126:43001/" + WebsocketApiPrefix},
SaveLogUrls: []string{"https://192.168.1.126:43001/" + SaveLogApiPrefix},
},
"ws://192.168.1.126:43001": {
Len: 2,
WsUrls: []string{"wss://192.168.1.126:43001/" + WebsocketApiPrefix, "ws://192.168.1.126:43001/" + WebsocketApiPrefix},
SaveLogUrls: []string{"https://192.168.1.126:43001/" + SaveLogApiPrefix, "http://192.168.1.126:43001/" + SaveLogApiPrefix},
},
"wss://192.168.1.126:43001": {
Len: 1,
WsUrls: []string{"wss://192.168.1.126:43001/" + WebsocketApiPrefix},
SaveLogUrls: []string{"https://192.168.1.126:43001/" + SaveLogApiPrefix},
},
}
// Url spec: every entry of testCases must parse into a URL whose generated
// websocket and log upload URLs match the expectations.
var _ = Describe("Url", func() {
	It("should parse ", func() {
		for input, want := range testCases {
			parsed := ParseUrl(input)
			wsUrls := parsed.GetWebsocketConnectUrls()
			Expect(len(wsUrls)).Should(Equal(want.Len))
			Expect(wsUrls).Should(ContainElements(want.WsUrls))
			Expect(parsed.GetSaveLogUrls()).Should(ContainElements(want.SaveLogUrls))
		}
	})
})
func TestNewMinioClient(t *testing.T) {
bucket := "user-private-image"
cli, err := NewMinioClient(MinioCredentials{
Endpoint: "oss-cn-neimeng.autodl.com:80",
AccessKeyID: "TestEnvAutodlMinioAccessUser",
SecretAccessKey: "test22e2a-d496-4b04-85e0-59e9ad2a99fa",
})
if err != nil {
t.Fatalf("new failed %s", err.Error())
}
ch := cli.ListObjects(context.Background(), bucket, minio.ListObjectsOptions{})
for v := range ch {
if v.Err != nil {
t.Fatalf("get obj failed %s", v.Err.Error())
}
t.Logf("obj: %s", libs.IndentString(v))
}
}
package agent_container_test
import (
"testing"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
// TestAgentContainer is the go test entry point that runs the Ginkgo specs of
// this package, with Gomega failures routed to Ginkgo's Fail.
func TestAgentContainer(t *testing.T) {
	const suiteName = "AgentContainer Suite"
	RegisterFailHandler(Fail)
	RunSpecs(t, suiteName)
}
package agent_container
import (
"kvm_server/pkg-agent/agent_constant"
"time"
)
// getNanoTimeFromString parses in as an RFC 3339 timestamp with optional
// nanoseconds. The parse error is deliberately discarded: input that cannot be
// parsed yields the zero time.Time.
func getNanoTimeFromString(in string) (t time.Time) {
	parsed, _ := time.Parse(time.RFC3339Nano, in)
	return parsed
}
// getBootVolumeName returns the boot volume name of a container: the
// container ID followed by "-boot".
func getBootVolumeName(containerID agent_constant.ContainerID) string {
	const suffix = "-boot"
	return containerID.String() + suffix
}
// GetStorageVolumeName returns the storage volume name of a container: the
// container ID followed by "-storage".
func GetStorageVolumeName(containerID agent_constant.ContainerID) string {
	const suffix = "-storage"
	return containerID.String() + suffix
}
package agent_container_test
import (
"context"
"github.com/docker/docker/client"
. "github.com/onsi/gomega"
constant "kvm_server/pkg-agent/agent_constant"
"testing"
"time"
)
// NewDockerClient is a test helper that builds a Docker client for a fixed
// TCP daemon address, pinned to API version 1.41, and pings the daemon with a
// one second timeout. The ping error, if any, is returned with the client.
func NewDockerClient() (dockerClient constant.DockerClient, err error) {
	const dockerHost = "tcp://192.168.1.61:5000"
	// A client-side timeout (client.WithTimeout) is intentionally not set.
	dockerClient, err = client.NewClientWithOpts(
		client.WithHost(dockerHost),
		client.WithVersion("1.41"),
	)
	if err != nil {
		return dockerClient, err
	}
	pingCtx, cancelPing := context.WithTimeout(context.Background(), time.Second)
	defer cancelPing()
	_, err = dockerClient.Ping(pingCtx)
	return dockerClient, err
}
// TestLocalPerformance is a placeholder for a manual, local-only scenario.
// It currently only registers t with Gomega; the create/start scenario below
// is disabled.
func TestLocalPerformance(t *testing.T) {
RegisterTestingT(t)
// Suite intended for local testing only.
//dockerClient, err := NewDockerClient()
//Ω(err).Should(Succeed())
//Ω(dockerClient).ShouldNot(Equal(nil))
//
//newContainer := agent_container.NewContainer(&constant.NewContainerParam{
// ContainerID: "ning",
// Image: "ubuntu:16.04",
// PreCmd: []string{
// "sleep 6000",
// },
// OnlyPreCmd: true,
// WorkingDir: "/",
// MaxWritableSizeInByte: 1024 * 1024 * 1024,
// MaxLocalDiskSizeInByte: 1024 * 1024 * 1024,
// ShmSize: 10 * 1024 * 1024 * 1024,
// Ports: []string{
// "8222:9999",
// },
//}, dockerClient)
//_, err = newContainer.Create(context.Background(), nil, nil)
//Ω(err).Should(Succeed())
//
//err = newContainer.Start(context.Background())
//Ω(err).Should(Succeed())
}
package agent_container
import (
"fmt"
"github.com/docker/docker/api/types"
"github.com/docker/go-units"
"strconv"
)
// containerStatCalculator derives CPU and memory figures from one Docker
// stats sample.
type containerStatCalculator struct {
	stats *types.Stats
}

// newContainerStatCalculator wraps stats in a calculator.
func newContainerStatCalculator(stats *types.Stats) *containerStatCalculator {
	return &containerStatCalculator{stats: stats}
}

// getCpuUsagePercent returns the CPU usage in percent, rounded to two
// decimals, computed against the previous reading stored in the sample.
func (c *containerStatCalculator) getCpuUsagePercent() (percent float64) {
	previous := c.stats.PreCPUStats
	raw := calculateCPUPercentUnix(previous.CPUUsage.TotalUsage, previous.SystemUsage, c.stats)
	// Round by formatting to two decimals and parsing the text back.
	percent, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", raw), 64)
	return percent
}

// getMemUsage returns the memory usage in bytes with cache excluded.
func (c *containerStatCalculator) getMemUsage() (usage int64) {
	usage = int64(calculateMemUsageUnixNoCache(c.stats.MemoryStats))
	return usage
}

// getMemLimit returns the memory limit in bytes.
func (c *containerStatCalculator) getMemLimit() (usage int64) {
	usage = int64(c.stats.MemoryStats.Limit)
	return usage
}

// getMemUsagePercent returns the cache-free memory usage as a percentage of
// the limit, rounded to two decimals.
func (c *containerStatCalculator) getMemUsagePercent() (percent float64) {
	mem := c.stats.MemoryStats
	raw := calculateMemPercentUnixNoCache(float64(mem.Limit), calculateMemUsageUnixNoCache(mem))
	// Round by formatting to two decimals and parsing the text back.
	percent, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", raw), 64)
	return percent
}

// prettyCpuPercent formats the CPU usage like "12.34%".
func (c *containerStatCalculator) prettyCpuPercent() string {
	cpu := c.getCpuUsagePercent()
	return fmt.Sprintf("%.2f%%", cpu)
}

// prettyMemUsage formats memory usage and limit as "<used> / <limit>" in
// human-readable binary units.
func (c *containerStatCalculator) prettyMemUsage() string {
	used := units.BytesSize(float64(c.getMemUsage()))
	limit := units.BytesSize(float64(c.getMemLimit()))
	return fmt.Sprintf("%s / %s", used, limit)
}

// prettyMemPercent formats the memory usage percentage like "12.34%".
func (c *containerStatCalculator) prettyMemPercent() string {
	mem := c.getMemUsagePercent()
	return fmt.Sprintf("%.2f%%", mem)
}
// Do Not Edit these functions
// all functions are copy from: https://github.com/docker/cli/blob/master/cli/command/container/stats_helpers.go:166
// calculateCPUPercentUnix returns the container CPU usage in percent between
// two readings: previousCPU/previousSystem are the earlier totals, v holds the
// current ones. The result is 0 unless both deltas are positive.
func calculateCPUPercentUnix(previousCPU, previousSystem uint64, v *types.Stats) float64 {
var (
cpuPercent = 0.0
// calculate the change for the cpu usage of the container in between readings
cpuDelta = float64(v.CPUStats.CPUUsage.TotalUsage) - float64(previousCPU)
// calculate the change for the entire system between readings
systemDelta = float64(v.CPUStats.SystemUsage) - float64(previousSystem)
onlineCPUs = float64(v.CPUStats.OnlineCPUs)
)
// Fall back to the number of per-CPU entries when OnlineCPUs is not reported.
if onlineCPUs == 0.0 {
onlineCPUs = float64(len(v.CPUStats.CPUUsage.PercpuUsage))
}
if systemDelta > 0.0 && cpuDelta > 0.0 {
cpuPercent = (cpuDelta / systemDelta) * onlineCPUs * 100.0
}
return cpuPercent
}
// calculateBlockIO sums the IoServiceBytesRecursive entries of blkio and
// returns (bytes read, bytes written). Entries are classified by the first
// letter of Op: 'r'/'R' counts as read, 'w'/'W' as write; everything else,
// including an empty Op, is ignored.
func calculateBlockIO(blkio types.BlkioStats) (uint64, uint64) {
var blkRead, blkWrite uint64
for _, bioEntry := range blkio.IoServiceBytesRecursive {
if len(bioEntry.Op) == 0 {
continue
}
switch bioEntry.Op[0] {
case 'r', 'R':
blkRead = blkRead + bioEntry.Value
case 'w', 'W':
blkWrite = blkWrite + bioEntry.Value
}
}
return blkRead, blkWrite
}
// calculateNetwork sums RxBytes and TxBytes over all entries of network and
// returns (received bytes, transmitted bytes).
func calculateNetwork(network map[string]types.NetworkStats) (float64, float64) {
var rx, tx float64
for _, v := range network {
rx += float64(v.RxBytes)
tx += float64(v.TxBytes)
}
return rx, tx
}
// calculateMemUsageUnixNoCache calculates the memory usage of the container.
// Cache is intentionally excluded to avoid misinterpretation of the output.
//
// On cgroup v1 host, the result is `mem.Usage - mem.Stats["total_inactive_file"]` .
// On cgroup v2 host, the result is `mem.Usage - mem.Stats["inactive_file"] `.
//
// This definition is consistent with cadvisor and containerd/CRI.
// * https://github.com/google/cadvisor/commit/307d1b1cb320fef66fab02db749f07a459245451
// * https://github.com/containerd/cri/commit/6b8846cdf8b8c98c1d965313d66bc8489166059a
//
// On Docker 19.03 and older, the result was `mem.Usage - mem.Stats["cache"]`.
// See https://github.com/moby/moby/issues/40727 for the background.
//
// If neither subtraction applies (the inactive-file value is not smaller than
// mem.Usage), mem.Usage is returned unchanged.
func calculateMemUsageUnixNoCache(mem types.MemoryStats) float64 {
// cgroup v1
// The v < mem.Usage guard keeps the unsigned subtraction from wrapping.
if v, isCgroup1 := mem.Stats["total_inactive_file"]; isCgroup1 && v < mem.Usage {
return float64(mem.Usage - v)
}
// cgroup v2
// A missing key reads as 0, so this subtracts nothing when the stat is absent.
if v := mem.Stats["inactive_file"]; v < mem.Usage {
return float64(mem.Usage - v)
}
return float64(mem.Usage)
}
// calculateMemPercentUnixNoCache returns usedNoCache as a percentage of limit,
// or 0 when limit is 0.
func calculateMemPercentUnixNoCache(limit float64, usedNoCache float64) float64 {
// MemoryStats.Limit will never be 0 unless the container is not running and we haven't
// got any data from cgroup
if limit != 0 {
return usedNoCache / limit * 100.0
}
return 0
}
package agent_container
import (
"context"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
constant "kvm_server/pkg-agent/agent_constant"
"kvm_server/pkg/logger"
)
// ImageList returns the summaries of the local images whose reference matches
// imageName.
//
// Only the "reference" filter (match by image name or tag) is applied. The
// daemon also supports "before", "since", "label" and "dangling" filters;
// none of them is used here.
//
// On failure the error is logged through l and returned.
func ImageList(ctx context.Context, l *logger.Logger, dockerClient constant.DockerClient, imageName string) (imageSummary []types.ImageSummary, err error) {
	filter := filters.NewArgs()
	filter.Add("reference", imageName)
	imageSummary, err = dockerClient.ImageList(ctx, types.ImageListOptions{
		Filters: filter,
	})
	if err != nil {
		// This is a list operation; the message used to say "remove image".
		l.ErrorE(err, "list image %s failed", imageName)
		return
	}
	l.Info("docker get image list success.")
	return
}
package agent_container
import (
"context"
"encoding/json"
"github.com/docker/docker/api/types"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/shopspring/decimal"
"io"
constant "kvm_server/pkg-agent/agent_constant"
"kvm_server/pkg/logger"
"strings"
"time"
)
// PullImage pulls imageName and keeps reporting progress through progressHook.
//
// The pull stream of the daemon is decoded message by message. Per-layer
// progress (see calculateLayerProgress) is tracked by layer ID, and roughly
// every three seconds the averaged progress in [0, 1] is passed to
// progressHook as pullProgress (downloadProgress is always 0) whenever it
// changed. progressHook may be nil.
//
// Errors: a failure to start the pull, a malformed stream, or an error
// reported by the daemon inside the stream is returned. Previously in-stream
// errors were ignored and a failed pull returned nil.
//
// NOTE(review): the deferred hook reports progress 1 even when the pull
// fails; kept as-is because callers may rely on the final callback.
func PullImage(ctx context.Context, l *logger.Logger, dockerClient constant.DockerClient, imageName string, progressHook func(pullProgress, downloadProgress float64)) (err error) {
	var responseBody io.ReadCloser
	responseBody, err = dockerClient.ImagePull(ctx, imageName, types.ImagePullOptions{
		All: false,
	})
	if err != nil {
		l.ErrorE(err, "begin pull image %s", imageName)
		return
	}
	defer func() {
		if progressHook != nil {
			progressHook(1, 0)
		}
		_ = responseBody.Close()
	}()

	var (
		dec = json.NewDecoder(responseBody)
		// ids maps a layer ID to the best progress seen so far, in [0, 100].
		ids = make(map[string]int64)
	)
	// begin is the time of the last progress report; reports are sent every
	// 3s. The daemon needs some time to announce all layers, so after 3s the
	// set of layer IDs is assumed to be complete.
	var begin time.Time
	// The first record of the stream names the image; it is not a layer.
	firstLoop := true
	var lastProgress float64
	for {
		var jm jsonmessage.JSONMessage
		if err = dec.Decode(&jm); err != nil {
			if err == io.EOF {
				err = nil
				break
			}
			return err
		}
		// The daemon reports pull failures inside the stream, not through the
		// HTTP status; surface them instead of finishing "successfully".
		if jm.Error != nil {
			err = jm.Error
			return err
		}
		if firstLoop {
			begin = time.Now()
			firstLoop = false
			l.Info("begin pull first layer of image %s", imageName)
			continue
		}
		// The last few records before completion carry no (or a short) ID;
		// they are not layers, so skip them.
		if len(jm.ID) < 10 {
			continue
		}
		// Register the layer on first sight.
		if _, ok := ids[jm.ID]; !ok {
			ids[jm.ID] = 0
		}
		// Some states (e.g. checksum verification) yield no progress, so only
		// ever move a layer's progress forward.
		if p := calculateLayerProgress(jm); p > ids[jm.ID] {
			ids[jm.ID] = p
		}
		if time.Now().After(begin.Add(time.Second * 3)) {
			begin = time.Now()
			var progressSum int64
			for _, p := range ids {
				progressSum += p
			}
			// ids holds at least the current layer here, so the divisor is non-zero.
			pro, _ := decimal.NewFromFloat(float64(progressSum) / float64(len(ids)*100)).Round(4).Float64()
			if lastProgress != pro {
				lastProgress = pro
				if progressHook != nil {
					progressHook(pro, 0)
				}
				l.Info("pull progress of image %s is %.4f", imageName, pro)
			}
		}
	}
	return
}
// calculateLayerProgress maps one pull stream message to a layer progress
// value in [0, 99]:
//
//   - "Downloading" with progress details: [0, 50]
//   - "Extracting" with progress details:  [50, 98]
//   - a status containing "complete":      99
//   - anything else, or missing/unusable progress details: 0
//
// A Total that is zero or negative (size unknown) is treated as "no progress
// information". Dividing by it would produce Inf/NaN, whose conversion to
// int64 is implementation-defined. Current is clamped to [0, Total].
func calculateLayerProgress(jm jsonmessage.JSONMessage) (progress int64) {
	switch {
	case strings.Contains(jm.Status, "Downloading"):
		if ratio, ok := layerProgressRatio(jm.Progress); ok {
			return int64(ratio * 50)
		}
	case strings.Contains(jm.Status, "Extracting"):
		if ratio, ok := layerProgressRatio(jm.Progress); ok {
			return 50 + int64(ratio*48)
		}
	case strings.Contains(jm.Status, "complete"):
		return 99
	}
	return 0
}

// layerProgressRatio returns Current/Total clamped to [0, 1]. ok is false when
// p is nil or its Total is not positive.
func layerProgressRatio(p *jsonmessage.JSONProgress) (ratio float64, ok bool) {
	if p == nil || p.Total <= 0 {
		return 0, false
	}
	ratio = float64(p.Current) / float64(p.Total)
	if ratio < 0 {
		ratio = 0
	}
	if ratio > 1 {
		ratio = 1
	}
	return ratio, true
}
package agent_container_test
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
constant "kvm_server/pkg-agent/agent_constant"
//. "kvm_server/pkg-agent/agent_container"
//"kvm_server/pkg-agent/messenger"
//"kvm_server/pkg/logger"
//"strconv"
//"strings"
//"time"
//"kvm_server/pkg/logger"
)
// Container spec: checks that a Docker client can be created through
// constant.NewDockerClient. The actual image pull is currently disabled.
var _ = Describe("Container", func() {
// The spec title means "pull image".
It("拉取镜像", func() {
dockerClient, err := constant.NewDockerClient()
Ω(err).Should(Succeed())
Ω(dockerClient).ShouldNot(Equal(nil))
// Disabled: pulling an image and printing the reported progress.
//err = PullImage(context.Background(), logger.NewLogger("test"), dockerClient, "mysql", func(progress float64) {
// fmt.Println("------------------>", progress*100, "%")
//})
//Ω(err).Should(Succeed())
})
})
package agent_container
import (
"fmt"
"github.com/docker/docker/api/types/container"
"github.com/pkg/errors"
constant "kvm_server/pkg-agent/agent_constant"
"kvm_server/pkg-agent/hardware"
"kvm_server/pkg-agent/hardware/resource/nvidia"
"kvm_server/pkg/libs"
"kvm_server/pkg/logger"
)
// prepareGpus translates the requested GPU UUIDs into Docker container
// settings, depending on the accelerator type detected on the machine:
//
//   - Nvidia: a device request for the given UUIDs.
//   - Cambricon: device mappings for the cards whose UUIDs match, plus a
//     device request from excludeNvidiaDevice. Fails when not every UUID can
//     be resolved to a card index.
//   - CpuSet: a comma-separated cpuset built from the UUIDs, plus a device
//     request from excludeNvidiaDevice.
//
// Any other type yields empty results. Detection and GPU info errors are
// wrapped and returned.
func prepareGpus(l *logger.Logger, gpuUUIDList []string, capabilities []string) (device []container.DeviceRequest, mounts []container.DeviceMapping, cpuSet string, err error) {
	var gpuType constant.GPUType
	if gpuType, err = hardware.DetectGpu(); err != nil {
		err = errors.Wrap(err, "detect gpu on machine failed")
		return
	}

	var gpuInfos *hardware.GpuInfo
	if gpuInfos, err = hardware.GetGpuInfo(); err != nil {
		err = errors.Wrap(err, "get gpu info during start on machine failed")
		return
	}

	l.Info("prepareGpus get gpuType is [%s]", gpuType)

	switch gpuType {
	case constant.NvidiaGPUKey:
		device = prepareNvidiaDevice(gpuUUIDList, capabilities)

	case constant.CambriconGPUKey:
		// Resolve each requested UUID to the index of the matching card.
		var indexList []uint
		for _, gpu := range gpuInfos.GPUS {
			for _, wanted := range gpuUUIDList {
				if gpu.UUID != wanted {
					continue
				}
				indexList = append(indexList, gpu.Index)
			}
		}
		if len(indexList) != len(gpuUUIDList) {
			err = errors.Errorf("failed to trans cambricon gpu from uuid to index, lens not match, uuid: %s, indexList: %v, gpuinfo: %s", gpuUUIDList, indexList, libs.IndentString(gpuInfos))
			return
		}
		mounts = prepareCambriconDevice(indexList)
		device = excludeNvidiaDevice(capabilities)

	case constant.CpuSetGPUKey:
		// Join the non-empty cpusets of all UUIDs with commas.
		for _, uuid := range gpuUUIDList {
			set := constant.ParseCpuSetFromUUID(uuid)
			if set == "" {
				continue
			}
			if cpuSet != "" {
				cpuSet += ","
			}
			cpuSet += set
		}
		device = excludeNvidiaDevice(capabilities)
	}
	return
}
// prepareNvidiaDevice builds the single Nvidia device request for a
// container. With no UUIDs the device ID list is ["none"].
func prepareNvidiaDevice(gpuUUIDList []string, capabilities []string) (device []container.DeviceRequest) {
	deviceIDs := gpuUUIDList
	if len(deviceIDs) == 0 {
		deviceIDs = []string{"none"}
	}
	request := container.DeviceRequest{
		Driver:       constant.NvidiaDocker,
		DeviceIDs:    deviceIDs,
		Capabilities: [][]string{capabilities},
	}
	return []container.DeviceRequest{request}
}
// excludeNvidiaDevice returns an Nvidia device request with the device ID
// list ["none"]. When the Nvidia driver info cannot be initialized (no such
// GPU on the machine), nil is returned.
func excludeNvidiaDevice(capabilities []string) (device []container.DeviceRequest) {
	driverInfo := nvidia.GpuNvidiaDriveInfo{}
	if _, initErr := driverInfo.Initialize(); initErr != nil {
		// No GPU on this machine.
		return nil
	}
	request := container.DeviceRequest{
		Driver:       constant.NvidiaDocker,
		DeviceIDs:    []string{"none"},
		Capabilities: [][]string{capabilities},
	}
	return []container.DeviceRequest{request}
}
// prepareCambriconDevice builds the device mappings for Cambricon cards: one
// entry for the control device followed by one entry per card index
// ("/dev/cambricon_dev<index>"), all with "rwm" permissions.
//
// With no cards the control device's host path is left empty; per the
// original author, dockerd then removes that mount point.
func prepareCambriconDevice(gpuIndexList []uint) (deviceMapping []container.DeviceMapping) {
	const ctlDevice = "/dev/cambricon_ctl"
	const permissions = "rwm"

	hostCtlPath := ctlDevice
	if len(gpuIndexList) == 0 {
		hostCtlPath = ""
	}

	deviceMapping = make([]container.DeviceMapping, 0, len(gpuIndexList)+1)
	deviceMapping = append(deviceMapping, container.DeviceMapping{
		PathOnHost:        hostCtlPath,
		PathInContainer:   ctlDevice,
		CgroupPermissions: permissions,
	})
	for _, cardIndex := range gpuIndexList {
		cardPath := fmt.Sprintf("/dev/cambricon_dev%d", cardIndex)
		deviceMapping = append(deviceMapping, container.DeviceMapping{
			PathOnHost:        cardPath,
			PathInContainer:   cardPath,
			CgroupPermissions: permissions,
		})
	}
	return deviceMapping
}
package agent_container
import "github.com/pkg/errors"
// Sentinel errors returned by the parameter validation helpers of this package.
var (
// ParamsIsInvalid is returned when the parameters are nil or lack a container ID.
ParamsIsInvalid = errors.New("container params is invalid")
// DockerClientIsInvalid is returned when no Docker client was supplied.
DockerClientIsInvalid = errors.New("docker client is invalid")
)
package agent_container
import (
"context"
"fmt"
"github.com/docker/docker/api/types"
"github.com/docker/docker/client"
"github.com/pkg/errors"
"io"
constant "kvm_server/pkg-agent/agent_constant"
"kvm_server/pkg/logger"
"os"
"path/filepath"
"strings"
"time"
)
// ContainerRuntime is the runtime handle of one container instance: the
// Docker client used to operate on it, its runtime parameters and a logger
// tagged with the container ID.
type ContainerRuntime struct {
	dockerClient constant.DockerClient
	param        *constant.NewContainerRuntimeParam
	logger       *logger.Logger
}

// NewContainerRuntime creates the runtime handle for the container described
// by param. param must not be nil: its ContainerID is read to name the logger.
func NewContainerRuntime(param *constant.NewContainerRuntimeParam, dockerClient constant.DockerClient) *ContainerRuntime {
	runtimeLogger := logger.NewLogger(fmt.Sprintf("Container [%s]", param.ContainerID))
	return &ContainerRuntime{
		dockerClient: dockerClient,
		param:        param,
		logger:       runtimeLogger,
	}
}

// validate checks that the runtime has usable parameters and a Docker client.
func (c *ContainerRuntime) validate() error {
	validationErr := validateContainerRuntimeParams(c.param, c.dockerClient)
	return validationErr
}
// Stop stops the container.
//
// The configured pre-stop command (see ExecCmd) is run first; its failure is
// only logged. The container is then stopped with the configured timeout in
// seconds; when no timeout is configured, a zero duration is passed. A
// container that no longer exists counts as stopped, in which case the ADFS
// companion container is not touched. Otherwise the ADFS companion container
// is stopped as well and the stop error, if any, is returned.
func (c *ContainerRuntime) Stop(ctx context.Context) (err error) {
	if err = c.validate(); err != nil {
		return err
	}

	var timeout time.Duration
	if seconds := c.param.StopTimeout; seconds != nil {
		timeout = time.Duration(*seconds) * time.Second
	}

	if execErr := c.ExecCmd(ctx); execErr != nil {
		c.logger.WarnE(execErr, "Failed to exec cmd, ContainerID : %+v, cmd : %+v", c.param.ContainerID, c.param.Cmd)
	}

	err = c.dockerClient.ContainerStop(ctx, string(c.param.ContainerID), &timeout)
	// Not found is fine: treat the container as already stopped.
	if client.IsErrNotFound(err) {
		return nil
	}

	c.StopADFSContainer(ctx)
	return err
}
// ExecCmd runs the configured command inside the container via "bash -c".
// It is a no-op when no command is configured. Failures to create or start
// the exec instance are logged and returned.
// NOTE(review): the call returns once the exec has been started — it does not
// appear to wait for the command to finish; confirm if completion matters.
func (c *ContainerRuntime) ExecCmd(ctx context.Context) (err error) {
	if c.param.Cmd == "" {
		return nil
	}

	// Exec configuration: attach stdout/stderr only, no TTY.
	config := types.ExecConfig{
		Cmd:          []string{"bash", "-c", c.param.Cmd},
		AttachStdin:  false,
		AttachStdout: true,
		AttachStderr: true,
		Tty:          false,
	}

	// Create the exec instance.
	created, err := c.dockerClient.ContainerExecCreate(ctx, string(c.param.ContainerID), config)
	if err != nil {
		c.logger.WarnE(err, "Failed to create exec instance, ContainerID : %+v", c.param.ContainerID)
		return err
	}

	// Start it.
	err = c.dockerClient.ContainerExecStart(ctx, created.ID, types.ExecStartCheck{})
	if err != nil {
		c.logger.WarnE(err, "Failed to start exec instance, ContainerID : %+v, execId : %+v", c.param.ContainerID, created.ID)
		return err
	}
	return nil
}
// Remove force-removes the container together with its volumes.
//
// A container that no longer exists counts as removed and ends the call
// early. After a successful removal the boot and storage volumes are removed
// (failures are only logged) and the ADFS companion container is stopped and
// removed.
func (c *ContainerRuntime) Remove(ctx context.Context) (err error) {
	if err = c.validate(); err != nil {
		return err
	}

	removeOpts := types.ContainerRemoveOptions{
		RemoveVolumes: true,
		Force:         true,
	}
	err = c.dockerClient.ContainerRemove(ctx, string(c.param.ContainerID), removeOpts)
	switch {
	case client.IsErrNotFound(err):
		// Not found is fine: treat the container as already removed.
		return nil
	case err != nil:
		return err
	}

	volumeNames := []string{
		getBootVolumeName(c.param.ContainerID),
		GetStorageVolumeName(c.param.ContainerID),
	}
	for _, volumeName := range volumeNames {
		rmErr := c.dockerClient.VolumeRemove(ctx, volumeName, true)
		if rmErr != nil && !client.IsErrNotFound(rmErr) {
			c.logger.WarnE(rmErr, "remove volume of %s failed", c.param.ContainerID)
		}
	}

	c.StopADFSContainer(ctx)
	c.RemoveADFSContainer(ctx)
	return nil
}
// RemoveButRetainVolumes force-removes the container but keeps its volumes so
// that a later create can reuse them. A container that no longer exists
// counts as removed.
func (c *ContainerRuntime) RemoveButRetainVolumes(ctx context.Context) (err error) {
	if err = c.validate(); err != nil {
		return err
	}
	removeOpts := types.ContainerRemoveOptions{
		RemoveVolumes: false,
		Force:         true,
	}
	err = c.dockerClient.ContainerRemove(ctx, string(c.param.ContainerID), removeOpts)
	// Not found is fine: treat the container as already removed.
	if client.IsErrNotFound(err) {
		return nil
	}
	// Volumes are deliberately left in place here.
	return err
}
// Export writes the container's filesystem as a tar archive to
// "export_docker_image.tar" in the OS temp directory.
//
// Errors from creating the file, starting the export, copying the stream and
// closing the file are wrapped and returned. On any failure the incomplete
// archive is removed (best effort) so that no truncated tar is left behind.
// NOTE(review): the file name is fixed, so concurrent exports would overwrite
// each other — confirm this cannot happen.
func (c *ContainerRuntime) Export(ctx context.Context) (err error) {
	if err = c.validate(); err != nil {
		return err
	}

	exportPath := filepath.Join(os.TempDir(), "export_docker_image.tar")
	var exportFile io.WriteCloser
	exportFile, err = os.Create(exportPath)
	if err != nil {
		return errors.Wrap(err, "create export file failed")
	}
	defer func() {
		// Close can be the first place a write error becomes visible, so it
		// must not be dropped for a file that was just written.
		if closeErr := exportFile.Close(); closeErr != nil && err == nil {
			err = errors.Wrap(closeErr, "close export file failed")
		}
		if err != nil {
			// Best effort: the primary error is what the caller needs.
			_ = os.Remove(exportPath)
		}
	}()

	var srcBody io.ReadCloser
	srcBody, err = c.dockerClient.ContainerExport(ctx, string(c.param.ContainerID))
	if err != nil {
		return errors.Wrap(err, "container export failed")
	}
	defer func() {
		// Read side only; nothing useful can be done with this error.
		_ = srcBody.Close()
	}()

	if _, err = io.Copy(exportFile, srcBody); err != nil {
		return errors.Wrap(err, "io copy failed")
	}

	// TODO: move image to registry
	return nil
}
// StopADFSContainer stops the ADFS companion container of this container.
// Per the original author, no explicit removal is needed afterwards because
// ADFS containers are auto-removed once stopped.
//
// Before stopping, the host directory mounted at "/cache" is deleted, but
// only when it lies under constant.ADFSOnMachineCachePath. All failures are
// logged, none is returned. A missing container counts as stopped. The
// "stopped" message is no longer logged when the stop actually failed.
func (c *ContainerRuntime) StopADFSContainer(ctx context.Context) {
	adfsName := c.param.ContainerID.AdfsContainerName()

	if info, err := c.dockerClient.ContainerInspect(ctx, adfsName); err == nil {
		for _, m := range info.Mounts {
			if m.Destination != "/cache" {
				continue
			}
			c.logger.Info("clean adfs cache directory")
			// Safety guard: never delete anything outside the ADFS cache root.
			if !strings.HasPrefix(m.Source, constant.ADFSOnMachineCachePath) {
				continue
			}
			if rmErr := os.RemoveAll(m.Source); rmErr != nil {
				c.logger.WarnE(rmErr, "clean adfs cache directory %s failed", m.Source)
			}
		}
	}

	timeOut := time.Second * 3
	stopErr := c.dockerClient.ContainerStop(ctx, adfsName, &timeOut)
	if stopErr != nil && !client.IsErrNotFound(stopErr) {
		c.logger.ErrorE(stopErr, "failed to stop adfs container")
		return
	}
	c.logger.Info("stopped adfs container: %s", adfsName)
}
// RemoveADFSContainer force-removes the ADFS companion container of this
// container.
//
// Before removal, the host directory mounted at "/cache" is deleted, but only
// when it lies under constant.ADFSOnMachineCachePath. All failures are
// logged, none is returned. A missing container counts as removed. The
// failure message now says "remove" (it used to say "stop"), and the
// "removed" message is no longer logged when the removal failed.
func (c *ContainerRuntime) RemoveADFSContainer(ctx context.Context) {
	adfsName := c.param.ContainerID.AdfsContainerName()

	if info, err := c.dockerClient.ContainerInspect(ctx, adfsName); err == nil {
		for _, m := range info.Mounts {
			if m.Destination != "/cache" {
				continue
			}
			c.logger.Info("clean adfs cache directory")
			// Safety guard: never delete anything outside the ADFS cache root.
			if !strings.HasPrefix(m.Source, constant.ADFSOnMachineCachePath) {
				continue
			}
			if rmErr := os.RemoveAll(m.Source); rmErr != nil {
				c.logger.WarnE(rmErr, "clean adfs cache directory %s failed", m.Source)
			}
		}
	}

	removeErr := c.dockerClient.ContainerRemove(ctx, adfsName, types.ContainerRemoveOptions{
		Force: true,
	})
	if removeErr != nil && !client.IsErrNotFound(removeErr) {
		c.logger.ErrorE(removeErr, "failed to remove adfs container")
		return
	}
	c.logger.Info("removed adfs container: %s", adfsName)
}
package agent_container
import (
constant "kvm_server/pkg-agent/agent_constant"
)
// validateContainerParams checks the inputs needed to operate on a container.
// It returns ParamsIsInvalid when param is nil or has an empty ContainerID,
// DockerClientIsInvalid when dockerClient is nil, and nil otherwise.
func validateContainerParams(param *constant.NewContainerParam, dockerClient constant.DockerClient) error {
	if param == nil {
		return ParamsIsInvalid
	}
	if len(param.ContainerID) == 0 {
		return ParamsIsInvalid
	}
	if dockerClient == nil {
		return DockerClientIsInvalid
	}
	return nil
}
// validateContainerRuntimeParams checks the inputs needed to operate on a
// container runtime. It returns ParamsIsInvalid when param is nil or has an
// empty ContainerID, DockerClientIsInvalid when dockerClient is nil, and nil
// otherwise.
func validateContainerRuntimeParams(param *constant.NewContainerRuntimeParam, dockerClient constant.DockerClient) error {
	if param == nil {
		return ParamsIsInvalid
	}
	if len(param.ContainerID) == 0 {
		return ParamsIsInvalid
	}
	if dockerClient == nil {
		return DockerClientIsInvalid
	}
	return nil
}
package agent_container
import (
"context"
"fmt"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/mount"
volumetypes "github.com/docker/docker/api/types/volume"
"github.com/docker/docker/client"
"github.com/docker/docker/volume/mounts"
constant "kvm_server/pkg-agent/agent_constant"
"kvm_server/pkg/libs"
"kvm_server/pkg/serializer"
"kvm_server/pkg/xfs_quota"
"os"
"strings"
)
// getVolume returns the local volume called name, creating it with a size
// option derived from sizeInByte when it does not exist yet.
//
// For an existing volume whose options differ from the requested ones, a post
// hook is queued on c.PostHooks that adjusts the XFS quota of the volume's
// mount point to sizeInByte. Per the original author this is a stopgap until
// dockerd can resize volumes dynamically.
//
// TODO (from the original author): "Byte" — presumably about the size unit.
func (c *Container) getVolume(name string, sizeInByte int64) (volume types.Volume, err error) {
	var exist bool
	exist, volume, err = c.checkVolumeExist(name)
	if err != nil {
		return
	}

	wantOpts := getVolumeOpts(sizeInByte)
	if exist {
		optsIsEqual := compareVolumeOpts(wantOpts, volume.Options)
		c.logger.Info("volume opts compare %t, new: %s, old %s", optsIsEqual, libs.IndentString(wantOpts), libs.IndentString(volume.Options))
		if !optsIsEqual {
			c.PostHooks = append(c.PostHooks, func() error {
				return xfs_quota.UpdatePathQuotaSize(constant.QuotaMountPath, volume.Mountpoint, sizeInByte)
			})
		}
		return
	}

	volume, err = c.dockerClient.VolumeCreate(context.Background(), volumetypes.VolumeCreateBody{
		Driver:     "local",
		DriverOpts: wantOpts,
		Labels:     nil,
		Name:       name,
	})
	// %v prints the error itself; %T only printed its type.
	c.logger.Info("volume %s create result: %v", serializer.IndentString(volume), err)
	return
}
// compareVolumeOpts reports whether two volume option maps are considered
// equal: they have the same number of entries and every key of optsA maps to
// the same value when looked up in optsB. A key missing from optsB reads as
// the empty string there.
func compareVolumeOpts(optsA, optsB map[string]string) (equal bool) {
	if len(optsA) != len(optsB) {
		return false
	}
	for key, valueA := range optsA {
		if valueB := optsB[key]; valueA != valueB {
			return false
		}
	}
	return true
}
// getVolumeOpts returns the driver options for a volume of the given size: a
// single "size" entry of the form "<n>M", where n is sizeInByte converted by
// constant.ToSize with the MB unit and the floor rounding type.
func getVolumeOpts(sizeInByte int64) map[string]string {
	megabytes := constant.ToSize(sizeInByte, constant.TypeMB, constant.DefaultStorage, constant.FloorType)
	opts := make(map[string]string, 1)
	opts["size"] = fmt.Sprintf("%dM", megabytes)
	return opts
}
// checkVolumeExist inspects the volume called name. It returns exist=true
// with the volume when found, exist=false with a nil error when the daemon
// reports "not found", and the inspection error in every other case.
func (c *Container) checkVolumeExist(name string) (exist bool, volume types.Volume, err error) {
	volume, err = c.dockerClient.VolumeInspect(context.Background(), name)
	switch {
	case err == nil:
		return true, volume, nil
	case client.IsErrNotFound(err):
		return false, volume, nil
	default:
		return false, volume, err
	}
}
// getTmpfsVolume returns the volume called name, creating it as a tmpfs-backed
// local volume limited to sizeInMB megabytes when it does not exist yet. An
// existing volume is returned as-is, whatever its options.
func (c *Container) getTmpfsVolume(name string, sizeInMB int64) (volume types.Volume, err error) {
	var exist bool
	exist, volume, err = c.checkVolumeExist(name)
	if err != nil {
		return
	}
	if exist {
		return
	}
	volume, err = c.dockerClient.VolumeCreate(context.Background(), volumetypes.VolumeCreateBody{
		Driver: "local",
		DriverOpts: map[string]string{
			"o":      fmt.Sprintf("size=%dM", sizeInMB),
			"type":   "tmpfs",
			"device": "tmpfs",
		},
		Labels: nil,
		Name:   name,
	})
	// %v prints the error itself; %T only printed its type.
	c.logger.Info("volume %s create result: %v", serializer.IndentString(volume), err)
	return
}
// getContainerMounts converts raw mount specifications into bind mounts.
//
// Entries that cannot be parsed are logged and skipped. Sources on a network
// disk or ADFS mount directory get an existence check: when the path does not
// exist, the entry is skipped if isCreating is true; otherwise its source is
// set to the empty string, which per the original author makes Docker's
// custom logic drop that mount point.
func (c *Container) getContainerMounts(mountList []string, isCreating bool) (mountOutput []mount.Mount) {
	parser := mounts.NewParser(mounts.OSLinux)
	for _, raw := range mountList {
		parsed, parseErr := parser.ParseMountRaw(raw, "local")
		if parseErr != nil {
			c.logger.ErrorE(parseErr, "Error format of mount content, watch out your content: %s", raw)
			continue
		}

		source := parsed.Source
		// Special check for network disk mounts, e.g. '/storage'.
		onNetworkDisk := strings.Contains(source, constant.NetDiskOnMachineMountDir) ||
			strings.Contains(source, constant.ADFSOnMachineMountDir)
		// The existence check runs with a timeout (800ms per the original author).
		if onNetworkDisk && !libs.ExistPathWithCtx(context.Background(), source) {
			if isCreating {
				continue
			}
			// An empty source lets Docker's custom logic clear the mount point.
			source = ""
		}

		entry := mount.Mount{
			Type:     mount.TypeBind,
			Source:   source,
			Target:   parsed.Destination,
			ReadOnly: !parsed.RW,
		}
		if parsed.Propagation == mount.PropagationShared {
			entry.BindOptions = &mount.BindOptions{Propagation: parsed.Propagation}
		}
		mountOutput = append(mountOutput, entry)
	}
	return mountOutput
}
// preStartGetContainerMounts converts raw mount specifications into bind
// mounts and makes sure every source directory exists on the host.
//
// Entries that cannot be parsed are logged and skipped. A failure to create a
// source directory is logged (it used to be dropped silently), but the mount
// is still added, so the behaviour for callers is unchanged.
func (c *Container) preStartGetContainerMounts(mountList []string) (mountOutput []mount.Mount) {
	parser := mounts.NewParser(mounts.OSLinux)
	for _, v := range mountList {
		m, err := parser.ParseMountRaw(v, "local")
		if err != nil {
			c.logger.ErrorE(err, "Error format of mount content, watch out your content: %s", v)
			continue
		}
		// Best effort: a missing source directory will surface again when the
		// container starts, so report it here and carry on.
		if mkdirErr := os.MkdirAll(m.Source, os.ModePerm); mkdirErr != nil {
			c.logger.ErrorE(mkdirErr, "create mount source directory %s failed", m.Source)
		}
		thisMount := mount.Mount{
			Type:     mount.TypeBind,
			Source:   m.Source,
			Target:   m.Destination,
			ReadOnly: !m.RW,
		}
		if m.Propagation == mount.PropagationShared {
			thisMount.BindOptions = &mount.BindOptions{Propagation: m.Propagation}
		}
		mountOutput = append(mountOutput, thisMount)
	}
	return
}
// GetVolumeSize returns the quota size and the used size of the storage
// volume belonging to containerID, as reported by xfs_quota for the volume's
// mount point. A missing volume is logged as a warning and yields zeros
// without an error.
func (c *Container) GetVolumeSize(containerID constant.ContainerID) (size, usedSize uint64, err error) {
	volumeName := GetStorageVolumeName(containerID)
	found, volume, err := c.checkVolumeExist(volumeName)
	if err != nil {
		return 0, 0, err
	}
	if !found {
		c.logger.WithField("containerID", containerID).Warn("container volume is not exist")
		return 0, 0, nil
	}
	size, usedSize, err = xfs_quota.GetPathQuotaSize(volume.Mountpoint)
	return size, usedSize, err
}
This diff could not be displayed because it is too large.
This diff could not be displayed because it is too large.
This diff could not be displayed because it is too large.
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!