当前位置: 代码网 > it编程>编程语言>Java > Java如何利用线程池和Redis实现高效数据入库

Java如何利用线程池和Redis实现高效数据入库

2025年02月09日 Java 我要评论
利用线程池和redis实现高效数据入库在高并发环境中,进行数据入库是一项具有挑战性的任务。本文将介绍如何利用线程池和redis实现数据的实时缓存和批量入库处理,确保系统的性能和稳定性。主要思路和组件介

利用线程池和redis实现高效数据入库

在高并发环境中,进行数据入库是一项具有挑战性的任务。

本文将介绍如何利用线程池和redis实现数据的实时缓存和批量入库处理,确保系统的性能和稳定性。

主要思路和组件介绍

思路概述

在高并发情况下,数据入库需要解决两个主要问题:实时性和稳定性。

通过将数据首先存储在redis缓存中,可以快速响应和处理大量的数据请求,然后利用线程池定期批量将数据从redis取出并入库,以减少数据库压力和提高整体性能。

主要组件

  1. batchdatastorageservice:核心服务类,负责数据的实时缓存和定期批量入库处理。
  2. cacheservice:简易缓存服务类,使用concurrenthashmap实现内存缓存,用于快速存取和处理数据。
  3. redisutils:封装了对redis的操作,用于数据的持久化存储和高速读取。
  4. batchworker:实现了runnable接口,处理从redis中读取数据并执行批量入库的任务。
  5. batchtimeoutcommitthread:定时监控数据是否达到设定的批次或超时时间,并触发数据入库操作。

详细代码解析

  • batchdatastorageservice
package io.jack.service.impl;

import com.alibaba.fastjson.json;
import com.alibaba.fastjson.jsonarray;
import lombok.extern.slf4j.slf4j;
import org.springframework.beans.factory.initializingbean;
import org.springframework.beans.factory.annotation.value;
import org.springframework.stereotype.component;

import javax.annotation.resource;
import java.util.arraylist;
import java.util.list;
import java.util.concurrent.executorservice;
import java.util.concurrent.executors;

/**
 * 数据批量入库服务类
 */
@component
@slf4j
public class batchdatastorageservice implements initializingbean {

    /**
     * 最大批次数量
     */
    @value("${app.db.maxbatchcount:800}")
    private int maxbatchcount;

    /**
     * 最大线程数
     */
    @value("${app.db.maxbatchthreads:100}")
    private int maxbatchthreads;

    /**
     * 超时时间,单位毫秒
     */
    @value("${app.db.batchtimeout:3000}")
    private int batchtimeout;

    /**
     * 当前批次数量
     */
    private int batchcount = 0;

    /**
     * 当前批次号
     */
    private static long batchno = 0;

    /**
     * 线程池执行器
     */
    private executorservice executorservice = null;

    /**
     * 缓存服务
     */
    @resource
    private cacheservice cacheservice;

    /**
     * 设备实时服务
     */
    @resource
    private devicerealtimeservice devicerealtimeservice;

    /**
     * redis工具类
     */
    @resource
    private redisutils redisutils;

    /**
     * 初始化线程池
     */
    @override
    public void afterpropertiesset() {
        executorservice = executors.newfixedthreadpool(maxbatchthreads);
    }

    /**
     * 保存设备实时数据
     * 
     * @param devicerealtimedto 设备实时数据传输对象
     */
    public void saverealtimedata(devicerealtimedto devicerealtimedto) {
        final string failedcachekey = "device:real_time:failed_records";

        try {
            // 生成批次和持续时间的缓存键
            string durationkey = "device:real_time:batchduration" + batchno;
            string batchkey = "device:real_time:batch" + batchno;

            // 如果当前批次持续时间不存在,则创建并启动超时处理线程
            if (!cacheservice.exists(durationkey)) {
                cacheservice.put(durationkey, system.currenttimemillis());
                new batchtimeoutcommitthread(batchkey, durationkey, failedcachekey).start();
            }

            // 将设备实时数据加入当前批次
            cacheservice.lpush(batchkey, devicerealtimedto);
            if (++batchcount >= maxbatchcount) {
                // 达到最大批次,执行入库逻辑
                datastorage(durationkey, batchkey, failedcachekey);
            }

        } catch (exception ex) {
            log.warn("[db:failed] 设备上报记录入批处理集合异常: " + ex.getmessage() + ", devicerealtimedto: " + json.tojsonstring(devicerealtimedto), ex);
            cacheservice.lpush(failedcachekey, devicerealtimedto);
        } finally {
            updaterealtimedata(devicerealtimedto);
        }
    }

    /**
     * 更新实时数据到redis
     * 
     * @param devicerealtimedto 设备实时数据传输对象
     */
    private void updaterealtimedata(devicerealtimedto devicerealtimedto) {
        redisutils.set("real_time:" + devicerealtimedto.getdeviceid(), jsonarray.tojsonstring(devicerealtimedto));
    }

