- A+
1.1. 背景
系统的有些业务时需要定时发消息通知。但是这些消息又不是有规律可循的。比如,商品的优惠是限时的。在之前的实现是有一个排查任务每5分钟都去去商品表中查询哪些有做活动的商品,并比较是否过了限时折扣的时间。但是类似的排程多了,就会出现在某个时候数据库的资源使用率特别高。
1.2. 解决思路
1、将参与限时活动的商品保存在另外一张表。
2、使用消息队列机制,选择限时商品的时候将商品信息和限时的时间传入消息队列。
3、创建一个定时任务。
4、当时间到了定时任务就将在限时商品表删除此商品。
1.3. 定时消息任务的实现
这边就不去操作数据库了,就演示一下要如何实现这样的定时任务。同时也不演示kafka是如何搭建的,这边就直接用起来。
生产者代码
生产者代码主要实现了将 商品信息、数据库链接信息、定时时间传入kafka中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
#!/usr/bin/env python # -*- coding:utf-8 -*- from pykafka import KafkaClient import simplejson as json import logging import time import sys reload(sys) sys.setdefaultencoding('utf-8') logging.basicConfig() if __name__ == '__main__': # 可接受多个Client这是重点 client = KafkaClient(hosts="192.168.137.12:9092") # 选择一个topic topic = client.topics['test'] # 创建一个生产者 producer = topic.get_producer() producer.start() # 生产消息 msg_dict = { "sleep_time": 10, "db_config" : { "database" : "test", "host" : "192.168.137.12", "user" : "root", "password" : "root" }, "table" : "msg", "msg" : "Hello World" } msg = json.dumps(msg_dict) producer.produce(msg) producer.stop() |
消费者代码
消费者代码中主要是实现了将接收的消息放入定时任务中(timer)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
#!/usr/bin/env python # -*- coding:utf-8 -*- from pykafka.common import OffsetType from pykafka import KafkaClient from threading import Timer import simplejson as json import logging import sys reload(sys) sys.setdefaultencoding('utf-8') logging.basicConfig() def func(msg): ''' 定时执行的任务 ''' print msg if __name__ == '__main__': # 可接受多个Client这是重点 client = KafkaClient(hosts="192.168.137.12:9092") # 查看所有topic client.topics # 选择一个topic topic = client.topics['test'] # 使用这种一个topic只能允许一个consumer_group消费 balanced_consumer = topic.get_balanced_consumer( consumer_group='test_group1', auto_commit_enable=True, zookeeper_connect='localhost:2181' ) for message in balanced_consumer: if message is not None: # 创建定时任务 timer = Timer(10, func, args=[message.value]) timer.start() |
1.4. 补充
之前我说过消息信息需要存入数据库,这边的原因是主要是怕这个消费者程序奔溃重启是还能恢复。还有其实我们应该使用zookeeper开发分布式程序。当一个消费者程序崩溃了另外一个需要马上接进来(有兴趣的可以去研究一下kazoo,并实现分布式程序和leader选举)。
昵称:HH
QQ:275258836
ttlsa群交流沟通(QQ群②:6690706 QQ群③:168085569 QQ群④:415230207(新) 微信公众号:ttlsacom)
感觉本文内容不错,读后有收获?

微信公众号
扫一扫关注运维生存时间公众号,获取最新技术文章~