一、数据库学前须知
1、学前知识点
在进行数据存储和管理时,我们常常需要⼀个轻量级而又功能强大的数据库系统。sqlite 是⼀个极受欢迎的轻量级数据库,它被广泛应用于各种大小的项目中。python 自带了对 sqlite 的⽀持,使得在 python 中使用sqlite 变得非常简单和直观。本文将详细介绍 python 中如何使用sqlite3 库进行数据库操作。
sqlite特点介绍:
1.轻量级:sqlite⾮常小巧,无需安装即可使用,适合嵌入到各种应⽤中。
2.易于使⽤:sqlite的语法与标准sql⾮常接近,对于有sql基础的开发者来说,入门非常简单。
3.⾼性能:虽然是轻量级数据库,但sqlite在处理小型和中型数据集时性能非常优越。
4.跨平台:sqlite可以运⾏在各种操作系统上,包括windows、linux和macos。
小提示:sqlite一般只适用于初学者的入门,与一些小型项目的攥写,这里学习sqlite也是为了后面学习打基础
二、数据库基本知识点
1、模块导入与创建表
模块导入:
import sqlite3 # 导⼊sqlite模块
对于python来说,sqlite3属于python标准库的一部分,所以无需下载,直接引用即可
创建表:
1.建立数据库连接(connect):
# 连接到sqlite数据库
conn = sqlite3.connect('students.db')
如果要建立连接的数据库不存在会自动创建一个相同名称的数据库
2.创建游标(cursor)
# 创建⼀个游标对象 cur = conn.cursor()
!!!注意:在sqlite中所有对于数据库和表的操作都需要游标对象来调用函数进行
3.创建表
一些基础知识补充:
id integer primary key autoincrement,# 整形,主键,⾃动增⻓
name text not null, # ⽂本型,不允许为空
age integer, # 整形
grade text # ⽂本型
创建表的函数:
注意:对于数据库建表操作而言,大写与小写都可,例如:id integer primary key autoincrement就和id integer primary key autoincrement是相同效果的
创建表代码示例:
cur.execute(
'''
create table if not exists students(
id integer primary key autoincrement,
title text not null,
author text not null,
year integer
)
'''
)
代码中的if not exists可以在所创建数据表不存在的情况下进行创建,如果存在则跳过不执行
2、基础增删改查操作
数据库插入操作(insert)
数据库插入格式:
insert into 数据表名(元素1,元素2…) values(‘插入数据1’,‘插入数据2’…)
代码示例:
cur.execute("insert into book(title,author,year) values('python编程','张三',2022)")
数据库删除操作(delete)
数据库删除格式:
delete from 数据表名 where 条件语句
如果不加where的条件语句会默认删除该数据表的全部数据
cur.execute("delete from book where title = '算法导论'")
数据库修改操作(update)
数据库修改格式:
update 数据表名 set 元素名 = 修改数据 where 条件语句
与delete相似,如果不加where的条件语句会默认修改该数据表与元素名相关的全部数据
cur.execute("update book set year = 2023 where title = 'python编程'")
数据库查找操作(select)
数据库查找格式:
select 元素名 from 数据表名
元素名可替换为*,这代表查找该数据表的全部数据
cur.execute("select title,author from book")
cur.execute("select * from book")
接收数据函数cur.fetchall()
正确输出查找信息的代码示例:
cur.execute("select * from book")
data = cur.fetchall()
print(data)
注意:必须要通过fetchall函数接收后才能对查找到的信息进行输出
补充知识点总结
1、数据库查找select返回的是列表与元组的嵌套数据,在使用函数传参等相关操作时要特别注意
2、对于sqlite数据库而言,无论是创建表还是增删改查所采用的都是execute函数进行操作,通过游标来调用
3、使用select查找数据后,一定要通过fetchall函数来接收,否则无法返回正确数据
4、删除数据库代码:cur.execute(“drop table book”)
5、数据库操作也可以使用传参不一定非要写死,这就需要使用占位符:?
代码示例:
cur.execute(f"select * from {self.excel} where name = ?",(name,))
!!!注意:后面参数需要是元组格式(name,)
3、函数版增删改查
就是对各个功能进行函数的封装,再在一个主函数中进行调用
代码示例:
import sqlite3 # 导⼊sqlite模块
# 创建数据库连接函数
def create_connection(db_file):
"""
创建与sqlite数据库的连接
:param db_file: 数据库⽂件路径
:return: 数据库连接对象
"""
conn = none # 定义⼀个连接对象
try:
conn = sqlite3.connect(db_file) # 尝试连接到sqlite数据库
print("已成功创建连接")
return conn # 如果成功,返回连接对象
except sqlite3.error as e: # 如果连接失败,打印错误信息
print(f"错误 '{e}' ")
# 创建表函数
def create_table(conn):
"""
创建⽤户表
:param conn: 数据库连接对象
"""
try:
cursor = conn.cursor() # 获取游标对象
cursor.execute('''
create table if not exists users(
id integer primary key,
name text unique,
age integer)
''')
except sqlite3.error as e:
print(e) # 如果创建表失败,打印错误信息
# 插⼊数据函数
def insert_user(conn, data):
"""
插⼊⽤户数据
:param conn: 数据库连接对象
:param data: ⽤户数据元组,包含name和age
"""
try:
python10
cursor = conn.cursor() # 获取游标对象
# 检查⽤户是否已存在
cursor.execute("select id from users where name = ?", (data[0],))
user_id = cursor.fetchone() # 获取查询结果
if user_id is none: # 如果⽤户不存在,插⼊新⽤户
cursor.execute("insert into users (name, age) values (?, ?)",
data)
conn.commit() # 提交事务
else:
print(f"⽤户: {data[0]} 的id: {user_id[0]}已存在")
except sqlite3.error as e:
print(e) # 如果插⼊失败,打印错误信息
# 更新数据函数
def update_user(conn, data):
"""
更新⽤户数据
:param conn: 数据库连接对象
:param data: ⽤户数据元组,包含age和name
"""
try:
cursor = conn.cursor() # 获取游标对象
cursor.execute("update users set age = ? where name = ?", data)
conn.commit() # 提交事务
except sqlite3.error as e:
print(e) # 如果更新失败,打印错误信息
# 删除数据函数
def delete_user(conn, name):
"""
删除⽤户数据
:param conn: 数据库连接对象
:param name: ⽤户名称
"""
try:
cursor = conn.cursor() # 获取游标对象
cursor.execute("delete from users where name = ?", (name,))
conn.commit() # 提交事务
except sqlite3.error as e:
print(e) # 如果删除失败,打印错误信息
# 查询所有数据函数
def select_all_users(conn):
"""
查询所有⽤户数据
:param conn: 数据库连接对象
"""
try:
cursor = conn.cursor() # 获取游标对象
cursor.execute("select * from users") # 执⾏查询所有⽤户的sql语句
rows = cursor.fetchall() # 获取所有查询结果
for row in rows: # 遍历所有结果并打印
print(row)
except sqlite3.error as e:
print(e) # 如果查询失败,打印错误信息
# 主函数
def main():
database = "students2.db" # 定义数据库⽂件名称
conn = create_connection(database) # 创建数据库连接
if conn is not none: # 如果连接成功
create_table(conn) # 创建表
try:
insert_user(conn, ('alice', 30))
insert_user(conn, ('bob', 25))
# 更新数据
update_user(conn, (35, 'alice'))
# 查询并打印所有⽤户
print("查询到的所有⽤户:")
select_all_users(conn)
# 删除⽤户
delete_user(conn, 'bob')
# 再次查询并打印所有⽤户
print("\n删除后的⽤户数据:")
select_all_users(conn)
except sqlite3.error as e:
print(f"出现错误: {e}")
finally:
conn.close() # 确保关闭数据库连接
else:
print("error! ⽆法创建数据库连接。")
# 程序⼊⼝
if __name__ == '__main__':
main()
4、面向对象版增删改查
只是在函数版的基础上增加了类的设置,思路大体一致,传参和初始化有所不同
代码示例:
import sqlite3
class databasemanager:
"""
数据库管理器类,⽤于处理与sqlite数据库的交互
"""
def __init__(self, db_file):
"""
初始化数据库管理器
:param db_file: 数据库⽂件路径
"""
self.db_file = db_file
self.conn = none
def create_connection(self):
"""
创建与sqlite数据库的连接
"""
try:
self.conn = sqlite3.connect(self.db_file)
print("已成功创建连接")
except sqlite3.error as e:
print(f"错误 '{e}'")
def create_table(self):
"""
创建⽤户信息表,包括id、name和age字段
"""
if self.conn:
try:
cursor = self.conn.cursor()
cursor.execute('''
create table if not exists users(
id integer primary key autoincrement,
name text unique,
age integer)
''')
except sqlite3.error as e:
print(e)
def insert_user(self, name, age):
"""
向⽤户表中插⼊新的⽤户记录
:param name: ⽤户名
:param age: ⽤户年龄
"""
if self.conn:
try:
cursor = self.conn.cursor()
cursor.execute("select id from users where name = ?", (na
me,))
if cursor.fetchone() is none:
cursor.execute("insert into users (name, age) values
(?, ?)", (name, age))
self.conn.commit()
else:
print(f"⽤户:{name} 已存在")
except sqlite3.error as e:
print(e)
def update_user(self, name, age):
"""
更新⽤户表中⽤户的年龄信息
:param name: ⽤户名
:param age: 新的⽤户年龄
"""
if self.conn:
try:
cursor = self.conn.cursor()
cursor.execute("update users set age = ? where name = ?",
(age, name))
self.conn.commit()
except sqlite3.error as e:
print(e)
def delete_user(self, name):
"""
从⽤户表中删除指定的⽤户
:param name: 要删除的⽤户名
"""
if self.conn:
try:
cursor = self.conn.cursor()
cursor.execute("delete from users where name = ?", (name,
))
self.conn.commit()
except sqlite3.error as e:
print(e)
def select_all_users(self):
"""
查询并打印⽤户表中所有⽤户的信息
"""
if self.conn:
try:
cursor = self.conn.cursor()
cursor.execute("select * from users")
rows = cursor.fetchall()
for row in rows:
print(row)
except sqlite3.error as e:
print(e)
def close_connection(self):
"""
关闭与数据库的连接
"""
if self.conn:
self.conn.close()
print("数据库连接已关闭")
def main():
"""
主函数,⽤于演示数据库操作
"""
database = "students3.db" # 定义数据库⽂件名称
db_manager = databasemanager(database) # 创建数据库管理器实例
db_manager.create_connection() # 创建数据库连接
db_manager.create_table() # 创建⽤户表
try:
db_manager.insert_user('alice', 30) # 插⼊⽤户alice
db_manager.insert_user('bob', 25) # 插⼊⽤户bob
db_manager.update_user('alice', 35) # 更新⽤户alice的年龄
print("查询到的所有⽤户:")
db_manager.select_all_users() # 查询并打印所有⽤户
db_manager.delete_user('bob') # 删除⽤户bob
print("\n删除后的⽤户数据:")
db_manager.select_all_users() # 再次查询并打印所有⽤户
except sqlite3.error as e:
print(f"出现错误: {e}")
finally:
db_manager.close_connection() # 关闭数据库连接
if __name__ == '__main__':
main()
三、综合案例:基于 fastapi 和 sqlalchemy 实现crud接口
题目要求:
登录表(三条数据)
登录编号 用户名 密码
1 admin admin123
用户信息表(三条数据)
用户编号 姓名 年龄 地址 电话号码
1 张三 22 怀化 13878789898
主程序:
1、登录 2、注册 3、退出
(当登录成功后, 打印: xxx登录成功)
(注册: 如果有相同用户名, 给出提示)
(对用户表进行操作)
1、查询所有用户信息
(查询编号、姓名、年龄、地址、电话号码)
2、修改用户信息(根据用户名称, 修改用户所有信息)
如果当前用户不存在, 给出提示
3、删除用户信息(根据用户名称删除用户信息)
如果当前用户不存在, 给出提示
代码示例:
main页面代码:
# 1.导入模块
from fastapi import fastapi, httpexception
from pydantic import basemodel
from sqlalchemy import create_engine, column,integer,string
from sqlalchemy.orm import sessionmaker, declarative_base
# 2.初始化fastapi对象
app = fastapi()
# 3.数据库配置
# 定义了 sqlite 数据库的连接路径
database_url = "sqlite:///./text2.db"
# 创建数据库连接
engine = create_engine(database_url, connect_args={"check_same_thread": false})
# 创建会话
# bind=engine :指定要操作的数据库连接
sessionlocal = sessionmaker(autocommit=false, autoflush=false, bind=engine)
# 4.创建数据库模型
# 定义数据库模型
base = declarative_base()
class login(base):
# 表名
__tablename__ = "login"
id = column(integer, primary_key=true, index=true)
username = column(string, unique=true, index=true)
password = column(string)
class user(base):
# 表名
__tablename__ = "user"
id = column(integer, primary_key=true, index=true)
name = column(string)
age = column(integer)
address = column(string)
numberphone = column(string)
# 5.创建数据库表
# 如果bind=engin的数据库连接对应的数据库表不存在,则创建
base.metadata.create_all(bind=engine)
# 6.创建用户模型,用于接收请求的用户数据
# basemodel:基础类
class logincreate(basemodel):
username: str
password: str
class usercreate(basemodel):
name: str
age: int | none = none
address: str | none = none
numberphone: str | none = none
class userselect(basemodel):
flag: int
name: str | none = none
# 使用下面代码后,鼠标右键直接运行启动,不需要使用终端
if __name__ == "__main__":
# 导⼊uvicorn模块,⽤于启动fastapi服务
import uvicorn
# 启动fastapi服务,监听所有ip地址,端⼝号为8000
# host="0.0.0.0" 表示监听所有ip地址
# host="127.0.0.1" 表示监听本地ip地址
uvicorn.run("main:app", host="127.0.0.1", port=9991)
@app.get("/")
async def root():
# json类型
return {"message": "hello world"}
# ② 提供用户登录接口,当用户名和密码正确时,提示:欢迎xxx,否则提示:用户名或密码不正确!
@app.get("/login")
def login(login: logincreate):
try:
session = sessionlocal()
user_db = session.query(login).filter(login.username == login.username,
login.password == login.password).first()
if user_db:
return f"{login.username}登录成功!"
else:
return "用户名或密码错误"
except exception as e:
return f"登录失败: {e}"
# ① 提供用户注册接口,当用户名和密码不为空(注意:不能有相同的用户名)
@app.post("/register")
def register(login: logincreate):
try:
session = sessionlocal()
user_db = session.query(login).filter(login.username == login.username).first()
if user_db:
return "用户已存在"
else:
session.add(login(username=login.username, password=login.password))
session.commit()
return "注册成功"
except exception as e:
return f"注册失败: {e}"
@app.post("/user_select")
def user_select(user: userselect):
session = sessionlocal()
if user.flag == 1:
# 查询所有用户信息
return session.query(user).all()
elif user.flag == 2:
# 查询单个用户信息
return session.query(user).filter(user.name == user.name).first()
@app.post("/user_update")
def user_update(user: usercreate):
session = sessionlocal()
flag = session.query(user).filter(user.name == user.name).first()
if flag:
# 直接修改对象属性
flag.name = user.name
flag.age = user.age
flag.address = user.address
flag.numberphone = user.numberphone
session.commit()
return "修改成功"
else:
return "用户不存在"
@app.post("/user_delete")
def user_delete(user: usercreate):
session = sessionlocal()
flag = session.query(user).filter(user.name == user.name).first()
if flag:
session.delete(flag)
session.commit()
return "删除成功"
else:
return "用户不存在"
@app.post("/user_insert")
def user_insert(user: usercreate):
session = sessionlocal()
flag = session.query(user).filter(user.name == user.name).first()
if flag:
return "用户已存在"
else:
session.add(user(name=user.name, age=user.age, address=user.address, numberphone=user.numberphone))
session.commit()
return "新增成功"
测试页面代码:
import sqlite3
import requests
import json
def user_table(n):
if n == 1:
# 查询
flag = input("输入:1、查询所有用户信息页面\t\t2、查询单个用户信息页面")
if flag == '1':
k = {"flag": 1}
res_get = requests.post("http://127.0.0.1:9991/user_select", json=k)
return res_get.text
elif flag == '2':
name = input("请输入用户名:")
k = {"flag": 2, "name": name}
res_get = requests.post("http://127.0.0.1:9991/user_select", json=k)
return res_get.text
else:
return
elif n == 2:
# 修改
name = input("请输入用户名:")
age = input("请输入年龄:")
address = input("请输入地址:")
numberphone = input("请输入手机号:")
params = {
"name": name,
"age": age,
"address": address,
"numberphone": numberphone
}
res_post = requests.post("http://127.0.0.1:9991/user_update", json=params)
return res_post.text
elif n == 3:
# 删除
name = input("请输入用户名:")
res_post = requests.post("http://127.0.0.1:9991/user_delete", json={"name": name})
return res_post.text
elif n == 4:
# 新增
name = input("请输入用户名:")
age = input("请输入年龄:")
address = input("请输入地址:")
numberphone = input("请输入手机号:")
params = {
"name": name,
"age": age,
"address": address,
"numberphone": numberphone
}
res_post = requests.post("http://127.0.0.1:9991/user_insert", json=params)
return res_post.text
#get
while true:
flag = input("输入:1、登录页面\t\t2、注册页面\t\t3、退出系统")
if flag == '1':
print("--------登录页面----------")
username = input("请输入用户名:")
password = input("请输入密码:")
params = {
"username": username,
"password": password
}
res_get = requests.get(f"http://127.0.0.1:9991/login",json = params)
arr = res_get.text[-6:-2]
# print(arr)
if arr == "登录成功":
print(res_get.text)
print("--------------------恭喜进入用户表操作页面--------------------")
while true:
flag1 = input("输入:1、查询用户信息页面\t\t2、修改用户信息页面\t\t3、删除用户信息页面\t\t4、新增用户信息页面\t\t5、退出用户表操作页面")
print(user_table(int(flag1)))
if flag1 == '5':
break
else:
print(res_get.text)
elif flag == '2':
#post
print("--------注册页面----------")
username = input("请输入用户名:")
password = input("请输入密码:")
params = {
"username": username,
"password": password
}
res_post = requests.post("http://127.0.0.1:9991/register", json=params)
print(res_post.text)
else:
conn = sqlite3.connect("../text2.db", check_same_thread=false)
conn.close()
break
总结
到此这篇关于python版sqlite数据库从入门到综合运用案例的文章就介绍到这了,更多相关python sqlite数据库案例内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论