Skip to the content.

AGScheduler

test codecov Go Report Card Go Reference GitHub release (with filter) GitHub go.mod Go version (subdirectory of monorepo) license

Advanced Golang Scheduler (AGScheduler) 是一款适用于 Golang 的任务调度库,支持多种调度类型,支持动态更改和持久化作业,支持作业队列,支持作业结果回收,支持事件监听,支持远程调用,支持集群

English 简体中文

特性

架构

Framework

安装

go get -u github.com/agscheduler/agscheduler

使用

package main

import (
	"context"
	"fmt"
	"log/slog"
	"time"

	"github.com/agscheduler/agscheduler"
	"github.com/agscheduler/agscheduler/stores"
)

func printMsg(ctx context.Context, j agscheduler.Job) (result string) {
	slog.Info(fmt.Sprintf("Run job `%s` %s\n\n", j.FullName(), j.Args))
	return
}

func main() {
	agscheduler.RegisterFuncs(
		agscheduler.FuncPkg{Func: printMsg},
	)

	scheduler := &agscheduler.Scheduler{}

	store := &stores.MemoryStore{}
	scheduler.SetStore(store)

	job1 := agscheduler.Job{
		Name:     "Job1",
		Type:     agscheduler.JOB_TYPE_INTERVAL,
		Interval: "2s",
		Timezone: "UTC",
		Func:     printMsg,
		Args:     map[string]any{"arg1": "1", "arg2": "2", "arg3": "3"},
	}
	job1, _ = scheduler.AddJob(job1)
	slog.Info(fmt.Sprintf("%s.\n\n", job1))

	job2 := agscheduler.Job{
		Name:     "Job2",
		Type:     agscheduler.JOB_TYPE_CRON,
		CronExpr: "*/1 * * * *",
		Timezone: "Asia/Shanghai",
		FuncName: "main.printMsg",
		Args:     map[string]any{"arg4": "4", "arg5": "5", "arg6": "6", "arg7": "7"},
	}
	job2, _ = s.AddJob(job2)
	slog.Info(fmt.Sprintf("%s.\n\n", job2))

	job3 := agscheduler.Job{
		Name:     "Job3",
		Type:     agscheduler.JOB_TYPE_DATETIME,
		StartAt:  "2023-09-22 07:30:08",
		Timezone: "America/New_York",
		Func:     printMsg,
		Args:     map[string]any{"arg8": "8", "arg9": "9"},
	}
	job3, _ = s.AddJob(job3)
	slog.Info(fmt.Sprintf("%s.\n\n", job3))

	jobs, _ := s.GetAllJobs()
	slog.Info(fmt.Sprintf("Scheduler get all jobs %s.\n\n", jobs))

	scheduler.Start()

	select {}
}

注册函数

由于 golang 无法序列化函数,所以 scheduler.Start() 之前需要使用 RegisterFuncs 注册函数

队列

mq := &queues.MemoryQueue{}
broker := &agscheduler.Broker{
	Queues: map[string]agscheduler.QueuePkg{
		"default": {
			Queue:   mq,
			Workers: 2,
		},
	},
}

scheduler.SetBroker(broker)

结果回收

mb := &backends.MemoryBackend{}
recorder := &agscheduler.Recorder{Backend: mb}

scheduler.SetRecorder(recorder)

job, _ = scheduler.AddJob(job)
records, _ := recorder.GetRecords(job.Id)

事件监听

func jobCallback(ep agscheduler.EventPkg) {
	slog.Info(fmt.Sprintf("Event code: `%d`, job `%s`.\n\n", ep.Event, ep.JobId))
}

......

listener := &agscheduler.Listener{
	Callbacks: []agscheduler.CallbackPkg{
		{
			Callback: jobCallback,
			Event:    agscheduler.EVENT_JOB_ADDED | agscheduler.EVENT_JOB_DELETED,
		},
	},
}

scheduler.SetListener(listener)

gRPC

