1. mvcc 简介及其在 mysql 中的重要性
1.1 什么是 mvcc
多版本并发控制(mvcc)是现代数据库系统中广泛使用的一种并发控制机制。与传统的锁机制不同,mvcc 通过维护数据的多个版本来实现非阻塞读操作,从而大大提高数据库的并发性能。
在 mvcc 中,当数据被修改时,数据库不会直接覆盖原有数据,而是创建数据的新版本。这样,不同的事务可以看到数据在不同时间点的快照,从而避免了读写冲突。
1.2 mysql 中的 mvcc 实现
mysql 的 innodb 存储引擎通过以下机制实现 mvcc:
隐藏列:每行数据包含两个隐藏列
db_trx_id:最近修改该行数据的事务 iddb_roll_ptr:指向 undo log 中旧版本数据的指针db_row_id(可选):行 id
read view:事务在执行时创建的一致性读视图,决定了该事务能看到哪些数据版本
undo log:存储数据的历史版本,用于构建数据快照和回滚操作
2. 没有 mvcc 的 mysql 会面临的问题
2.1 并发性能大幅下降
没有 mvcc,mysql 将不得不依赖更严格的锁机制来处理并发访问。
-- 在没有 mvcc 的情况下,简单的查询也可能导致阻塞 -- 事务1 start transaction; update users set balance = balance - 100 where id = 1; -- 此行被锁定... -- 事务2(会被阻塞) start transaction; select * from users where id = 1; -- 这个查询会被阻塞,等待事务1提交 commit;
2.2 常见的并发问题
2.2.1 脏读(dirty read)
-- 事务1 start transaction; update accounts set balance = balance + 100 where user_id = 1; -- 在没有 mvcc 和适当隔离级别的情况下 -- 事务2 start transaction; select balance from accounts where user_id = 1; -- 可能读取到未提交的 100 commit; -- 如果事务1回滚 rollback; -- 事务2读取的数据就是无效的
2.2.2 不可重复读(non-repeatable read)
-- 事务1 start transaction; select * from products where id = 1; -- 返回 price = 100 -- 事务2 start transaction; update products set price = 120 where id = 1; commit; -- 事务1再次查询 select * from products where id = 1; -- 现在返回 price = 120 -- 同一事务中两次查询结果不一致 commit;
2.2.3 幻读(phantom read)
-- 事务1 start transaction; select count(*) from orders where user_id = 1 and status = 'pending'; -- 返回 5 -- 事务2 start transaction; insert into orders (user_id, status, amount) values (1, 'pending', 50); commit; -- 事务1再次查询 select count(*) from orders where user_id = 1 and status = 'pending'; -- 返回 6 -- 出现了幻影行 commit;
3. 没有 mvcc 时的替代解决方案
3.1 基于锁的并发控制
-- 使用表级锁保证一致性 -- 事务1 lock tables accounts write; start transaction; select balance from accounts where user_id = 1; update accounts set balance = balance - 100 where user_id = 1; commit; unlock tables; -- 事务2(必须等待) lock tables accounts write; -- 等待事务1释放锁 start transaction; select balance from accounts where user_id = 1; -- ...
3.2 应用层并发控制
// java 示例:使用应用层乐观锁
public class accountservice {
public boolean transfermoney(int fromuserid, int touserid, bigdecimal amount) {
connection conn = null;
try {
conn = datasource.getconnection();
conn.setautocommit(false);
// 使用版本号实现乐观锁
string selectsql = "select id, balance, version from accounts where user_id = ? for update";
preparedstatement stmt1 = conn.preparestatement(selectsql);
stmt1.setint(1, fromuserid);
resultset rs = stmt1.executequery();
if (rs.next()) {
bigdecimal currentbalance = rs.getbigdecimal("balance");
int currentversion = rs.getint("version");
if (currentbalance.compareto(amount) < 0) {
conn.rollback();
return false; // 余额不足
}
// 更新账户
string updatesql = "update accounts set balance = balance - ?, version = version + 1 " +
"where user_id = ? and version = ?";
preparedstatement stmt2 = conn.preparestatement(updatesql);
stmt2.setbigdecimal(1, amount);
stmt2.setint(2, fromuserid);
stmt2.setint(3, currentversion);
int rowsaffected = stmt2.executeupdate();
if (rowsaffected == 0) {
// 版本号不匹配,说明数据已被其他事务修改
conn.rollback();
return false; // 需要重试
}
conn.commit();
return true;
}
} catch (sqlexception e) {
if (conn != null) {
try { conn.rollback(); } catch (sqlexception ex) {}
}
throw new runtimeexception("transfer failed", e);
} finally {
if (conn != null) {
try { conn.close(); } catch (sqlexception e) {}
}
}
return false;
}
}
3.3 使用 redis 分布式锁
// 使用 redis 实现分布式锁来处理并发
public class distributedaccountservice {
private jedispool jedispool;
private datasource datasource;
public boolean transferwithdistributedlock(int fromuserid, int touserid, bigdecimal amount) {
string lockkey = "account_lock:" + fromuserid;
string lockvalue = uuid.randomuuid().tostring();
jedis jedis = null;
connection conn = null;
try {
jedis = jedispool.getresource();
// 获取分布式锁
boolean locked = false;
long starttime = system.currenttimemillis();
while (system.currenttimemillis() - starttime < 5000) { // 5秒超时
if ("ok".equals(jedis.set(lockkey, lockvalue, "nx", "px", 30000))) {
locked = true;
break;
}
thread.sleep(100); // 短暂等待后重试
}
if (!locked) {
throw new runtimeexception("acquire lock timeout");
}
// 执行转账操作
conn = datasource.getconnection();
conn.setautocommit(false);
// ... 转账逻辑
conn.commit();
return true;
} catch (exception e) {
if (conn != null) {
try { conn.rollback(); } catch (sqlexception ex) {}
}
throw new runtimeexception("transfer failed", e);
} finally {
if (jedis != null) {
// 使用 lua 脚本保证原子性地释放锁
string luascript = "if redis.call('get', keys[1]) == argv[1] then " +
"return redis.call('del', keys[1]) " +
"else return 0 end";
jedis.eval(luascript, 1, lockkey, lockvalue);
jedis.close();
}
if (conn != null) {
try { conn.close(); } catch (sqlexception e) {}
}
}
}
}
4. 性能对比分析
4.1 测试场景设计
-- 创建测试表
create table performance_test (
id int primary key auto_increment,
data varchar(1000),
counter int default 0,
created_at timestamp default current_timestamp
);
-- 插入测试数据
delimiter $$
create procedure inserttestdata(in numrecords int)
begin
declare i int default 0;
while i < numrecords do
insert into performance_test (data) values (repeat('x', 1000));
set i = i + 1;
end while;
end$$
delimiter ;
call inserttestdata(10000);
4.2 并发测试代码
// java 并发测试
public class concurrenttest {
private static final int thread_count = 50;
private static final int operations_per_thread = 100;
private static final cyclicbarrier barrier = new cyclicbarrier(thread_count);
private static final countdownlatch latch = new countdownlatch(thread_count);
private static final atomiclong successcount = new atomiclong(0);
private static final atomiclong failurecount = new atomiclong(0);
public static void main(string[] args) throws interruptedexception {
executorservice executor = executors.newfixedthreadpool(thread_count);
long starttime = system.currenttimemillis();
for (int i = 0; i < thread_count; i++) {
executor.execute(new worker(i));
}
latch.await();
executor.shutdown();
long endtime = system.currenttimemillis();
system.out.println("总执行时间: " + (endtime - starttime) + "ms");
system.out.println("成功操作: " + successcount.get());
system.out.println("失败操作: " + failurecount.get());
system.out.println("吞吐量: " + (successcount.get() * 1000.0 / (endtime - starttime)) + " ops/sec");
}
static class worker implements runnable {
private final int workerid;
worker(int workerid) {
this.workerid = workerid;
}
@override
public void run() {
try {
barrier.await(); // 所有线程同时开始
for (int i = 0; i < operations_per_thread; i++) {
if (performoperation()) {
successcount.incrementandget();
} else {
failurecount.incrementandget();
}
}
} catch (exception e) {
e.printstacktrace();
} finally {
latch.countdown();
}
}
private boolean performoperation() {
// 执行数据库操作
// 1. 有 mvcc 的情况:使用普通事务
// 2. 没有 mvcc 的情况:使用悲观锁或乐观锁
return true;
}
}
}
5. 系统架构调整方案
5.1 读写分离架构
┌─────────────────┐ ┌──────────────────┐
│ 应用服务器层 │ │ 数据库代理层 │
│ │ │ │
│ ┌─────────────┐ │ │ ┌──────────────┐ │
│ │ web应用 │──┼─────▶│ proxy │ │
│ └─────────────┘ │ │ │ (如mycat) │ │
│ ┌─────────────┐ │ │ └──────────────┘ │
│ │ api服务 │──┼─────┘ │ │
│ └─────────────┘ │ │ │
└─────────────────┘ ▼ ▼
┌─────────────────────────────────┐
│ 数据库层 │
│ │
│ ┌─────────────┐ ┌───────────┐ │
│ │ 主数据库 │ │ 从数据库 │ │
│ │ (写操作) │ │ (读操作) │ │
│ └─────────────┘ └───────────┘ │
└─────────────────────────────────┘
5.2 分库分表策略
// 分库分表示例
public class shardingservice {
private static final int db_count = 4;
private static final int table_count_per_db = 8;
public shardingresult calculatesharding(long userid) {
// 分库:userid % db_count
int dbindex = (int) (userid % db_count);
string dbname = "user_db_" + dbindex;
// 分表:userid / db_count % table_count_per_db
int tableindex = (int) (userid / db_count % table_count_per_db);
string tablename = "user_info_" + tableindex;
return new shardingresult(dbname, tablename);
}
public static class shardingresult {
public final string dbname;
public final string tablename;
public shardingresult(string dbname, string tablename) {
this.dbname = dbname;
this.tablename = tablename;
}
}
}
6. 监控和调优策略
6.1 锁监控
-- 监控当前锁情况
select
r.trx_id waiting_trx_id,
r.trx_mysql_thread_id waiting_thread,
r.trx_query waiting_query,
b.trx_id blocking_trx_id,
b.trx_mysql_thread_id blocking_thread,
b.trx_query blocking_query
from information_schema.innodb_lock_waits w
inner join information_schema.innodb_trx b on b.trx_id = w.blocking_trx_id
inner join information_schema.innodb_trx r on r.trx_id = w.requesting_trx_id;
-- 查看当前活动事务
select * from information_schema.innodb_trx
order by trx_started desc;
6.2 性能监控脚本
-- 创建监控表
create table lock_monitor (
id bigint primary key auto_increment,
sample_time timestamp default current_timestamp,
lock_wait_count int,
long_running_trx_count int,
deadlock_count int
);
-- 定期收集监控数据
delimiter $$
create procedure collectlockstats()
begin
insert into lock_monitor (lock_wait_count, long_running_trx_count, deadlock_count)
select
(select count(*) from information_schema.innodb_lock_waits),
(select count(*) from information_schema.innodb_trx
where timestampdiff(second, trx_started, now()) > 60),
(select variable_value from information_schema.global_status
where variable_name = 'innodb_deadlocks');
end$$
delimiter ;
-- 创建事件定期执行
create event monitor_lock_event
on schedule every 1 minute
do call collectlockstats();
7. 总结
如果没有 mvcc 机制,mysql 将面临严重的并发性能问题。为了维持数据一致性,系统将不得不依赖更严格的锁机制,这会导致:
- 吞吐量大幅下降:大量的锁等待会限制系统并发处理能力
- 响应时间增加:读操作可能被写操作阻塞
- 死锁风险增加:复杂的锁依赖关系容易导致死锁
- 系统复杂性提高:需要在应用层实现复杂的并发控制逻辑
虽然可以通过读写分离、分库分表、应用层锁等方案来缓解问题,但这些方案都会增加系统的复杂性和维护成本。mvcc 机制在保证数据一致性的同时提供了优异的并发性能,是现代数据库系统不可或缺的重要特性。
在实际系统设计中,我们应该充分理解 mvcc 的工作原理,合理设置事务隔离级别,并在必要时配合使用适当的锁策略,才能在数据一致性和系统性能之间找到最佳平衡点。
以上就是mysql中没有mvcc机制的影响分析与替代方案的详细内容,更多关于mysql没有mvcc机制的资料请关注代码网其它相关文章!
发表评论