- A+
所属分类:python
SQLAlchemy是Python编程语言下的一款开源软件。提供了SQL工具包及对象关系映射(ORM)工具,使用MIT许可证发行。
SQLAlchemy“采用简单的Python语言,为高效和高性能的数据库访问设计,实现了完整的企业级持久模型”。SQLAlchemy的理念是,SQL数据库的量级和性能重要于对象集合;而对象集合的抽象又重要于表和行。因此,SQLAlchmey采用了类似于Java里Hibernate的数据映射模型,而不是其他ORM框架采用的Active Record模型。不过,Elixir和declarative等可选插件可以让用户使用声明语法。
SQLAlchemy首次发行于2006年2月,并迅速地在Python社区中最广泛使用的ORM工具之一,不亚于Django的ORM框架。
1. 安装
1 2 |
# pip install sqlalchemy # pip install pymysql |
2. 创建引擎
1 2 |
>>> from sqlalchemy import create_engine >>> engine = create_engine('mysql+pymysql://USERNAME:PASSWORD@DB_HOST:DB_PORT/DB_NAME', pool_recycle=3600) |
pool_recycle 指定连接池收回时间。
引擎初始化后,就可以连接数据库了。
3. 连接数据库
1 |
>>> connection = engine.connect() |
4. metadata
1 2 |
>>> from sqlalchemy import MetaData >>> metadata = MetaData() |
5. 定义表
1 2 3 4 5 6 7 8 9 10 11 |
>>> from sqlalchemy import Table, Column, Integer, Numeric, String, ForeignKey, DateTime >>> from datetime import datetime >>> users = Table('users', metadata, ... Column('user_id', Integer(), primary_key=True), ... Column('username', String(15), nullable=False, unique=True), ... Column('email_address', String(255), nullable=False), ... Column('phone', String(20), nullable=False), ... Column('password', String(25), nullable=False), ... Column('created_on', DateTime(), default=datetime.now), ... Column('updated_on', DateTime(), default=datetime.now, onupdate=datetime.now) ... ) |
6. 创建表
1 |
>>> metadata.create_all(engine) |
7. 插入数据
1 2 3 4 5 6 7 8 9 10 11 |
>>> ins = users.insert().values( ... user_id=8888, ... username='ttlsa.com', ... email_address = 'support@ttlsa.com', ... phone = 12345678901, ... password = 'www.ttlsa.com' ... ) >>> print ins INSERT INTO users (user_id, username, email_address, phone, password, created_on, updated_on) VALUES (:user_id, :username, :email_address, :phone, :password, :created_on, :updated_on) >>> ins.compile().params {'username': 'ttlsa.com', 'user_id': 8888, 'phone': 12345678901, 'created_on': None, 'updated_on': None, 'password': 'www.ttlsa.com', 'email_address': 'support@ttlsa.com'} |
8. 执行
1 2 3 |
>>> result = connection.execute(ins) >>> result.inserted_primary_key [8888] |
9. 插入多条
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
>>> ins = users.insert() >>> multi_data = [ ... { ... 'user_id':1, ... 'username':'u1', ... 'email_address':'u1@ttlsa.com', ... 'phone' : 12345678901, ... 'password': 'www.ttlsa.com' ... }, ... { ... 'user_id':2, ... 'username':'u2', ... 'email_address':'u2@ttlsa.com', ... 'phone' : 12345678901, ... 'password': 'www.ttlsa.com' ... } ... ] >>> result = connection.execute(ins, multi_data) >>> result.rowcount 2 |
10. 查询
1 2 3 4 5 6 7 8 9 |
>>> from sqlalchemy import select >>> s = select([users]) >>> print str(s) SELECT users.user_id, users.username, users.email_address, users.phone, users.password, users.created_on, users.updated_on FROM users >>> rp = connection.execute(s) >>> results = rp.fetchall() >>> print results [(1, 'u1', 'u1@ttlsa.com', '12345678901', 'www.ttlsa.com', datetime.datetime(2016, 6, 22, 11, 26, 5), datetime.datetime(2016, 6, 22, 11, 26, 5)), (2, 'u2', 'u2@ttlsa.com', '12345678901', 'www.ttlsa.com', datetime.datetime(2016, 6, 22, 11, 26, 5), datetime.datetime(2016, 6, 22, 11, 26, 5)), (8888, 'ttlsa.com', 'support@ttlsa.com', '12345678901', 'www.ttlsa.com', datetime.datetime(2016, 6, 22, 11, 13, 32), datetime.datetime(2016, 6, 22, 11, 13, 32))] |
1 2 3 4 5 6 7 8 9 |
>>> first_row = results[0] >>> print first_row (1, 'u1', 'u1@ttlsa.com', '12345678901', 'www.ttlsa.com', datetime.datetime(2016, 6, 22, 11, 26, 5), datetime.datetime(2016, 6, 22, 11, 26, 5)) >>> first_row[1] 'u1' >>> first_row.phone '12345678901' >>> first_row[users.c.user_id] 1 |
1 2 3 4 5 6 7 |
>>> rp = connection.execute(s) >>> for record in rp: ... print record.username ... u1 u2 ttlsa.com |
1 2 3 4 5 6 7 8 9 10 11 12 |
>>> s = select([users.c.user_id, users.c.username]) >>> rp = connection.execute(s) >>> print rp.keys() ['user_id', 'username'] >>> print rp.fetchone() (8888, 'ttlsa.com') >>> print rp.fetchone() (1, 'u1') >>> print rp.fetchone() (2, 'u2') >>> print rp.fetchone() None |
11. 排序
1 2 3 4 5 6 7 8 9 10 11 12 |
>>> s = select([users.c.user_id, users.c.username]) >>> s = s.order_by(users.c.user_id) >>> print str(s) SELECT users.user_id, users.username FROM users ORDER BY users.user_id >>> rp = connection.execute(s) >>> for i in rp: ... print ('{} - {}'.format(i.user_id, i.username)) ... 1 - u1 2 - u2 8888 - ttlsa.com |
1 2 3 4 5 |
>>> s = select([users.c.user_id, users.c.username]) >>> s = s.order_by(desc(users.c.user_id)) >>> rp = connection.execute(s) >>> print (['{} - {}'.format(i.user_id, i.username) for i in rp]) ['8888 - ttlsa.com', '2 - u2', '1 - u1'] |
1 2 3 4 |
>>> s = select([users.c.user_id, users.c.username]).where(users.c.user_id.in_([1,2])) >>> rp = connection.execute(s) >>> print (['{} - {}'.format(i.user_id, i.username) for i in rp]) ['1 - u1', '2 - u2'] |
12. 更新
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
>>> from sqlalchemy import update >>> u = update(users).where(users.c.username == 'u1') >>> u = u.values(phone=00000000000) >>> print str(u) UPDATE users SET phone=:phone, updated_on=:updated_on WHERE users.username = :username_1 >>> result = connection.execute(u) >>> print result <sqlalchemy.engine.result.ResultProxy object at 0x7f6839f1a290> >>> print result.rowcount 1 >>> s = select([users]).where(users.c.username == 'u1') >>> result = connection.execute(s).first() >>> print result (1, 'u1', 'u1@ttlsa.com', '0', 'www.ttlsa.com', datetime.datetime(2016, 6, 22, 11, 26, 5), datetime.datetime(2016, 6, 22, 13, 51, 16)) >>> print result.keys() ['user_id', 'username', 'email_address', 'phone', 'password', 'created_on', 'updated_on'] >>> for key in result.keys(): ... print('{:>20}: {}'.format(key, result[key])) ... user_id: 1 username: u1 email_address: u1@ttlsa.com phone: 0 password: www.ttlsa.com created_on: 2016-06-22 11:26:05 updated_on: 2016-06-22 13:51:16 |
13. 删除
1 2 3 4 5 6 7 8 9 10 11 |
>>> from sqlalchemy import delete >>> u = delete(users).where(users.c.username == 'u1') >>> result = connection.execute(u) >>> print result.rowcount 1 >>> print str(u) DELETE FROM users WHERE users.username = :username_1 >>> s = select([users]).where(users.c.username == 'u1') >>> result = connection.execute(s).fetchall() >>> print result [] |

微信公众号
扫一扫关注运维生存时间公众号,获取最新技术文章~
20/10/2016 下午 2:59 沙发
这样的话是没有用session单元啊?