    /**
     * 批量入库处理
     * 
     * @param durationkey 持续时间标识
     * @param batchkey    批次标识
     * @param failedcachekey 错误记录标识
     */
    private void datastorage(string durationkey, string batchkey, string failedcachekey) {
        batchno++;
        batchcount = 0;
        cacheservice.del(durationkey);
        if (batchno >= long.max_value) {
            batchno = 0;
        }
        executorservice.execute(new batchworker(batchkey, failedcachekey));
    }

    /**
     * 批量工作线程
     */
    private class batchworker implements runnable {

        private final string failedcachekey;
        private final string batchkey;

        public batchworker(string batchkey, string failedcachekey) {
            this.batchkey = batchkey;
            this.failedcachekey = failedcachekey;
        }

        @override
        public void run() {
            final list<devicerealtimedto> devicerealtimedtolist = new arraylist<>();
            try {
                // 从缓存中获取批量数据
                devicerealtimedto devicerealtimedto = cacheservice.lpop(batchkey);
                while (devicerealtimedto != null) {
                    devicerealtimedtolist.add(devicerealtimedto);
                    devicerealtimedto = cacheservice.lpop(batchkey);
                }

                long timemillis = system.currenttimemillis();

                try {
                    // 将dto转换为实体对象并批量入库
                    list<devicerealtimeentity> devicerealtimeentitylist = convertutils.sourcetotarget(devicerealtimedtolist, devicerealtimeentity.class);
                    devicerealtimeservice.insertbatch(devicerealtimeentitylist);
                } finally {
                    cacheservice.del(batchkey);
                    log.info("[db:batch_worker] 批次:" + batchkey + ",保存设备上报记录数:" + devicerealtimedtolist.size() + ", 耗时:" + (system.currenttimemillis() - timemillis) + "ms");
                }
            } catch (exception e) {
                log.warn("[db:failed] 设备上报记录批量入库失败:" + e.getmessage() + ", devicerealtimedto: " + devicerealtimedtolist.size(), e);
                for (devicerealtimedto devicerealtimedto : devicerealtimedtolist) {
                    cacheservice.lpush(failedcachekey, devicerealtimedto);
                }
            }
        }
    }

    /**
     * 批次超时提交线程
     */
    class batchtimeoutcommitthread extends thread {

        private final string batchkey;
        private final string durationkey;
        private final string failedcachekey;

        public batchtimeoutcommitthread(string batchkey, string durationkey, string failedcachekey) {
            this.batchkey = batchkey;
            this.durationkey = durationkey;
            this.failedcachekey = failedcachekey;
            this.setname("batch-thread-" + batchkey);
        }

        @override
        public void run() {
            try {
                thread.sleep(batchtimeout);
            } catch (interruptedexception e) {
                log.error("[db] 内部错误,直接提交:" + e.getmessage());
            }

            if (cacheservice.exists(durationkey)) {
                // 达到最大批次的超时间,执行入库逻辑
                datastorage(durationkey, batchkey, failedcachekey);
            }
        }
    }
}
  • cacheservice
package io.jack.service;

import org.springframework.beans.factory.initializingbean;
import org.springframework.stereotype.component;

import java.util.linkedlist;
import java.util.map;
import java.util.concurrent.concurrenthashmap;
import java.util.concurrent.atomic.atomiclong;

/**
 * 缓存服务类,提供简易的缓存机制
 */
@component
public class cacheservice implements initializingbean {

    /**
     * 内存缓存,用于存储数据
     */
    private map<string, object> objectcache = new concurrenthashmap<>();

    /**
     * 统计缓存,用于统计计数
     */
    private map<string, atomiclong> statcache = new concurrenthashmap<>();

    /**
     * 初始化统计缓存
     */
    @override
    public void afterpropertiesset() {
        statcache.put("terminals", new atomiclong(0));
        statcache.put("connections", new atomiclong(0));
    }

    /**
     * 增加指定统计项的计数
     * 
     * @param statname 统计项名称
     * @return 增加后的计数值
     */
    public long incr

(string statname) {
        statcache.putifabsent(statname, new atomiclong(0));
        return statcache.get(statname).incrementandget();
    }

    /**
     * 减少指定统计项的计数
     * 
     * @param statname 统计项名称
     * @return 减少后的计数值
     */
    public long decr(string statname) {
        statcache.putifabsent(statname, new atomiclong(0));
        return statcache.get(statname).decrementandget();
    }

    /**
     * 获取指定统计项的当前计数值
     * 
     * @param statname 统计项名称
     * @return 当前计数值
     */
    public long stat(string statname) {
        statcache.putifabsent(statname, new atomiclong(0));
        return statcache.get(statname).get();
    }

    /**
     * 存储数据
     * 
     * @param key 缓存键
     * @param object 缓存数据
     */
    public <t> void put(string key, t object) {
        objectcache.put(key, object);
    }

