Ruby 用 Ruby 讲从创业到 996 公司的故事 (戏说 master-worker 模式)

Mark24 · 2022年07月23日 · 最后由 payalbatra 回复于 2024年04月22日 · 909 次阅读

前言

阅读大概需要 20 分钟。

假设你希望了解 线程、线程池、集群模式/Master-Worker 模式、调度器。

需要了解 Ruby 基本的用法和面向对象思想。

本文戏说,无须严肃对待。勿对号入座。个人也没有严肃观点。个人观点和所有人没有关系。

本文博客地址

完整代码示例

github:rb-master-worker-demo

Master Worker 模式

MasterWorker 模式,也有翻译成作集群模式、也叫 Master-Slave 模式。

Git 不许使用 master 了,换成了 main,Master/Slave 具有政治不正确的歧视色彩。不过这不重要了。其实这个名字很能表达这个模式的特点。

主要思想就是由一个 Master 抽象对象来调度 Worker 对象来工作。

Ruby 文学编程,用代码讲故事

其实这也非常像现实中的工作模型。Ruby 天生面向对象,表达的文学性,我们可以很方便的来使用代码模拟这种现实情况。 我们来用 Ruby 模拟下现实中这种情况,顺便学下如何实现这个模式。

约定

会出现几个类:

  • Master 代表“领导”,不干活,主要工作任务是分配任务,这是 Master 类的特征。
  • Worker 代表“打工人”,工作和创造价值的主体。主要任务就是干活。
  • Workshop 代表“公司”,主要是负责接单。

故事的思路:

我们自己是客户,把“任务”订单交给“公司”,这些任务会转交给“领导”手中,然后“领导”会排期,把工作布置给“打工人”。最终“打工人”乐此不疲的完成任务。

实现 打工人 Worker 类

step1 给员工工号

首先我们建立一个 Worker 类,我们给他一个名字属性。attr 暴露出 name 属性。

# Workshop.rb

class Worker
  attr :name
  def initialize(name)
    @name = "worker@#{name}"
  end
end

我们采用 TDD 方式来逐步实现我们的想法:

#Workshop_test.rb
require 'minitest/autorun'
require_relative '../lib/Workshop'

describe Worker do
  it "check worker name" do
    w = Worker.new("ruby01")
    assert_equal w.name, "worker@ruby01"
  end
end

很快,我们知道这名打工人他叫“ruby01”员工。

step2 给员工 KPI/OKR

我们不希望打工人每次只能做一件事,你必须得推着他才能工作。他最好学会“成长”会自己努力的工作。 其实就是一堆任务,我们希望他们一直忙。给他 N 件事情,他一个一个自己做。 我们要给他一个目标,也就是 KPI 或者 OKR 随便吧,实际上这是一个队列对吧。我们用队列实现。

require 'thread'

class Worker
  attr :name
  def initialize(name)
    @name = "worker@#{name}"
    @queue = Queue.new
    @thr = Thread.new { perfom }
  end

  def <<(job)
    @queue.push(job)
  end

  def join
    @thr.join
  end

  def perfom
    while (job = @queue.deq)
      break if job == :done
      puts "worker@#{name}: job:#{job}"
      job.call
    end
  end

  def size
    @queue.size
  end
end

现在打工人变得充实了许多,他自从来了公司培训之后,就拥有了很多属性和方法。

  • 属性说明:

@queue 就是他的 OKR 清单,他必须完成所有的工作任务。

@thr 意思是 thread 缩写,这里是会使用一个线程来调用 perform 我们在用线程模拟打工人干活这件事。可以理解为 @thr 就是打工人的灵魂。

  • 方法说明:

<< 是一个 push 方法的语法糖,就给给自己的 OKR 里添加任务。

perform 可能要说下 perform 方法,这里是“运行”的意思哈,不是“表演” :P。 打工人怎么干活呢?这得说道说道。我们得指导他如何“成长”。

我们前面说了 @queue 就是他的 OKR, 他必须从自己的 OKR 中取出任务然后执行。这里我用了 job.call。 暗示,这必须是一个 callable 对象,在 ruby 里也就是拥有 call 方法的对象。可以是 lambda、或者实现 call 的。 这也很合理,需求必须能做才会做。没法做的需求,做不了就是做不了。

但是如果给了一个 :done 另说。循环会结束,这个线程会消失。(裁员了 :P)

def perfom
  while (job = @queue.deq)
    break if job == :done
    puts "worker@#{name}: job:#{job}"
    job.call
  end
end

其实 Queue 这个对象很有意思,Ruby 做了一些工作。Queue 在空的时候,虚拟机会让线程进入睡眠等待。如果队列里有任务,就会继续工作。Ruby 很贴心,果然是程序员的好朋友啊。其实我不知道其他语言什么样,懒得查了。

