利用线程池和redis实现高效数据入库
在高并发环境中,进行数据入库是一项具有挑战性的任务。
本文将介绍如何利用线程池和redis实现数据的实时缓存和批量入库处理,确保系统的性能和稳定性。
主要思路和组件介绍
思路概述
在高并发情况下,数据入库需要解决两个主要问题:实时性和稳定性。
通过将数据首先存储在redis缓存中,可以快速响应和处理大量的数据请求,然后利用线程池定期批量将数据从redis取出并入库,以减少数据库压力和提高整体性能。
主要组件
- batchdatastorageservice:核心服务类,负责数据的实时缓存和定期批量入库处理。
- cacheservice:简易缓存服务类,使用concurrenthashmap实现内存缓存,用于快速存取和处理数据。
- redisutils:封装了对redis的操作,用于数据的持久化存储和高速读取。
- batchworker:实现了runnable接口,处理从redis中读取数据并执行批量入库的任务。
- batchtimeoutcommitthread:定时监控数据是否达到设定的批次或超时时间,并触发数据入库操作。
详细代码解析
- batchdatastorageservice
package io.jack.service.impl; import com.alibaba.fastjson.json; import com.alibaba.fastjson.jsonarray; import lombok.extern.slf4j.slf4j; import org.springframework.beans.factory.initializingbean; import org.springframework.beans.factory.annotation.value; import org.springframework.stereotype.component; import javax.annotation.resource; import java.util.arraylist; import java.util.list; import java.util.concurrent.executorservice; import java.util.concurrent.executors; /** * 数据批量入库服务类 */ @component @slf4j public class batchdatastorageservice implements initializingbean { /** * 最大批次数量 */ @value("${app.db.maxbatchcount:800}") private int maxbatchcount; /** * 最大线程数 */ @value("${app.db.maxbatchthreads:100}") private int maxbatchthreads; /** * 超时时间,单位毫秒 */ @value("${app.db.batchtimeout:3000}") private int batchtimeout; /** * 当前批次数量 */ private int batchcount = 0; /** * 当前批次号 */ private static long batchno = 0; /** * 线程池执行器 */ private executorservice executorservice = null; /** * 缓存服务 */ @resource private cacheservice cacheservice; /** * 设备实时服务 */ @resource private devicerealtimeservice devicerealtimeservice; /** * redis工具类 */ @resource private redisutils redisutils; /** * 初始化线程池 */ @override public void afterpropertiesset() { executorservice = executors.newfixedthreadpool(maxbatchthreads); } /** * 保存设备实时数据 * * @param devicerealtimedto 设备实时数据传输对象 */ public void saverealtimedata(devicerealtimedto devicerealtimedto) { final string failedcachekey = "device:real_time:failed_records"; try { // 生成批次和持续时间的缓存键 string durationkey = "device:real_time:batchduration" + batchno; string batchkey = "device:real_time:batch" + batchno; // 如果当前批次持续时间不存在,则创建并启动超时处理线程 if (!cacheservice.exists(durationkey)) { cacheservice.put(durationkey, system.currenttimemillis()); new batchtimeoutcommitthread(batchkey, durationkey, failedcachekey).start(); } // 将设备实时数据加入当前批次 cacheservice.lpush(batchkey, devicerealtimedto); if (++batchcount >= maxbatchcount) { // 达到最大批次,执行入库逻辑 datastorage(durationkey, batchkey, failedcachekey); } } catch (exception ex) { log.warn("[db:failed] 设备上报记录入批处理集合异常: " + ex.getmessage() + ", devicerealtimedto: " + json.tojsonstring(devicerealtimedto), ex); cacheservice.lpush(failedcachekey, devicerealtimedto); } finally { updaterealtimedata(devicerealtimedto); } } /** * 更新实时数据到redis * * @param devicerealtimedto 设备实时数据传输对象 */ private void updaterealtimedata(devicerealtimedto devicerealtimedto) { redisutils.set("real_time:" + devicerealtimedto.getdeviceid(), jsonarray.tojsonstring(devicerealtimedto)); } /** * 批量入库处理 * * @param durationkey 持续时间标识 * @param batchkey 批次标识 * @param failedcachekey 错误记录标识 */ private void datastorage(string durationkey, string batchkey, string failedcachekey) { batchno++; batchcount = 0; cacheservice.del(durationkey); if (batchno >= long.max_value) { batchno = 0; } executorservice.execute(new batchworker(batchkey, failedcachekey)); } /** * 批量工作线程 */ private class batchworker implements runnable { private final string failedcachekey; private final string batchkey; public batchworker(string batchkey, string failedcachekey) { this.batchkey = batchkey; this.failedcachekey = failedcachekey; } @override public void run() { final list<devicerealtimedto> devicerealtimedtolist = new arraylist<>(); try { // 从缓存中获取批量数据 devicerealtimedto devicerealtimedto = cacheservice.lpop(batchkey); while (devicerealtimedto != null) { devicerealtimedtolist.add(devicerealtimedto); devicerealtimedto = cacheservice.lpop(batchkey); } long timemillis = system.currenttimemillis(); try { // 将dto转换为实体对象并批量入库 list<devicerealtimeentity> devicerealtimeentitylist = convertutils.sourcetotarget(devicerealtimedtolist, devicerealtimeentity.class); devicerealtimeservice.insertbatch(devicerealtimeentitylist); } finally { cacheservice.del(batchkey); log.info("[db:batch_worker] 批次:" + batchkey + ",保存设备上报记录数:" + devicerealtimedtolist.size() + ", 耗时:" + (system.currenttimemillis() - timemillis) + "ms"); } } catch (exception e) { log.warn("[db:failed] 设备上报记录批量入库失败:" + e.getmessage() + ", devicerealtimedto: " + devicerealtimedtolist.size(), e); for (devicerealtimedto devicerealtimedto : devicerealtimedtolist) { cacheservice.lpush(failedcachekey, devicerealtimedto); } } } } /** * 批次超时提交线程 */ class batchtimeoutcommitthread extends thread { private final string batchkey; private final string durationkey; private final string failedcachekey; public batchtimeoutcommitthread(string batchkey, string durationkey, string failedcachekey) { this.batchkey = batchkey; this.durationkey = durationkey; this.failedcachekey = failedcachekey; this.setname("batch-thread-" + batchkey); } @override public void run() { try { thread.sleep(batchtimeout); } catch (interruptedexception e) { log.error("[db] 内部错误,直接提交:" + e.getmessage()); } if (cacheservice.exists(durationkey)) { // 达到最大批次的超时间,执行入库逻辑 datastorage(durationkey, batchkey, failedcachekey); } } } }
- cacheservice
package io.jack.service; import org.springframework.beans.factory.initializingbean; import org.springframework.stereotype.component; import java.util.linkedlist; import java.util.map; import java.util.concurrent.concurrenthashmap; import java.util.concurrent.atomic.atomiclong; /** * 缓存服务类,提供简易的缓存机制 */ @component public class cacheservice implements initializingbean { /** * 内存缓存,用于存储数据 */ private map<string, object> objectcache = new concurrenthashmap<>(); /** * 统计缓存,用于统计计数 */ private map<string, atomiclong> statcache = new concurrenthashmap<>(); /** * 初始化统计缓存 */ @override public void afterpropertiesset() { statcache.put("terminals", new atomiclong(0)); statcache.put("connections", new atomiclong(0)); } /** * 增加指定统计项的计数 * * @param statname 统计项名称 * @return 增加后的计数值 */ public long incr (string statname) { statcache.putifabsent(statname, new atomiclong(0)); return statcache.get(statname).incrementandget(); } /** * 减少指定统计项的计数 * * @param statname 统计项名称 * @return 减少后的计数值 */ public long decr(string statname) { statcache.putifabsent(statname, new atomiclong(0)); return statcache.get(statname).decrementandget(); } /** * 获取指定统计项的当前计数值 * * @param statname 统计项名称 * @return 当前计数值 */ public long stat(string statname) { statcache.putifabsent(statname, new atomiclong(0)); return statcache.get(statname).get(); } /** * 存储数据 * * @param key 缓存键 * @param object 缓存数据 */ public <t> void put(string key, t object) { objectcache.put(key, object); } /** * 获取数据 * * @param key 缓存键 * @return 缓存数据 */ public <t> t get(string key) { return (t) objectcache.get(key); } /** * 删除数据 * * @param key 缓存键 */ public void remove(string key) { objectcache.remove(key); } /** * 存储哈希表数据 * * @param key 哈希表键 * @param subkey 哈希表子键 * @param value 哈希表值 */ public void hset(string key, string subkey, object value) { synchronized (objectcache) { map<string, object> submap = (map<string, object>) objectcache.computeifabsent(key, k -> new concurrenthashmap<>()); submap.put(subkey, value); } } /** * 获取哈希表数据 * * @param key 哈希表键 * @param subkey 哈希表子键 * @return 哈希表值 */ public <t> t hget(string key, string subkey) { synchronized (objectcache) { map<string, object> submap = (map<string, object>) objectcache.get(key); return submap != null ? (t) submap.get(subkey) : null; } } /** * 判断哈希表子键是否存在 * * @param key 哈希表键 * @param subkey 哈希表子键 * @return 是否存在 */ public boolean hexists(string key, string subkey) { synchronized (objectcache) { map<string, object> submap = (map<string, object>) objectcache.get(key); return submap != null && submap.containskey(subkey); } } /** * 将数据推入列表 * * @param key 列表键 * @param value 列表值 */ public void lpush(string key, object value) { synchronized (objectcache) { linkedlist<object> queue = (linkedlist<object>) objectcache.computeifabsent(key, k -> new linkedlist<>()); queue.addlast(value); } } /** * 从列表中弹出数据 * * @param key 列表键 * @return 列表值 */ public <t> t lpop(string key) { synchronized (objectcache) { linkedlist<object> queue = (linkedlist<object>) objectcache.get(key); return queue != null && !queue.isempty() ? (t) queue.removelast() : null; } } /** * 删除缓存数据 * * @param key 缓存键 */ public void del(string key) { objectcache.remove(key); } /** * 判断缓存键是否存在 * * @param key 缓存键 * @return 是否存在 */ public boolean exists(string key) { return objectcache.containskey(key); } }
详细讲解
batchdatastorageservice
字段和初始化:
maxbatchcount
:配置文件中指定的最大批次大小,用于控制每批处理的数据量。maxbatchthreads
:线程池的最大线程数,影响处理并发能力。batchtimeout
:批次超时时间,用于控制数据处理的最迟时间。batchcount
:当前批次中的数据条数,用于判断是否需要提交批次数据。batchno
:批次号,用于标识不同的批次。executorservice
:用于执行批量处理任务的线程池。cacheservice
、devicerealtimeservice
、redisutils
:分别用于缓存操作、数据存储和 redis 操作。
方法详解:
afterpropertiesset
:初始化线程池,以便在后续操作中执行批量处理任务。saverealtimedata
:- 将实时数据推入缓存中,检查是否需要提交批次数据。
- 如果超时或数据量达到阈值,则调用
datastorage
方法处理数据。
updaterealtimedata
:将数据更新到 redis,确保实时数据的可用性。datastorage
:- 执行批量数据的存储操作,并提交工作线程处理数据。
batchworker
:- 从缓存中获取数据并执行入库操作,将成功的数据记录日志,将失败的数据放入失败缓存。
batchtimeoutcommitthread
:- 处理批次超时逻辑,即使在未满批次的情况下也会提交数据,确保数据及时处理。
cacheservice
字段:
objectcache
:用于存储普通缓存数据。statcache
:用于存储统计数据,例如计数器等。
方法详解:
put/get/remove
:基本的缓存操作,支持存储、获取和删除数据。hset/hget/hexists
:- 对哈希表进行操作,支持设置、获取和检查键值对。
lpush/lpop
:- 对列表进行操作,支持推入和弹出数据。
incr/decr/stat
:- 对统计数据进行操作,支持增加、减少和获取当前值。
总结
本文介绍了如何在高并发环境下利用线程池和redis实现高效的数据入库。通过将数据首先存入redis缓存,并利用线程池定期批量处理入库操作,能够有效提升系统的性能和稳定性。完整代码包括核心的batchdatastorageservice服务类、cacheservice缓存服务类以及redisutils工具类,均提供了详细的注释和解析,以便读者理解和实现类似的高并发数据处理系统
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论