    /**
     * 获取数据
     * 
     * @param key 缓存键
     * @return 缓存数据
     */
    public <t> t get(string key) {
        return (t) objectcache.get(key);
    }

    /**
     * 删除数据
     * 
     * @param key 缓存键
     */
    public void remove(string key) {
        objectcache.remove(key);
    }

    /**
     * 存储哈希表数据
     * 
     * @param key 哈希表键
     * @param subkey 哈希表子键
     * @param value 哈希表值
     */
    public void hset(string key, string subkey, object value) {
        synchronized (objectcache) {
            map<string, object> submap = (map<string, object>) objectcache.computeifabsent(key, k -> new concurrenthashmap<>());
            submap.put(subkey, value);
        }
    }

    /**
     * 获取哈希表数据
     * 
     * @param key 哈希表键
     * @param subkey 哈希表子键
     * @return 哈希表值
     */
    public <t> t hget(string key, string subkey) {
        synchronized (objectcache) {
            map<string, object> submap = (map<string, object>) objectcache.get(key);
            return submap != null ? (t) submap.get(subkey) : null;
        }
    }

    /**
     * 判断哈希表子键是否存在
     * 
     * @param key 哈希表键
     * @param subkey 哈希表子键
     * @return 是否存在
     */
    public boolean hexists(string key, string subkey) {
        synchronized (objectcache) {
            map<string, object> submap = (map<string, object>) objectcache.get(key);
            return submap != null && submap.containskey(subkey);
        }
    }

    /**
     * 将数据推入列表
     * 
     * @param key 列表键
     * @param value 列表值
     */
    public void lpush(string key, object value) {
        synchronized (objectcache) {
            linkedlist<object> queue = (linkedlist<object>) objectcache.computeifabsent(key, k -> new linkedlist<>());
            queue.addlast(value);
        }
    }

    /**
     * 从列表中弹出数据
     * 
     * @param key 列表键
     * @return 列表值
     */
    public <t> t lpop(string key) {
        synchronized (objectcache) {
            linkedlist<object> queue = (linkedlist<object>) objectcache.get(key);
            return queue != null && !queue.isempty() ? (t) queue.removelast() : null;
        }
    }

    /**
     * 删除缓存数据
     * 
     * @param key 缓存键
     */
    public void del(string key) {
        objectcache.remove(key);
    }

    /**
     * 判断缓存键是否存在
     * 
     * @param key 缓存键
     * @return 是否存在
     */
    public boolean exists(string key) {
        return objectcache.containskey(key);
    }
}

详细讲解

batchdatastorageservice

字段和初始化

  • maxbatchcount:配置文件中指定的最大批次大小,用于控制每批处理的数据量。
  • maxbatchthreads:线程池的最大线程数,影响处理并发能力。
  • batchtimeout:批次超时时间,用于控制数据处理的最迟时间。
  • batchcount:当前批次中的数据条数,用于判断是否需要提交批次数据。
  • batchno:批次号,用于标识不同的批次。
  • executorservice:用于执行批量处理任务的线程池。
  • cacheservicedevicerealtimeserviceredisutils:分别用于缓存操作、数据存储和 redis 操作。

方法详解

  • afterpropertiesset:初始化线程池,以便在后续操作中执行批量处理任务。
  • saverealtimedata
    • 将实时数据推入缓存中,检查是否需要提交批次数据。
    • 如果超时或数据量达到阈值,则调用 datastorage 方法处理数据。
  • updaterealtimedata:将数据更新到 redis,确保实时数据的可用性。
  • datastorage
    • 执行批量数据的存储操作,并提交工作线程处理数据。
  • batchworker
    • 从缓存中获取数据并执行入库操作,将成功的数据记录日志,将失败的数据放入失败缓存。
  • batchtimeoutcommitthread
    • 处理批次超时逻辑,即使在未满批次的情况下也会提交数据,确保数据及时处理。

cacheservice

字段

  • objectcache:用于存储普通缓存数据。
  • statcache:用于存储统计数据,例如计数器等。

方法详解

  • put/get/remove:基本的缓存操作,支持存储、获取和删除数据。
  • hset/hget/hexists
    • 对哈希表进行操作,支持设置、获取和检查键值对。
  • lpush/lpop
    • 对列表进行操作,支持推入和弹出数据。
  • incr/decr/stat
    • 对统计数据进行操作,支持增加、减少和获取当前值。

总结

本文介绍了如何在高并发环境下利用线程池和redis实现高效的数据入库。通过将数据首先存入redis缓存,并利用线程池定期批量处理入库操作,能够有效提升系统的性能和稳定性。完整代码包括核心的batchdatastorageservice服务类、cacheservice缓存服务类以及redisutils工具类,均提供了详细的注释和解析,以便读者理解和实现类似的高并发数据处理系统

以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com