定时消息通知

HH MySQL python定时消息通知已关闭评论20,6755字数 1911阅读6分22秒阅读模式

1.1. 背景

系统的有些业务时需要定时发消息通知。但是这些消息又不是有规律可循的。比如,商品的优惠是限时的。在之前的实现是有一个排查任务每5分钟都去去商品表中查询哪些有做活动的商品,并比较是否过了限时折扣的时间。但是类似的排程多了,就会出现在某个时候数据库的资源使用率特别高。

1.2. 解决思路

1、将参与限时活动的商品保存在另外一张表。文章源自运维生存时间-https://www.ttlsa.com/mysql/python-timer-notify/

2、使用消息队列机制,选择限时商品的时候将商品信息和限时的时间传入消息队列。文章源自运维生存时间-https://www.ttlsa.com/mysql/python-timer-notify/

3、创建一个定时任务。文章源自运维生存时间-https://www.ttlsa.com/mysql/python-timer-notify/

4、当时间到了定时任务就将在限时商品表删除此商品。文章源自运维生存时间-https://www.ttlsa.com/mysql/python-timer-notify/

1.3. 定时消息任务的实现

这边就不去操作数据库了,就演示一下要如何实现这样的定时任务。同时也不演示kafka是如何搭建的,这边就直接用起来。文章源自运维生存时间-https://www.ttlsa.com/mysql/python-timer-notify/

生产者代码文章源自运维生存时间-https://www.ttlsa.com/mysql/python-timer-notify/

生产者代码主要实现了将 商品信息、数据库链接信息、定时时间传入kafka中。文章源自运维生存时间-https://www.ttlsa.com/mysql/python-timer-notify/

#!/usr/bin/env python
# -*- coding:utf-8 -*-
  
from pykafka import KafkaClient
import simplejson as json
import logging
import time
import sys
  
reload(sys)
sys.setdefaultencoding('utf-8')
logging.basicConfig()
 
if __name__ == '__main__':
  # 可接受多个Client这是重点
  client = KafkaClient(hosts="192.168.137.12:9092")
   
  # 选择一个topic
  topic = client.topics['test']
   
  # 创建一个生产者
  producer = topic.get_producer()
  producer.start()
   
  # 生产消息
  msg_dict = {
    "sleep_time": 10,
    "db_config" : {
      "database"  : "test",
      "host"      : "192.168.137.12",
      "user"      : "root",
      "password"  : "root"
    },
    "table"     : "msg",
    "msg"       : "Hello World"
  }
  msg = json.dumps(msg_dict)
  producer.produce(msg)
  producer.stop()

消费者代码文章源自运维生存时间-https://www.ttlsa.com/mysql/python-timer-notify/

消费者代码中主要是实现了将接收的消息放入定时任务中(timer)。文章源自运维生存时间-https://www.ttlsa.com/mysql/python-timer-notify/

#!/usr/bin/env python
# -*- coding:utf-8 -*-
  
from pykafka.common import OffsetType
from pykafka import KafkaClient
from threading import Timer
import simplejson as json
import logging
import sys
  
reload(sys)
sys.setdefaultencoding('utf-8')
logging.basicConfig()
 
def func(msg):
  '''
  定时执行的任务
  '''
  print msg
   
 
if __name__ == '__main__':
  # 可接受多个Client这是重点
  client = KafkaClient(hosts="192.168.137.12:9092")
   
  # 查看所有topic
  client.topics
   
  # 选择一个topic
  topic = client.topics['test']
   
  # 使用这种一个topic只能允许一个consumer_group消费
  balanced_consumer = topic.get_balanced_consumer(
    consumer_group='test_group1',
    auto_commit_enable=True,
    zookeeper_connect='localhost:2181'
  )
   
  for message in balanced_consumer:
if message is not None:
  # 创建定时任务
      timer = Timer(10, func, args=[message.value])
      timer.start()

1.4. 补充

之前我说过消息信息需要存入数据库,这边的原因是主要是怕这个消费者程序奔溃重启是还能恢复。还有其实我们应该使用zookeeper开发分布式程序。当一个消费者程序崩溃了另外一个需要马上接进来(有兴趣的可以去研究一下kazoo,并实现分布式程序和leader选举)。文章源自运维生存时间-https://www.ttlsa.com/mysql/python-timer-notify/

 文章源自运维生存时间-https://www.ttlsa.com/mysql/python-timer-notify/

昵称:HH
QQ:275258836
ttlsa群交流沟通(QQ群②:6690706 QQ群③:168085569 QQ群④:415230207(新) 微信公众号:ttlsacom)文章源自运维生存时间-https://www.ttlsa.com/mysql/python-timer-notify/

感觉本文内容不错,读后有收获?文章源自运维生存时间-https://www.ttlsa.com/mysql/python-timer-notify/

逛逛衣服店,鼓励作者写出更好文章。文章源自运维生存时间-https://www.ttlsa.com/mysql/python-timer-notify/ 文章源自运维生存时间-https://www.ttlsa.com/mysql/python-timer-notify/

weinxin
我的微信
微信公众号
扫一扫关注运维生存时间公众号,获取最新技术文章~
HH
  • 本文由 发表于 23/08/2016 00:25:04
  • 转载请务必保留本文链接:https://www.ttlsa.com/mysql/python-timer-notify/