瞎扯淡 Let's clone a Message Queue

hooopo · 2019年03月24日 · 最后由 hooopo 回复于 2019年03月26日 · 1155 次阅读

我们都知道,消息队列有三好:异步、解耦、消峰。解耦和消峰的话,要访问量比较大的应用才需要,但异步几乎所有规模的应用都有需求,即使是一个小论坛,注册发邮件一般都是异步处理的。

市面上的的流行消息队列方向更多侧重于性能和吞吐量,但一些需求上的特性都不重视,比如kafka和rabbitmq都不支持“定时消息”,即,如果想实现“五分钟之后做某事”这样的需求,用kafka和rabbitmq是不容易实现的。

Rails社区流行的消息队列主要是sidekiq,局限于Ruby。Sidekiq由于使用Redis做存储,速度还算不错,但由于和redis绑定紧密,带来一些功能上的限制,比如持久化消息不容易做,所以sidekiq的消息是用完即失。对于一些场景,持久化消息其实是非常有必要的,比如审计、追踪、重新消费等。

最近产生了用postgres实现一个消息队列的想法,结合之前使用的需求,大概列了一下功能点,感觉用postgres实现起来性能也不会太差。

  • [x] multiple named queues
  • [x] exactly once
  • [x] priorities
  • [x] delayed jobs
  • [x] persistent jobs
  • [x] retries with backoff
  • [ ] cron job
  • [ ] broadcast msg to multiple queues
  • [ ] job dependencies
  • [ ] rate limiting
  • [ ] unique jobs
  • [ ] expire jobs
  • [ ] concurrent num & priority by tenant for SaaS
  • [ ] statistics & web ui
  • [ ] fast re-queue
  • [ ] distributed workers
  • [ ] batch processing

于是就有了这个pgmq:https://github.com/hooopo/pgmq

worker是基于faktory魔改的,faktory已经把多线程调度之类做好了,没必要再费事。Broker是基于Postgres,不依赖Redis or Faktory server,如果你现有的应用就是pg可以使用同一个库,也可以使用独立的库。

本机测试了一下,一分钟可以处理10w+,还有一些优化空间。

由于是基于Postgres的,上面的这些需求实现起来非常简单,并且非常透明,因为数据都在table里,不像sidekiq和faktory还有自己的协议。

pgmq不限制语言,所以看一下表结构就可以用其他语言撸客户端和web UI了。

PS. 目前只是把demo跑起来了,super alpha,不要生产用,如有需要请留言。

共收到 24 条回复

炮哥 有个基于PG的异步任务:https://github.com/chanks/que

A Ruby job queue that uses PostgreSQL's advisory locks for speed and reliability.

rubyist518 回复
  1. 实现机制不一样,que使用的是advisory locks,pgmq使用的是PostgreSQL 9.5's SKIP LOCKED feature.
  2. que的实现是有问题的,参考:https://brandur.org/postgres-queues
  3. 其实pgmq的目标不止是做background job,上面列表里的功能有一些只有sidekiq pro才提供,但其实对于一些应用非常有必要
  4. 还有就是pgmq可以其他语言使用的,不局限在Ruby上

从此最小化的 Rails 应用架构仅需要 Rails App server 和 PostgreSQL

炮哥不介绍下一delayed_job和你这个的对比吗?

jasl 回复

delayed_job已经不支持rails 5了吧,跑不起来...和DJ比有点欺负人啊,毕竟DJ是上个世纪的东西了。pgmq真的非常有意思,这个坑够填一阵子了

hooopo 回复

好像被另一个人接盘了,不过 DJ 连 AJ 的协议都没支持全的...

让我们来看看你这个坑能不能填完 😆

有苗不愁长,有坑不怕填! 😃

jasl 回复

感觉ActiveJob没啥用

hooopo 回复

AJ 是个标准,接口和实践分离是个很提倡的工程实践

huobazi 回复

新玩具~~

hooopo 回复

弄个适配器

hooopo 回复

ActiveJob 可以方便已经用上的人无痛迁移,Delay Mailer 和 ActiveStorage 这些组件逻辑不用重写。

我也疑惑为了队列弄个 redis 有无必要,单纯缓存用途用 memcache 更好。用 db 存队列的一个顾虑是,如果有大量临时的任务信息,处理完就删除,会不会影响 db 的性能?因为好像有些 db 删除数据也不释放空间的,由于碎片问题删除也没意义。

Rei 回复

pg得定时做垃圾回收。

Rei 回复

缓存用memcache更好,无语了😓

19楼 已删除
20楼 已删除
Rei 回复

Auto Vacuum是可以定期清理的,只是磁盘空间会占用,不会对查询影响:https://www.postgresql.org/docs/9.5/routine-vacuuming.html

我们项目因为历史原因,在使用 delayed_job ,db 做 Queue 系统,锁是个很大问题。

  1. 多个 worker 在拿任务时,会有互斥。
  2. 拿任务和写入任务会互斥

借用一篇文章的标题:《Databases suck for Messaging》

https://www.rabbitmq.com/resources/RabbitMQ_Oxford_Geek_Night.pdf


迫于无奈,我们最终把 delayed_job 的后端迁移到 RabbitMQ 上。

xiaoronglv 回复

pg 9.5和mysql8之后有了skip locked,锁不是一个大问题。但不同的消息队列的功能特性差异很大,例如:

所以,如果你的应用场景需要 持久化+延迟队列+优先级,那么rabbitmq和kafka都满足不了,这些逻辑只能要应用层去实现。

db做队列性能一定不如流行的mq,但也有其他优势。

rabbitmq支持延迟,持久化,和优先队列啊

mengqing 回复

那再加一个mutate和re-consuming😂

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册