join 方法是一个 Thread 的线程方法,主要的作用是告诉主线程你要等待每一个子线程(自己)的完成。如果不写这句,主线程如果比所有子线程提前结束。那么子线程会被全部关闭。简而言之 join 就是同步等待线程结果。

让我们来看看 TDD:

我们可以加一段验证工号 ruby02 的打工人是不是如期的完成了工作。

# ....
  it "check worekr do sth job" do
    w = Worker.new("ruby02")

    finished = []
    w << lambda { puts "do job 1"; finished.push "job1"}
    w << lambda { puts "do job 2"; finished.push "job2"}
    w << :done
    w.join

    assert_equal finished, ["job1","job2"]
  end

# ....

其实到这里,一个合格的打工人就打造完毕了。打工人很简单,只要吃苦耐劳,一切都 OK。 下面我们要实现下 Workshop 公司类。

实现 公司 Workshop 类

在此之前,我们先实现:创业公司 MiniWorkshop 类

其实我打算过渡下,首先实现一个“创业公司” MiniWorkshop。 创业公司刚起步,一般是只有“打工人”,没有真正意义上的中层出现。 这一时期非常简单,伊甸园时期。有活大家一起干,大家都是兄弟。

class MiniWorkshop
  def initialize(count)
    @worker_count = count # 打工人数量
    @workers = @worker_count.times.map do |i| # 根据数量生成(招聘)打工人
      Worker.new(i) # 给个工号
    end
  end

  # 初创公司分配任务
  def <<(job)
    if job == :done
      @workers.map {|m| m << job}
    else
      # 随机选择一个打工人,接活
      @workers.sample << job
    end
  end

  def join
    @workers.map {|m| m.join}
  end
end

这里可能说下

def <<(job)
  if job == :done
    @workers.map {|m| m << job}
  else
    # 随机选择一个打工人,接活
    @workers.sample << job
  end
end

这里干活的模式可能不好,因为我们竟然 Array#sample 方式。这是一个随机方法。随机选择一个。 看似不合理,实际上也合情合理。

创业公司初期虽然是草根,可是大家哪个不是大佬。所以活来了谁都行,问题不大。

没事我们后面再改进好了。

TDD:

我们的单元测试其实描述了一个故事。一家创业公司,只有 2 个人。接到了一个订单是 4 个工作内容。

# ...
  it "check MiniWorkshop work" do
    ws = MiniWorkshop.new(2)

    finished = []
    ws << lambda { puts "job1"; finished.push "job1"}
    ws << lambda { puts "job2"; finished.push "job2"}
    ws << lambda { puts "job3"; finished.push "job3"}
    ws << lambda { puts "job4"; finished.push "job4"}
    ws << :done

    ws.join

    assert_equal finished.size, 4
  end
# ...

我们回过头再看 MiniWorkshop 类,初始化的时候创建了两个员工。任务来了就随机分配给一个员工。 很符合小作坊的模式。

实现上市公司

公司变大了,就不止 2 个员工了。可能四五百号,随机交给一个员工,不现实。中层管理出现。中层出现意味着我们公司的类也要进行改变,公司需要改革。

我们先实现一个改革之后的 Workshop 公司类。

class Workshop
  def initialize(count, master_name)
    @worker_count = count
    @workers = @worker_count.times.map do |i|
      Worker.new(i)
    end
    @master = Master.new(@workers) # 新增角色
  end

  def <<(job)
    if job == :done
      @workers.map {|m| m << job}
    else
      @master.assign(job) # master分配任务
    end
  end

  def join
    @workers.map {|m| m.join}
  end
end

可以看到,我们在初始化函数里新增了 @master 他接受 @workers 作为参数。毕竟领导要点兵啊。

<<方法也进行了改进,由以前的 直接让 @workers 接收任务,变成 @master.assign 分配任务。

让我们来看下 Master 类

class Master
  def initialize(workers)
    @workers = workers
  end

  def assign(job)
    @workers.sort{|a,b| a.size <=> b.size}.first << job
  end
end

其实也不复杂。我们保持了 @workers 的指针, assign 方法更像是把以前分配的逻辑接过来实现了一遍。

这次我们改了分配任务的方式,我们要根据 Worker#size 忙碌程度来分配任务。

毕竟嘛,领导有个方法论,会比小作坊高级很多。

多重领导

一个领导就足够了么?不。

现实中我们见过形形色色的领导,有的是自己培养,有的是留过洋,有的是大厂空降。他们拥有不同的“方法论”,也就是 Master#assign 的方式可能不同。

