瞎扯淡 Let's clone a Message Queue

hooopo · 2019年03月24日 · 最后由 hooopo 回复于 2019年05月14日 · 2921 次阅读

我们都知道,消息队列有三好:异步、解耦、消峰。解耦和消峰的话,要访问量比较大的应用才需要,但异步几乎所有规模的应用都有需求,即使是一个小论坛,注册发邮件一般都是异步处理的。

市面上的的流行消息队列方向更多侧重于性能和吞吐量,但一些需求上的特性都不重视,比如 kafka 和 rabbitmq 都不支持“定时消息”,即,如果想实现“五分钟之后做某事”这样的需求,用 kafka 和 rabbitmq 是不容易实现的。

Rails 社区流行的消息队列主要是 sidekiq,局限于 Ruby。Sidekiq 由于使用 Redis 做存储,速度还算不错,但由于和 redis 绑定紧密,带来一些功能上的限制,比如持久化消息不容易做,所以 sidekiq 的消息是用完即失。对于一些场景,持久化消息其实是非常有必要的,比如审计、追踪、重新消费等。

最近产生了用 postgres 实现一个消息队列的想法,结合之前使用的需求,大概列了一下功能点,感觉用 postgres 实现起来性能也不会太差。

  • [x] multiple named queues
  • [x] exactly once
  • [x] priorities
  • [x] delayed jobs
  • [x] persistent jobs
  • [x] retries with backoff
  • [ ] cron job
  • [ ] broadcast msg to multiple queues
  • [ ] job dependencies
  • [ ] rate limiting
  • [ ] unique jobs
  • [ ] expire jobs
  • [ ] concurrent num & priority by tenant for SaaS
  • [ ] statistics & web ui
  • [ ] fast re-queue
  • [ ] distributed workers
  • [ ] batch processing

于是就有了这个 pgmq:https://github.com/hooopo/pgmq

worker 是基于 faktory 魔改的,faktory 已经把多线程调度之类做好了,没必要再费事。Broker 是基于 Postgres,不依赖 Redis or Faktory server,如果你现有的应用就是 pg 可以使用同一个库,也可以使用独立的库。

本机测试了一下,一分钟可以处理 10w+,还有一些优化空间。

由于是基于 Postgres 的,上面的这些需求实现起来非常简单,并且非常透明,因为数据都在 table 里,不像 sidekiq 和 faktory 还有自己的协议。

pgmq 不限制语言,所以看一下表结构就可以用其他语言撸客户端和 web UI 了。

PS. 目前只是把 demo 跑起来了,super alpha,不要生产用,如有需要请留言。

炮哥 有个基于 PG 的异步任务:https://github.com/chanks/que

A Ruby job queue that uses PostgreSQL's advisory locks for speed and reliability.

rubyist518 回复
  1. 实现机制不一样,que 使用的是 advisory locks,pgmq 使用的是 PostgreSQL 9.5's SKIP LOCKED feature.
  2. que 的实现是有问题的,参考:https://brandur.org/postgres-queues
  3. 其实 pgmq 的目标不止是做 background job,上面列表里的功能有一些只有 sidekiq pro 才提供,但其实对于一些应用非常有必要
  4. 还有就是 pgmq 可以其他语言使用的,不局限在 Ruby 上

从此最小化的 Rails 应用架构仅需要 Rails App server 和 PostgreSQL

炮哥不介绍下一 delayed_job 和你这个的对比吗?

jasl 回复

delayed_job 已经不支持 rails 5 了吧,跑不起来...和 DJ 比有点欺负人啊,毕竟 DJ 是上个世纪的东西了。pgmq 真的非常有意思,这个坑够填一阵子了

hooopo 回复

好像被另一个人接盘了,不过 DJ 连 AJ 的协议都没支持全的...

让我们来看看你这个坑能不能填完 😆

有苗不愁长,有坑不怕填! 😃

jasl 回复

感觉 ActiveJob 没啥用

hooopo 回复

AJ 是个标准,接口和实践分离是个很提倡的工程实践

huobazi 回复

新玩具~~

hooopo 回复

弄个适配器

hooopo 回复

ActiveJob 可以方便已经用上的人无痛迁移,Delay Mailer 和 ActiveStorage 这些组件逻辑不用重写。

我也疑惑为了队列弄个 redis 有无必要,单纯缓存用途用 memcache 更好。用 db 存队列的一个顾虑是,如果有大量临时的任务信息,处理完就删除,会不会影响 db 的性能?因为好像有些 db 删除数据也不释放空间的,由于碎片问题删除也没意义。

Rei 回复

pg 得定时做垃圾回收。

Rei 回复

缓存用 memcache 更好,无语了😓

19 楼 已删除
20 楼 已删除
Rei 回复

Auto Vacuum 是可以定期清理的,只是磁盘空间会占用,不会对查询影响:https://www.postgresql.org/docs/9.5/routine-vacuuming.html

我们项目因为历史原因,在使用 delayed_job,db 做 Queue 系统,锁是个很大问题。

  1. 多个 worker 在拿任务时,会有互斥。
  2. 拿任务和写入任务会互斥

借用一篇文章的标题:《Databases suck for Messaging》

https://www.rabbitmq.com/resources/RabbitMQ_Oxford_Geek_Night.pdf


迫于无奈,我们最终把 delayed_job 的后端迁移到 RabbitMQ 上。

xiaoronglv 回复

pg 9.5 和 mysql8 之后有了 skip locked,锁不是一个大问题。但不同的消息队列的功能特性差异很大,例如:

所以,如果你的应用场景需要 持久化 + 延迟队列 + 优先级,那么 rabbitmq 和 kafka 都满足不了,这些逻辑只能要应用层去实现。

db 做队列性能一定不如流行的 mq,但也有其他优势。

rabbitmq 支持延迟,持久化,和优先队列啊

mengqing 回复

那再加一个 mutate 和 re-consuming😂

huobazi 回复

looks good.

比起一般消息队列来,支持持久化(顺便支持了消息积压),这个用处就比较大了 不过削峰用数据库不合适吧,现在大家做东西的原则不都是“尽量靠缓存,没事别去烦数据库”么 😂

southwolf 回复

就是缓冲一下咯,如果简单的 msg 插入也是瓶颈的话,确实不适合。其实 pgmq 可以使用 multi node 模式的,也没问题。

需要 登录 后方可回复, 如果你还没有账号请 注册新账号