回顾
之前我们介绍了使用分布式事务(XA)处理用户下订单,对MySQL有所了解的都知道XA其实是在非用不可的情况下才用的,因为它实在是影响性能。当然,其实迫使我们使用XA的原因也是因为我们的设计决定的(其实使用XA这种分库分表的方案基本能满足那些一定需要强一致性的需求)。
之前我们的设计是为了方便卖家的,所以完整的订单信息只保存在卖家方,而在买家方我们只保存着完整订单的引用。这样做是因为业务需要强一致性,迫使我们使用XA,但我们又希望让数据写磁盘少一点,让插入快一点。文章源自运维生存时间-https://www.ttlsa.com/mysql/mysql-distributed-database-and-table-give-and-get-performance/
我们想要的
无论在买家还是卖家在操作订单数据的时候都能方便。为了满足这样的情况,我们只能做到数据冗余了。就是买家有一个完整的数据卖家也有一份完整的数据。这样操作起来就能在单机上进行,这样是最方便的。文章源自运维生存时间-https://www.ttlsa.com/mysql/mysql-distributed-database-and-table-give-and-get-performance/
业务最终一致性
其实,往往再深入分析业务,可以发现其实业务上并非一定需要强一致性。我们的目的只要买家已经下的订单能完整让卖家看到,卖家多久能看到其实并不是很关心。因为如果卖家没看到订单也不会对订单进行操作,也不会发货给买家,这样卖家是不会有损失的。而买家就算是付了款,发现买家没发货,也可以退款。这样一来买家也没有金钱的损失。所以我们这边能使用最终一致性来解决问题。文章源自运维生存时间-https://www.ttlsa.com/mysql/mysql-distributed-database-and-table-give-and-get-performance/
为什么要最终一致性
说白了就是为了提高性能,因为我们现在需要买家和卖家都有完整的订单数据,为了让买家下单时不用跨库、跨实例或跨节点执行XA事务。这样我们保证买家的订单数据先入库后能马上返回成功信息。而不用关心卖家的数据是否入库,只是提醒卖家有订单信息要入库了。文章源自运维生存时间-https://www.ttlsa.com/mysql/mysql-distributed-database-and-table-give-and-get-performance/
kafka配合完成最终一致性
要完成最终一致性这种业务,我最先想到的就是使用消息队列了。而在消息队列中我最熟悉的只能是kafka了,我使用kafka的原因是他能高可用,还能将消息持久化。文章源自运维生存时间-https://www.ttlsa.com/mysql/mysql-distributed-database-and-table-give-and-get-performance/
下面我们就来演示使用kafka来完成最终一致性文章源自运维生存时间-https://www.ttlsa.com/mysql/mysql-distributed-database-and-table-give-and-get-performance/
重构买家订单表(buy_order_x)结构:文章源自运维生存时间-https://www.ttlsa.com/mysql/mysql-distributed-database-and-table-give-and-get-performance/
-- 在每个分库为 buy_order_x 添加price列 ALTER TABLE buy_order_1 ADD `price` decimal(11,2) DEFAULT NULL COMMENT '订单价格'; ALTER TABLE buy_order_2 ADD `price` decimal(11,2) DEFAULT NULL COMMENT '订单价格'; ALTER TABLE buy_order_3 ADD `price` decimal(11,2) DEFAULT NULL COMMENT '订单价格'; ALTER TABLE buy_order_4 ADD `price` decimal(11,2) DEFAULT NULL COMMENT '订单价格'; ALTER TABLE buy_order_5 ADD `price` decimal(11,2) DEFAULT NULL COMMENT '订单价格'; ALTER TABLE buy_order_6 ADD `price` decimal(11,2) DEFAULT NULL COMMENT '订单价格'; ALTER TABLE buy_order_7 ADD `price` decimal(11,2) DEFAULT NULL COMMENT '订单价格'; ALTER TABLE buy_order_8 ADD `price` decimal(11,2) DEFAULT NULL COMMENT '订单价格'; ALTER TABLE buy_order_9 ADD `price` decimal(11,2) DEFAULT NULL COMMENT '订单价格'; ALTER TABLE buy_order_10 ADD `price` decimal(11,2) DEFAULT NULL COMMENT '订单价格'; -- 在每个分库为 buy_order_x 添加status列 ALTER TABLE buy_order_1 ADD `status` tinyint(3) unsigned DEFAULT NULL COMMENT '订单状态'; ALTER TABLE buy_order_2 ADD `status` tinyint(3) unsigned DEFAULT NULL COMMENT '订单状态'; ALTER TABLE buy_order_3 ADD `status` tinyint(3) unsigned DEFAULT NULL COMMENT '订单状态'; ALTER TABLE buy_order_4 ADD `status` tinyint(3) unsigned DEFAULT NULL COMMENT '订单状态'; ALTER TABLE buy_order_5 ADD `status` tinyint(3) unsigned DEFAULT NULL COMMENT '订单状态'; ALTER TABLE buy_order_6 ADD `status` tinyint(3) unsigned DEFAULT NULL COMMENT '订单状态'; ALTER TABLE buy_order_7 ADD `status` tinyint(3) unsigned DEFAULT NULL COMMENT '订单状态'; ALTER TABLE buy_order_8 ADD `status` tinyint(3) unsigned DEFAULT NULL COMMENT '订单状态'; ALTER TABLE buy_order_9 ADD `status` tinyint(3) unsigned DEFAULT NULL COMMENT '订单状态'; ALTER TABLE buy_order_10 ADD `status` tinyint(3) unsigned DEFAULT NULL COMMENT '订单状态'; -- 最终表结构 CREATE TABLE `buy_order_1` ( `buy_order_id` bigint(20) unsigned NOT NULL COMMENT '出售订单ID与出售订单相等', `user_id` int(10) unsigned DEFAULT NULL COMMENT '下单用户ID', `user_guide_id` int(10) unsigned DEFAULT NULL COMMENT '导购ID', `price` decimal(11,2) DEFAULT NULL COMMENT '订单价格', `status` tinyint(3) unsigned DEFAULT NULL COMMENT '订单状态', PRIMARY KEY (`buy_order_id`), KEY `idx$buy_order_1$user_id` (`user_id`), KEY `idx$buy_order_1user_guide_id` (`user_guide_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
为买家创建订单商品表:文章源自运维生存时间-https://www.ttlsa.com/mysql/mysql-distributed-database-and-table-give-and-get-performance/
-- 在每一个库都进行添加多个buy_order_goods_x表 -- 下面表buy_order_goods_x中的x代表的是 1,2,3,4,5,6,7,8,9,10 CREATE TABLE `buy_order_goods_x` ( `order_goods_id` bigint(20) unsigned NOT NULL COMMENT '订单商品表ID', `user_id` int(10) unsigned DEFAULT NULL COMMENT '下单用户ID', `sell_order_id` bigint(20) unsigned NOT NULL COMMENT '订单ID', `goods_id` bigint(20) unsigned NOT NULL COMMENT '商品ID', `user_guide_id` int(10) unsigned DEFAULT NULL COMMENT '导购ID', `price` decimal(11,2) DEFAULT NULL COMMENT '商品购买价格价格', `num` tinyint(3) unsigned DEFAULT NULL COMMENT '商品数量', PRIMARY KEY (`order_goods_id`), KEY `idx$order_goods$orders_id` (`sell_order_id`), KEY `idx$order_goods$user_id` (`user_id`), KEY `idx$order_goods$goods_id` (`goods_id`), KEY `idx$order_goods$user_guide_id` (`user_guide_id`) ); -- 将buy_order_goods加入系统配置中表明该表是分表。 INSERT INTO system_setting VALUES(NULL, 'sharding_table', 'buy_order_goods'); -- 设置buy_order_goods分表的关键字 INSERT INTO system_setting VALUES(NULL, 'sharding_buy_order_goods_by', 'user_id'); -- 指定每种角色需要访问的表 INSERT INTO system_setting VALUES(NULL, 'normal_user_sharding', 'buy_order_goods'); INSERT INTO system_setting VALUES(NULL, 'user_guide_sharding', 'buy_order_goods'); INSERT INTO system_setting VALUES(NULL, 'store_owner_sharding', 'buy_order_goods');
这边我不关心kafka是如何搭建的,就直接使用了。要想知道如何搭建kafka请点击一下网址:http://www.linuxidc.com/Linux/2014-07/104470.htm文章源自运维生存时间-https://www.ttlsa.com/mysql/mysql-distributed-database-and-table-give-and-get-performance/
买家下订单演示:文章源自运维生存时间-https://www.ttlsa.com/mysql/mysql-distributed-database-and-table-give-and-get-performance/
注意这边python代码使用的是simplejson因此需要安装。安装包simplejson-master文章源自运维生存时间-https://www.ttlsa.com/mysql/mysql-distributed-database-and-table-give-and-get-performance/
#!/usr/bin/env python # -*- coding:utf-8 -*- # Program: 客户下订单+kafka 订单推送 # Author : HH # Date : 2016-02-27 import sys import mysql.connector import time import simplejson as json import snowflake.client from pykafka import KafkaClient reload(sys) sys.setdefaultencoding('utf-8') if __name__ == '__main__': ''' 这边模拟用户:username77购买 store2 中的 goods27和goods69商品 ''' # 设置公共库连接配置 db_config_common = { 'user' : 'root', 'password': 'root', 'host' : '127.0.0.1', 'port' : 3306, 'database': 'test' } # 设置snowflake链接默认参数 snowflake_config = { 'host': '192.168.137.11', 'port': 30001 } # 配置snowflake snowflake.client.setup(**snowflake_config) # 配置连接kafka client = KafkaClient(hosts="192.168.137.11:9092, \ 192.168.137.11:9093, \ 192.168.137.11:9094") topic = client.topics['shop'] # 创建生产者 producer = topic.get_producer(delivery_reports=False) # 获得公共数据库的链接和游标 conn_common = mysql.connector.connect(**db_config_common) cursor_select_common = conn_common.cursor(buffered=True) # 获得用户:username77的基本信息 select_sql = ''' SELECT u.user_id, u.table_flag, u.db_name, ss.value FROM user AS u LEFT JOIN system_setting AS ss ON u.db_name = ss.name WHERE username = 'username77' ''' cursor_select_common.execute(select_sql) buy_user_id, buy_table_flag, buy_db_name, buy_db_config_json \ = cursor_select_common.fetchone() # 获得购买者的链接和游标 conn_buy = mysql.connector.connect(**json.loads(buy_db_config_json)) cursor_select_buy = conn_buy.cursor(buffered=True) cursor_dml_buy = conn_buy.cursor(buffered=True) # 通过店铺名称获得导购以及导购所对应的用户所使用的数据库链接描述符 select_sql = ''' SELECT s.user_id, ug.user_guide_id, u.table_flag, u.db_name, ss.value AS db_config_json FROM store AS s LEFT JOIN user AS u USING(user_id) LEFT JOIN user_guide AS ug USING(user_id) LEFT JOIN system_setting AS ss ON ss.name = u.db_name WHERE s.user_id = 2 ''' cursor_select_common.execute(select_sql) sell_user_id, user_guide_id, sell_table_flag, sell_dbname, \ sell_db_config_json = cursor_select_common.fetchone() # 获得出售者的数据库链接描述符以及游标 conn_sell = mysql.connector.connect(**json.loads(sell_db_config_json)) cursor_select_sell = conn_sell.cursor(buffered=True) # 成订单ID order_id = snowflake.client.get_guid() # 获得商品信息并生成商品订单信息。 select_goods_sql = ''' SELECT goods_id, price FROM {goods_table} WHERE goods_id IN(3794292584748158977, 3794292585729626113) '''.format(goods_table = 'goods_' + str(sell_table_flag)) cursor_select_sell.execute(select_goods_sql) # 订单价格 order_price = 0 for goods_id, price in cursor_select_sell: # 生成订单货物ID guid = snowflake.client.get_guid() order_price += price # 生成订单商品信息 insert_order_goods_sql = ''' INSERT INTO {table_name} VALUES({guid}, {user_id}, {order_id}, {goods_id}, {user_guide_id}, {price}, 1) '''.format(table_name = 'buy_order_goods_' + str(buy_table_flag), guid = guid, user_id = buy_user_id, order_id = order_id, goods_id = goods_id, user_guide_id = user_guide_id, price = price) cursor_dml_buy.execute(insert_order_goods_sql) # 将订单商品信息放入队列中 goods_dict = { 'user_id' : sell_user_id, 'option_type' : 'insert', 'option_table': 'order_goods', 'option_obj' : { 'order_goods_id': guid, 'sell_order_id' : order_id, 'goods_id' : goods_id, 'user_guide_id' : user_guide_id, 'price' : price, 'num' : 1 } } goods_json = json.dumps(goods_dict) producer.produce(goods_json) # 生成订单记录 insert_order_sql = ''' INSERT INTO {order_table} VALUES({order_id}, {user_id}, {user_guide_id}, {price}, 0) '''.format(order_table = 'buy_order_' + str(buy_table_flag), order_id = order_id, user_id = buy_user_id, user_guide_id = user_guide_id, price = order_price) cursor_dml_buy.execute(insert_order_sql) # 将订单信息放入队列中 order_dict = { 'user_id' : sell_user_id, 'option_type' : 'insert', 'option_table': 'sell_order', 'option_obj' : { 'sell_order_id' : order_id, 'user_guide_id' : user_guide_id, 'user_id' : buy_user_id, 'price' : order_price, 'status' : 0 } } order_json = json.dumps(order_dict) producer.produce(order_json) producer.stop() # 提交事物 conn_buy.commit() # 关闭有标链接 cursor_select_common.close() cursor_select_buy.close() cursor_select_sell.close() cursor_dml_buy.close() conn_common.close() conn_buy.close() conn_sell.close()
生成卖家订单信息:文章源自运维生存时间-https://www.ttlsa.com/mysql/mysql-distributed-database-and-table-give-and-get-performance/
通过上面买家下订单,现在订单的信息在队列中。我们只需要将队列中的数据取出并保存就好了。文章源自运维生存时间-https://www.ttlsa.com/mysql/mysql-distributed-database-and-table-give-and-get-performance/
#!/usr/bin/env python # -*- coding:utf-8 -*- # Program: 客户下订单+kafka 订单推送 # Author : HH # Date : 2016-02-27 import sys import mysql.connector import time import simplejson as json from pykafka import KafkaClient reload(sys) sys.setdefaultencoding('utf-8') if __name__ == '__main__': ''' 获取队列数据保存到数据库 ''' # 设置公共库连接配置 db_config_common = { 'user' : 'root', 'password': 'root', 'host' : '127.0.0.1', 'port' : 3306, 'database': 'test' } # 配置snowflake snowflake.client.setup(**snowflake_config) # 配置连接kafka client = KafkaClient(hosts="192.168.137.11:9092, \ 192.168.137.11:9093, \ 192.168.137.11:9094") topic = client.topics['shop'] # 生成消费者 balanced_consumer = topic.get_balanced_consumer( consumer_group='goods_group', auto_commit_enable=True, zookeeper_connect='localhost:2181' ) # 获得公共数据库的链接和游标 conn_common = mysql.connector.connect(**db_config_common) cursor_select_common = conn_common.cursor(buffered=True) # 消费信息 for message in balanced_consumer: if message is not None: # 解析json为dict info_dict = json.loads(message.value) select_sql = ''' SELECT s.user_id, ug.user_guide_id, u.table_flag, u.db_name, ss.value AS db_config_json FROM store AS s LEFT JOIN user AS u USING(user_id) LEFT JOIN user_guide AS ug USING(user_id) LEFT JOIN system_setting AS ss ON ss.name = u.db_name WHERE s.user_id = {user_id} '''.format(user_id=info_dict['user_id']) cursor_select_common.execute(select_sql) sell_user_id, user_guide_id, sell_table_flag, sell_dbname, \ sell_db_config_json = cursor_select_common.fetchone() # 获得出售者的数据库链接描述符以及游标 conn_sell = mysql.connector.connect(**json.loads(sell_db_config_json)) cursor_dml_sell = conn_sell.cursor(buffered=True) if info_dict['option_type'] == 'insert': # 构造insert sql语句 if info_dict['option_table'] == 'order_goods': insert_sql = ''' INSERT INTO {table_name}_{flag} VALUES( {order_goods_id}, {sell_order_id}, {goods_id}, {user_guide_id}, {price}, {num} ) '''.format(table_name = info_dict['option_table'], flag = sell_table_flag, **info_dict['option_obj'] ) elif info_dict['option_table'] == 'sell_order': insert_sql = ''' INSERT INTO {table_name}_{flag} VALUES( {sell_order_id}, {user_guide_id}, {user_id}, {price}, {status} ) '''.format(table_name = info_dict['option_table'], flag = sell_table_flag, **info_dict['option_obj'] ) # 执行sql cursor_dml_sell.execute(insert_sql) conn_sell.commit() cursor_dml_sell.close() conn_sell.close() elif info_dict['option_type'] == 'update': pass
源码:order_create_script文章源自运维生存时间-https://www.ttlsa.com/mysql/mysql-distributed-database-and-table-give-and-get-performance/
文章源自运维生存时间-https://www.ttlsa.com/mysql/mysql-distributed-database-and-table-give-and-get-performance/
昵称:HH文章源自运维生存时间-https://www.ttlsa.com/mysql/mysql-distributed-database-and-table-give-and-get-performance/
QQ:275258836文章源自运维生存时间-https://www.ttlsa.com/mysql/mysql-distributed-database-and-table-give-and-get-performance/
ttlsa群交流沟通(QQ群②:6690706 QQ群③:168085569 QQ群④:415230207(新) 微信公众号:ttlsacom)文章源自运维生存时间-https://www.ttlsa.com/mysql/mysql-distributed-database-and-table-give-and-get-performance/
感觉本文内容不错,读后有收获?文章源自运维生存时间-https://www.ttlsa.com/mysql/mysql-distributed-database-and-table-give-and-get-performance/
逛逛衣服店,鼓励作者写出更好文章。文章源自运维生存时间-https://www.ttlsa.com/mysql/mysql-distributed-database-and-table-give-and-get-performance/

评论