前言
sqlite 和 postgresql 都支持 upsert 操作,即"有则更新,无则新增"。冲突列必须有唯一约束。
语法:
- postgresql:
insert ... on conflict (column) do update/nothing - sqlite:
insert ... on conflict(column) do update/nothing。注意括号位置
| 场景 | postgresql | sqlite | 说明 |
|---|---|---|---|
| 基本 upsert | on conflict (col) do update set ... | on conflict(col) do update set ... | 括号位置略有不同 |
| 冲突忽略 | on conflict (col) do nothing | on conflict(col) do nothing | 相同 |
| 引用新值 | excluded.col | excluded.col | postgresql 大写,sqlite 小写 |
| 返回结果 | returning * | returning * | 相同 |
| 条件更新 | where condition | 不支持 where | sqlite 限制 |
注意事项
- 冲突列必须有唯一约束
- postgresql 和 sqlite 的语法相似,但仍有细微差别。使用原生 sql 时需要注意。
- sqlite 在 upsert 时不支持 where 子句,需要改用 case 表达式或应用层过滤。
- sqlite 3.35+ 版本才支持 returning
excluded 和 returning
excluded
excluded 表示冲突时被拦截的新值。
insert into users (email, name, age)
values ('test@example.com', '新名字', 30)
on conflict (email) do update set
name = excluded.name, -- ← 引用新值 "新名字"
age = excluded.age -- ← 引用新值 30| 场景 | 表达式 | 含义 | 示例值 |
|---|---|---|---|
| 原表字段 | users.name | 冲突行的当前值 | "老名字" |
| 新值字段 | excluded.name | 试图插入的新值 | "新名字" |
| 混合计算 | users.age + excluded.age | 原值 + 新值 | 25 + 30 = 55 |
示例 1:累加库存
-- 商品库存累加:原库存 100 + 新增 50 = 150
insert into products (sku, stock)
values ('iphone15', 50)
on conflict (sku) do update set
stock = products.stock + excluded.stock -- 100 + 50
returning stock;示例 2:仅更新非空字段
-- 如果新值为 null,保留原值
insert into users (email, name, age)
values ('test@example.com', '新名字', null)
on conflict (email) do update set
name = coalesce(excluded.name, users.name), -- 新名字
age = coalesce(excluded.age, users.age) -- 保留原 age
示例 3:时间戳更新
-- 更新时刷新 updated_at
insert into users (email, name)
values ('test@example.com', '新名字')
on conflict (email) do update set
name = excluded.name,
updated_at = now() -- postgresql
-- updated_at = current_timestamp -- sqlite
returning
returning 用于返回操作结果。在 insert/update/delete 后直接返回指定列,避免额外 select 查询:
insert into users (email, name)
values ('test@example.com', '张三')
returning id, email, name, created_at;
示例 1:插入后立即获取 id
# postgresql / sqlite 3.35+
sql = text("""
insert into users (email, name)
values (:email, :name)
returning id, email, created_at
""")
result = await session.execute(sql, {"email": "test@example.com", "name": "张三"})
user = result.mappings().first()
print(user["id"]) # 直接获取 id示例 2:upsert 后统一返回
-- 无论插入还是更新,都返回最终状态
insert into users (email, name, login_count)
values ('test@example.com', '张三', 1)
on conflict (email) do update set
name = excluded.name,
login_count = users.login_count + 1 -- 累加登录次数
returning
id,
email,
name,
login_count,
case
when xmax = 0 then 'inserted' -- postgresql 特有:xmax=0 表示插入
else 'updated'
end as action示例 3:批量操作返回所有结果
-- postgresql 支持批量 returning
insert into users (email, name)
values
('a@example.com', 'a'),
('b@example.com', 'b')
on conflict (email) do update set
name = excluded.name
returning id, email, name;python 处理批量返回:
result = await session.execute(sql)
users = [dict(row) for row in result.mappings().all()]
# [{'id': 1, 'email': 'a@example.com', 'name': 'a'}, ...]
示例:用户登录计数器
async def record_user_login(session: asyncsession, email: str, name: str) -> dict:
"""
用户登录计数器:
- 新用户:插入,login_count = 1
- 老用户:更新,login_count += 1
- 返回最终状态 + 操作类型
"""
sql = text("""
insert into users (
email, name, login_count, last_login, created_at
) values (
:email, :name, 1, :now, :now
)
on conflict (email) do update set
name = excluded.name, -- 更新用户名
login_count = users.login_count + 1, -- 累加登录次数
last_login = excluded.last_login -- 更新最后登录时间
returning
id,
email,
name,
login_count,
last_login,
created_at,
case
when xmax = 0 then 'inserted'
else 'updated'
end as action -- postgresql 特有:区分插入/更新
""")
now = datetime.utcnow()
result = await session.execute(
sql,
{"email": email, "name": name, "now": now}
)
row = result.mappings().first()
return dict(row) if row else none
# 使用示例
user = await record_user_login(session, "test@example.com", "张三")
print(f"{user['action']} user {user['email']} with {user['login_count']} logins")
# 输出: inserted user test@example.com with 1 logins
# 或: updated user test@example.com with 5 logins示例数据模型类
from sqlalchemy import column, integer, string, uniqueconstraint
from sqlalchemy.orm import declarativebase
class base(declarativebase):
pass
class user(base):
__tablename__ = "users"
id = column(integer, primary_key=true, autoincrement=true)
email = column(string(100), unique=true, nullable=false) # 唯一约束
name = column(string(50))
age = column(integer)
balance = column(integer, default=0)
__table_args__ = (
uniqueconstraint("email", name="uq_users_email"),
)
class product(base):
__tablename__ = "products"
id = column(integer, primary_key=true)
sku = column(string(50), unique=true, nullable=false) # 唯一 sku
name = column(string(100))
stock = column(integer, default=0)
price = column(integer)orm 方式
注意 insert 的导入路径。
基本示例
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
from sqlalchemy import insert
async def upsert_user_orm(session: asyncsession, user_data: dict) -> dict:
"""
upsert 用户(orm 风格)
如果 email 冲突则更新,否则插入
"""
# 方式 1:使用通用 insert(推荐⭐)
# sqlalchemy 会根据方言自动选择正确的语法
stmt = (
insert(user)
.values(**user_data)
.on_conflict_do_update(
index_elements=["email"], # 冲突检测列(唯一约束)
set_={
"name": user_data["name"],
"age": user_data.get("age"),
"updated_at": func.now() # 假设有 updated_at 列
}
)
.returning(user) # 返回插入/更新后的行
)
result = await session.execute(stmt)
user = result.scalar_one()
return {
"id": user.id,
"email": user.email,
"name": user.name,
"age": user.age
}
async def upsert_user_ignore(session: asyncsession, user_data: dict) -> bool:
"""
upsert 但冲突时忽略(do nothing)
"""
stmt = (
insert(user)
.values(**user_data)
.on_conflict_do_nothing(
index_elements=["email"]
)
)
result = await session.execute(stmt)
return result.rowcount > 0 # 返回是否插入成功条件更新:仅更新特定字段
async def upsert_user_conditional(session: asyncsession, user_data: dict) -> dict:
"""
upsert:冲突时只更新非空字段
"""
stmt = (
insert(user)
.values(**user_data)
.on_conflict_do_update(
index_elements=["email"],
set_={
"name": user_data["name"],
# 条件:只有提供了 age 才更新
"age": user_data.get("age", user.age), # 保持原值
},
# 可选:添加 where 条件
where=user.email == user_data["email"]
)
.returning(user)
)
result = await session.execute(stmt)
return result.mappings().first()批量 upsert
async def bulk_upsert_users(session: asyncsession, users: list[dict]) -> int:
"""
批量 upsert 用户
"""
stmt = (
insert(user)
.values(users)
.on_conflict_do_update(
index_elements=["email"],
set_={
"name": insert(user).excluded.name, # 使用 excluded 表示新值
"age": insert(user).excluded.age,
}
)
)
result = await session.execute(stmt)
return result.rowcount使用 excluded 引用新值
async def upsert_product_with_stock(session: asyncsession, product_data: dict) -> dict:
"""
upsert 产品:冲突时累加库存
"""
stmt = (
insert(product)
.values(**product_data)
.on_conflict_do_update(
index_elements=["sku"],
set_={
# 累加库存:原库存 + 新库存
"stock": product.stock + insert(product).excluded.stock,
# 更新其他字段
"name": insert(product).excluded.name,
"price": insert(product).excluded.price,
}
)
.returning(product)
)
result = await session.execute(stmt)
return result.mappings().first()用户服务
class userservice:
"""用户服务(支持 upsert)"""
def __init__(self, session: asyncsession):
self.session = session
async def create_or_update(self, email: str, name: str, age: int | none = none) -> dict:
"""创建或更新用户"""
stmt = (
insert(user)
.values(
email=email,
name=name,
age=age,
created_at=datetime.utcnow()
)
.on_conflict_do_update(
index_elements=["email"],
set_={
"name": name,
"age": age,
"updated_at": datetime.utcnow()
}
)
.returning(user)
)
result = await self.session.execute(stmt)
user = result.scalar_one()
return {
"id": user.id,
"email": user.email,
"name": user.name,
"age": user.age
}
async def bulk_create_or_update(self, users: list[dict]) -> int:
"""批量创建或更新"""
stmt = (
insert(user)
.values(users)
.on_conflict_do_update(
index_elements=["email"],
set_={
"name": insert(user).excluded.name,
"age": insert(user).excluded.age,
"updated_at": datetime.utcnow()
}
)
)
result = await self.session.execute(stmt)
return result.rowcount
async def create_if_not_exists(self, email: str, name: str) -> bool:
"""仅当不存在时创建"""
stmt = (
insert(user)
.values(
email=email,
name=name,
created_at=datetime.utcnow()
)
.on_conflict_do_nothing(
index_elements=["email"]
)
)
result = await self.session.execute(stmt)
return result.rowcount > 0 # true = 插入成功,false = 已存在原生 sql
基本示例
postgresql
async def upsert_user_pg(session: asyncsession, user_data: dict) -> dict | none:
"""
postgresql 原生 upsert
"""
sql = text("""
insert into users (email, name, age, created_at)
values (:email, :name, :age, :created_at)
on conflict (email) do update -- 冲突列
set
name = excluded.name, -- excluded 表示新插入的值
age = excluded.age,
updated_at = now()
returning id, email, name, age
""")
result = await session.execute(
sql,
{
"email": user_data["email"],
"name": user_data["name"],
"age": user_data.get("age"),
"created_at": datetime.utcnow()
}
)
row = result.mappings().first()
return dict(row) if row else nonesqlite
async def upsert_user_sqlite(session: asyncsession, user_data: dict) -> dict | none:
"""
sqlite 原生 upsert(语法与 postgresql 几乎相同)
"""
sql = text("""
insert into users (email, name, age, created_at)
values (:email, :name, :age, :created_at)
on conflict(email) do update set -- sqlite 语法稍有不同
name = excluded.name,
age = excluded.age,
updated_at = current_timestamp
returning id, email, name, age
""")
result = await session.execute(
sql,
{
"email": user_data["email"],
"name": user_data["name"],
"age": user_data.get("age"),
"created_at": datetime.utcnow()
}
)
row = result.mappings().first()
return dict(row) if row else none冲突时忽略
async def insert_or_ignore_user(session: asyncsession, user_data: dict) -> bool:
"""
插入用户,如果冲突则忽略
"""
# postgresql
sql = text("""
insert into users (email, name, age, created_at)
values (:email, :name, :age, :created_at)
on conflict (email) do nothing
""")
# sqlite(语法相同)
# sql = text("""
# insert into users (email, name, age, created_at)
# values (:email, :name, :age, :created_at)
# on conflict(email) do nothing
# """)
result = await session.execute(
sql,
{
"email": user_data["email"],
"name": user_data["name"],
"age": user_data.get("age"),
"created_at": datetime.utcnow()
}
)
return result.rowcount > 0 # 返回是否插入成功批量 upsert
async def bulk_upsert_products(session: asyncsession, products: list[dict]) -> int:
"""
批量 upsert 产品(原生 sql)
"""
# postgresql
sql = text("""
insert into products (sku, name, stock, price, created_at)
values (
:sku, :name, :stock, :price, :created_at
)
on conflict (sku) do update set
name = excluded.name,
stock = products.stock + excluded.stock, -- 累加库存
price = excluded.price,
updated_at = now()
""")
# 批量执行
for product in products:
await session.execute(
sql,
{
"sku": product["sku"],
"name": product["name"],
"stock": product.get("stock", 0),
"price": product.get("price", 0),
"created_at": datetime.utcnow()
}
)
return len(products)部分更新 + 条件判断
async def upsert_user_smart(session: asyncsession, user_data: dict) -> dict | none:
"""
智能 upsert:
- 如果提供了 age,才更新 age
- 如果提供了 name,才更新 name
- 更新 updated_at
"""
sql = text("""
insert into users (email, name, age, created_at)
values (:email, :name, :age, :created_at)
on conflict (email) do update set
name = coalesce(:name, users.name), -- 如果新值为 null,保持原值
age = coalesce(:age, users.age),
updated_at = now()
returning id, email, name, age, updated_at
""")
result = await session.execute(
sql,
{
"email": user_data["email"],
"name": user_data.get("name"), # 可能为 none
"age": user_data.get("age"), # 可能为 none
"created_at": datetime.utcnow()
}
)
row = result.mappings().first()
return dict(row) if row else none用户注册/登录:存在则更新最后登录时间
async def register_or_login(session: asyncsession, email: str, name: str) -> dict:
"""
用户注册或登录:
- 新用户:插入
- 老用户:更新最后登录时间
"""
sql = text("""
insert into users (email, name, last_login, created_at)
values (:email, :name, :now, :now)
on conflict (email) do update set
last_login = excluded.last_login,
name = excluded.name -- 可选:更新用户名
returning id, email, name, last_login, created_at
""")
now = datetime.utcnow()
result = await session.execute(
sql,
{"email": email, "name": name, "now": now}
)
return dict(result.mappings().first())库存累加
async def add_product_stock(session: asyncsession, sku: str, quantity: int) -> bool:
"""
增加商品库存:
- 商品不存在:插入
- 商品存在:累加库存
"""
sql = text("""
insert into products (sku, stock, created_at)
values (:sku, :quantity, :now)
on conflict (sku) do update set
stock = products.stock + excluded.stock,
updated_at = now()
""")
result = await session.execute(
sql,
{
"sku": sku,
"quantity": quantity,
"now": datetime.utcnow()
}
)
return result.rowcount > 0用户积分累加
async def add_user_points(session: asyncsession, user_id: int, points: int) -> dict | none:
"""
增加用户积分(累加)
"""
sql = text("""
insert into user_points (user_id, points, created_at)
values (:user_id, :points, :now)
on conflict (user_id) do update set
points = user_points.points + excluded.points,
updated_at = now()
returning user_id, points
""")
result = await session.execute(
sql,
{
"user_id": user_id,
"points": points,
"now": datetime.utcnow()
}
)
row = result.mappings().first()
return dict(row) if row else none标签计数
存在则 +1,不存在则创建:
async def increment_tag_count(session: asyncsession, tag_name: str) -> int:
"""
标签计数:
- 标签不存在:插入 count=1
- 标签存在:count += 1
"""
sql = text("""
insert into tags (name, count, created_at)
values (:name, 1, :now)
on conflict (name) do update set
count = tags.count + 1,
updated_at = now()
returning count
""")
result = await session.execute(
sql,
{"name": tag_name, "now": datetime.utcnow()}
)
return result.scalar() or 0到此这篇关于sqlalchemy中使用upsert的操作方法的文章就介绍到这了,更多相关sqlalchemy使用upsert内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论