// Server
grservice := services.GRPCService{
	Scheduler: scheduler,
	Address:   "127.0.0.1:36360",
	// PasswordSha2: "xxxxxx",
}
grservice.Start()

// Client
conn, _ := grpc.NewClient("127.0.0.1:36360", grpc.WithTransportCredentials(insecure.NewCredentials()))
client := pb.NewSchedulerClient(conn)
client.AddJob(ctx, job)

HTTP

// Server
hservice := services.HTTPService{
	Scheduler: scheduler,
	Address:   "127.0.0.1:36370",
	// PasswordSha2: "xxxxxx",
}
hservice.Start()

// Client
mJob := map[string]any{...}
bJob, _ := json.Marshal(mJob)
resp, _ := http.Post("http://127.0.0.1:36370/scheduler/job", "application/json", bytes.NewReader(bJob))

集群

// Main Node
cnMain := &agscheduler.ClusterNode{
	Endpoint:     "127.0.0.1:36380",
	EndpointGRPC: "127.0.0.1:36360",
	EndpointHTTP: "127.0.0.1:36370",
	Queue:        "default",
}
schedulerMain.SetClusterNode(ctx, cnMain)
cserviceMain := &services.ClusterService{Cn: cnMain}
cserviceMain.Start()

// Worker Node
cnNode := &agscheduler.ClusterNode{
	EndpointMain: "127.0.0.1:36380",
	Endpoint:     "127.0.0.1:36381",
	EndpointGRPC: "127.0.0.1:36361",
	EndpointHTTP: "127.0.0.1:36371",
	Queue:        "worker",
}
schedulerNode.SetClusterNode(ctx, cnNode)
cserviceNode := &services.ClusterService{Cn: cnNode}
cserviceNode.Start()

集群 HA (高可用,实验性)

// HA 需要满足以下条件:
//
// 1. 集群中 HA 节点的数量必须为奇数
// 2. 所有 HA 节点都需要连接到同一个存储(不包含 MemoryStore)
// 3. ClusterNode 的 Mode 属性需要设置为 `HA`
// 4. HA 主节点必须先启动

// Main HA Node
cnMain := &agscheduler.ClusterNode{..., Mode: "HA"}

// HA Node
cnNode1 := &agscheduler.ClusterNode{..., Mode: "HA"}
cnNode2 := &agscheduler.ClusterNode{..., Mode: "HA"}

// Worker Node
cnNode3 := &agscheduler.ClusterNode{...}

Base API

gRPC Function HTTP Method HTTP Path
GetInfo GET /info
GetFuncs GET /funcs

Scheduler API

gRPC Function HTTP Method HTTP Path
AddJob POST /scheduler/job
GetJob GET /scheduler/job/:id
GetAllJobs GET /scheduler/jobs
UpdateJob PUT /scheduler/job
DeleteJob DELETE /scheduler/job/:id
DeleteAllJobs DELETE /scheduler/jobs
PauseJob POST /scheduler/job/:id/pause
ResumeJob POST /scheduler/job/:id/resume
RunJob POST /scheduler/job/run
ScheduleJob POST /scheduler/job/schedule
Start POST /scheduler/start
Stop POST /scheduler/stop

Broker API

gRPC Function HTTP Method HTTP Path
GetQueues GET /broker/queues

Recorder API

gRPC Function HTTP Method HTTP Path
GetRecords GET /recorder/records/:job_id
GetAllRecords GET /recorder/records
DeleteRecords DELETE /recorder/records/:job_id
DeleteAllRecords DELETE /recorder/records

Cluster API

gRPC Function HTTP Method HTTP Path
GetNodes GET /cluster/nodes

示例

完整示例

开发

# 克隆代码
git clone git@github.com:agscheduler/agscheduler.git

# 工作目录
cd agscheduler

# 安装依赖
make install

# 启动 CI 服务
make up-ci-services

# 运行检查
make check-all

Cli

cargo install agscheduler-cli

Web

docker run --rm -p 8080:80 ghcr.io/agscheduler/agscheduler-web:latest

致谢

APScheduler

simple-raft