job-pool.go 1003B

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. package main
  2. type Task struct {
  3. ID string
  4. Payload interface{}
  5. }
  6. type TaskCallback func(Task) error
  7. type JobPool struct {
  8. // pools 线程池大小
  9. pools int
  10. // tasks 任务列表
  11. tasks chan Task
  12. // callback 执行任务的具体函数
  13. callback TaskCallback
  14. // result 执行结果标志位
  15. result chan int
  16. }
  17. // NewJobPool 生成默认的 Job 池
  18. func NewJobPool() *JobPool {
  19. return &JobPool{
  20. pools: 10,
  21. tasks: make(chan Task, 100),
  22. result: make(chan int),
  23. }
  24. }
  25. // Push 添加任务
  26. func (t *JobPool) Push(task Task) {
  27. t.tasks <- task
  28. }
  29. // Callback 设置 Callback
  30. func (t *JobPool) Callback(f TaskCallback) {
  31. t.callback = f
  32. }
  33. // Start 启动 Job 服务
  34. func (t *JobPool) Start() {
  35. for i := 0; i < t.pools; i += 1 {
  36. go func() {
  37. for {
  38. task := <-t.tasks
  39. t.callback(task)
  40. t.result <- 1
  41. }
  42. }()
  43. }
  44. }
  45. func (t *JobPool) Done(total int) {
  46. cursor := 0
  47. if total < 1 {
  48. return
  49. }
  50. for {
  51. <-t.result
  52. cursor += 1
  53. if cursor >= total {
  54. break
  55. }
  56. }
  57. }