我们给公司再加两个领导。

无限方法论

996ICU 领导:

我们使用了 Array#cycle 的方式,这是一个迭代器。比如 [1,2,3].cycle 每次 .next 会产生 1、2、3、1、2、3、1、2、3 ..... 无限循环。

这个方法论就是 996 方法论,只要干不死就往死里干。人海战术,把人轮番填上。

class ICU996Master
  def initialize(workers)
    @current_worker = workers.cycle # 迭代器
  end

  def assign(job)
    @current_worker.next << job
  end
end

分组任务方法论

等我们的公司变大了,我们的业务也会变得丰富,任务不是那么单一。很多工作要添加上组别 group_id,分门别类的交给不同工种的打工人,比如 开发、产品、测试、设计、运营。


class GroupMaster
  GROUPS = [:group1, :group2, :group3]

  def initialize(workers)
    @workers = {}
    workers_per_group = workers.length / GROUPS.size
    workers.each_slice(workers_per_group).each_with_index do |slice, index|
      group_id = GROUPS[index]
      @workers[group_id] = slice
    end
  end

  def assign(job)
    worker = @workers[job.group].sort_by(&:size).first
    worker << job
  end
end

然后我们可以把不同风格的领导班子集中起来

Masters = {
  normal: NormalMaster,
  ICU996: ICU996Master,
  group: GroupMaster
}

我们改造下 Workshop 毕竟这个词是一个 工作室的意思,其实是个小部门。

我们改造之后,我们的小部门可以按照风格不同的领导进行分派工作。

class Workshop
  def initialize(count, master_name) # 新增 master_name 指定
    @worker_count = count
    @workers = @worker_count.times.map do |i|
      Worker.new(i)
    end
    # 匹配 master
    @master = Masters[master_name].new(@workers)
  end

  def <<(job)
    if job == :done
      @workers.map {|m| m << job}
    else
      @master.assign(job)
    end
  end

  def join
    @workers.map {|m| m.join}
  end
end

我们来看看不同部门的 TDD

it "check Workshop@ normal master" do
  ws = Workshop.new(4, :normal)

  finished = []
  ws << lambda { puts "job1"; finished.push "job1"}
  ws << lambda { puts "job2"; finished.push "job2"}
  ws << lambda { puts "job3"; finished.push "job3"}
  ws << lambda { puts "job4"; finished.push "job4"}
  ws << :done

  ws.join

  assert_equal finished.size, 4
end

it "check Workshop@ ICU996 master" do
  ws = Workshop.new(4, :ICU996)

  finished = []
  ws << lambda { puts "job1"; finished.push "job1"}
  ws << lambda { puts "job2"; finished.push "job2"}
  ws << lambda { puts "job3"; finished.push "job3"}
  ws << lambda { puts "job4"; finished.push "job4"}
  ws << :done

  ws.join

  assert_equal finished.size, 4
end

it "check Workshop@ group master" do
  ws = Workshop.new(4, :group)

  class GroupJob
    def initialize(group_id, &b)
      @group_id = group_id
      @blk = b
    end

    # 任务分组
    def group
      "group#{@group_id}".to_sym
    end

    def call

      @blk.call(@group_id)
    end
  end

  finished = []
  ws << GroupJob.new(1) { |group_id| finished.push(group_id)}
  ws << GroupJob.new(2) { |group_id| finished.push(group_id)}
  ws << GroupJob.new(3) { |group_id| finished.push(group_id)}
  ws << GroupJob.new(1) { |group_id| finished.push(group_id)}
  ws << :done

  ws.join

  assert_equal finished.size, 4
end

总结 Master-Worker 模式

好吧,戏说不是胡说,改编不是乱编。

我们从现实的故事中走出来。

  • 调度器 (Scheduler)

其实在这里 Master 类,可能会被叫做 Scheduler 即调度器。内部的方法主要是使用不同的策略来分配任务。

而不同的 Master 实现的 assign 方法就是 调度策略。

  • 线程池 (Thread Pool)

Workshop 其实 持有 @workers,也就是说汇聚了实际工作线程的对象。他们可能会有另一个名字 —— 线程池(Thread Pool)

故事讲完了,你有没有学会呢? :D

示例代码:

参考资料:

好!要是有实际应用场景的例子就更好了

单纯的代码创不了业😂

hellonunam 回复

可以直接用。比如你有 10 个爬虫任务,丢给这个类。完全可以使用。并不是玩具代码。 :D

这篇文章真有意思,看的过程中有好几个地方都笑了,竟然还可以这样玩,虽然做过人力,也没这样思考过。

需要 登录 后方可回复, 如果你还没有账号请 注册新账号