Spring Data JPA 批量插入或更新
1. BatchConsumer
package com.demo.common.hibernate.batch;

import com.demo.common.hibernate.querydsl.QueryParameterBuilder;

/**
 * Batch data consumer: a callback used to set SQL parameters for a record
 * and perform the associated operation.
 *
 * @param <T> the record type
 * @author xm.z
 */
@FunctionalInterface
public interface BatchConsumer<T> {

    /**
     * Sets the SQL parameters for the given record and performs the operation.
     *
     * @param builder the parameter builder used to bind values
     * @param record  the record to process
     */
    void accept(QueryParameterBuilder builder, T record);
}
2. QueryParameterBuilder
package com.demo.common.hibernate.querydsl;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.hibernate.jpa.TypedParameterValue;
import org.hibernate.type.*;
import org.springframework.util.Assert;

import javax.persistence.Query;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * QueryParameterBuilder
 * <p>
 * A utility class for binding positional parameters on a query. Each
 * type-specific {@code setParameter} overload wraps the value in a
 * {@link TypedParameterValue} and binds it at the next position taken from a
 * shared counter.
 *
 * @author xm.z
 */
@Slf4j
@Getter
public class QueryParameterBuilder {

    /**
     * The native query object to be used for parameter setting.
     */
    private final Query nativeQuery;

    /**
     * The counter for the parameter position; no getter is generated for it,
     * use {@link #obtainCurrentPosition()} instead.
     */
    @Getter(value = AccessLevel.NONE)
    private final AtomicInteger position;

    /**
     * The date and time at which this builder instance was created.
     */
    private final LocalDateTime now;

    /**
     * Private constructor; instances are obtained through
     * {@link #create(Query, AtomicInteger)}.
     *
     * @param nativeQuery the native query object
     * @param position    the parameter position counter
     */
    private QueryParameterBuilder(Query nativeQuery, AtomicInteger position) {
        this.nativeQuery = nativeQuery;
        this.position = position;
        this.now = LocalDateTime.now();
    }

    /**
     * Retrieves the current position of the parameter counter without advancing it.
     *
     * @return the current position of the parameter
     */
    public Integer obtainCurrentPosition() {
        return position.get();
    }

    /**
     * Creates an instance of QueryParameterBuilder.
     *
     * @param nativeQuery the native query object
     * @param position    the parameter position counter
     * @return a new QueryParameterBuilder instance
     * @throws IllegalArgumentException if either argument is {@code null}
     */
    public static QueryParameterBuilder create(Query nativeQuery, AtomicInteger position) {
        Assert.notNull(nativeQuery, "native query must not be null");
        Assert.notNull(position, "position must not be null");
        return new QueryParameterBuilder(nativeQuery, position);
    }

    /**
     * Sets a parameter of type Long at the next position.
     *
     * @param value the Long value for the parameter
     * @return the current QueryParameterBuilder instance
     */
    public QueryParameterBuilder setParameter(Long value) {
        return this.setParameter(StandardBasicTypes.LONG, value);
    }

    /**
     * Sets a parameter of type Integer at the next position.
     *
     * @param value the Integer value for the parameter
     * @return the current QueryParameterBuilder instance
     */
    public QueryParameterBuilder setParameter(Integer value) {
        return this.setParameter(StandardBasicTypes.INTEGER, value);
    }

    /**
     * Sets a parameter of type BigDecimal at the next position.
     *
     * @param value the BigDecimal value for the parameter
     * @return the current QueryParameterBuilder instance
     */
    public QueryParameterBuilder setParameter(BigDecimal value) {
        return this.setParameter(StandardBasicTypes.BIG_DECIMAL, value);
    }

    /**
     * Sets a parameter of type String at the next position.
     *
     * @param value the String value for the parameter
     * @return the current QueryParameterBuilder instance
     */
    public QueryParameterBuilder setParameter(String value) {
        return this.setParameter(StandardBasicTypes.STRING, value);
    }

    /**
     * Sets a parameter of type Boolean at the next position.
     *
     * @param value the Boolean value for the parameter
     * @return the current QueryParameterBuilder instance
     */
    public QueryParameterBuilder setParameter(Boolean value) {
        return this.setParameter(StandardBasicTypes.BOOLEAN, value);
    }

    /**
     * Sets a parameter of type Date at the next position.
     * <p>
     * NOTE(review): the value is bound with {@code StandardBasicTypes.DATE},
     * which is Hibernate's date-only mapping; if the time of day must be
     * preserved, bind through {@link #setParameter(Type, Object)} with a
     * timestamp type instead. Confirm the intended precision with callers.
     *
     * @param value the Date value for the parameter
     * @return the current QueryParameterBuilder instance
     */
    public QueryParameterBuilder setParameter(Date value) {
        return this.setParameter(StandardBasicTypes.DATE, value);
    }

    /**
     * Sets a parameter of type LocalDate at the next position.
     *
     * @param value the LocalDate value for the parameter
     * @return the current QueryParameterBuilder instance
     */
    public QueryParameterBuilder setParameter(LocalDate value) {
        return this.setParameter(LocalDateType.INSTANCE, value);
    }

    /**
     * Sets a parameter of type LocalTime at the next position.
     *
     * @param value the LocalTime value for the parameter
     * @return the current QueryParameterBuilder instance
     */
    public QueryParameterBuilder setParameter(LocalTime value) {
        return this.setParameter(LocalTimeType.INSTANCE, value);
    }

    /**
     * Sets a parameter of type LocalDateTime at the next position.
     *
     * @param value the LocalDateTime value for the parameter
     * @return the current QueryParameterBuilder instance
     */
    public QueryParameterBuilder setParameter(LocalDateTime value) {
        return this.setParameter(LocalDateTimeType.INSTANCE, value);
    }

    /**
     * Binds a typed value at the next position and advances the position counter.
     *
     * @param type  the parameter type
     * @param value the parameter value
     * @return the current QueryParameterBuilder instance
     */
    public QueryParameterBuilder setParameter(Type type, Object value) {
        return this.setParameter(position.getAndIncrement(), type, value);
    }

    /**
     * Binds a typed value at the specified position. The shared position
     * counter is not touched by this overload.
     *
     * @param position the position of the parameter in the query
     * @param type     the parameter type
     * @param value    the parameter value
     * @return the current QueryParameterBuilder instance
     */
    public QueryParameterBuilder setParameter(int position, Type type, Object value) {
        // Wrapping the value with its Hibernate type lets the query bind it
        // with an explicit type instead of inferring one from the value.
        TypedParameterValue typedParameterValue = new TypedParameterValue(type, value);
        if (log.isDebugEnabled()) {
            log.debug("setting parameter at position {}: {}", position, typedParameterValue);
        }
        nativeQuery.setParameter(position, typedParameterValue);
        return this;
    }
}
3. KeyValue
package com.demo.common.model;

import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

/**
 * A general-purpose class representing a key-value pair.
 *
 * @param <K> the type of the key
 * @param <V> the type of the value
 * @author xm.z
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class KeyValue<K, V> implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * The key.
     */
    @Schema(title = "键")
    private K key;

    /**
     * The value.
     */
    @Schema(title = "值")
    private V value;
}
4. SqlUtil
package com.demo.common.hibernate.util;

import com.demo.common.hibernate.batch.BatchConsumer;
import com.demo.common.hibernate.querydsl.QueryParameterBuilder;
import com.demo.common.model.KeyValue;
import cn.hutool.extra.spring.SpringUtil;
import cn.hutool.core.collection.CollUtil;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

import javax.persistence.EntityManager;
import javax.persistence.Query;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
 * SqlUtil
 * <p>
 * Static helpers that build multi-row {@code insert} statements (optionally
 * with an {@code on duplicate key update} clause) and execute them as native
 * queries, one statement per batch of records.
 * <p>
 * The table name and column names are concatenated into the SQL text as-is;
 * only the row values are bound as parameters. Callers must therefore pass
 * identifiers that come from trusted, application-defined sources.
 *
 * @author xm.z
 */
@Slf4j
public final class SqlUtil {

    /**
     * Default batch insert size.
     */
    public static final int DEFAULT_BATCH_SIZE = 100;

    /**
     * Largest accepted batch size (inclusive), matching the validation message.
     */
    private static final int MAX_BATCH_SIZE = 500;

    /**
     * Private constructor; this class only exposes static methods.
     */
    private SqlUtil() {
    }

    /**
     * Batch inserts records using the default batch size and the
     * {@link EntityManager} bean looked up through {@code SpringUtil}.
     *
     * @param tableFields the table name (key) and its ordered column names (value)
     * @param records     the list of records to be inserted
     * @param consumer    the callback that binds the parameters of each record
     * @param <T>         the type of records
     * @return the sum of the row counts reported by each executed statement
     */
    public static <T> int batchInsert(@NonNull KeyValue<String, LinkedHashSet<String>> tableFields,
                                      @NonNull List<T> records,
                                      @NonNull BatchConsumer<? super T> consumer) {
        return batchInsert(DEFAULT_BATCH_SIZE, tableFields, records, consumer);
    }

    /**
     * Batch inserts records with the specified batch size, using the
     * {@link EntityManager} bean looked up through {@code SpringUtil}.
     *
     * @param batchSize   the number of records per statement
     * @param tableFields the table name (key) and its ordered column names (value)
     * @param records     the list of records to be inserted
     * @param consumer    the callback that binds the parameters of each record
     * @param <T>         the type of records
     * @return the sum of the row counts reported by each executed statement
     */
    public static <T> int batchInsert(int batchSize,
                                      @NonNull KeyValue<String, LinkedHashSet<String>> tableFields,
                                      @NonNull List<T> records,
                                      @NonNull BatchConsumer<? super T> consumer) {
        EntityManager entityManager = SpringUtil.getBean(EntityManager.class);
        return batchExecuteUpdate(batchSize, entityManager, tableFields, null, records, consumer);
    }

    /**
     * Batch inserts records using the default batch size and the given entity manager.
     *
     * @param entityManager the entity manager used to create the native queries
     * @param tableFields   the table name (key) and its ordered column names (value)
     * @param records       the list of records to be inserted
     * @param consumer      the callback that binds the parameters of each record
     * @param <T>           the type of records
     * @return the sum of the row counts reported by each executed statement
     */
    public static <T> int batchInsert(EntityManager entityManager,
                                      @NonNull KeyValue<String, LinkedHashSet<String>> tableFields,
                                      @NonNull List<T> records,
                                      @NonNull BatchConsumer<? super T> consumer) {
        return batchExecuteUpdate(DEFAULT_BATCH_SIZE, entityManager, tableFields, null, records, consumer);
    }

    /**
     * Executes batch insert-or-update operations using native SQL with the
     * default batch size and the {@link EntityManager} bean looked up through
     * {@code SpringUtil}.
     *
     * @param tableFields  the table name (key) and its ordered column names (value)
     * @param updateFields the columns to update when a duplicate key is hit
     * @param records      the list of records to be inserted or updated
     * @param consumer     the callback that binds the parameters of each record
     * @param <T>          the type of the records
     * @return the sum of the row counts reported by each executed statement
     */
    public static <T> int batchInsertOrUpdate(@NonNull KeyValue<String, LinkedHashSet<String>> tableFields,
                                              @NonNull LinkedHashSet<String> updateFields,
                                              @NonNull List<T> records,
                                              @NonNull BatchConsumer<? super T> consumer) {
        return batchInsertOrUpdate(DEFAULT_BATCH_SIZE, tableFields, updateFields, records, consumer);
    }

    /**
     * Executes batch insert-or-update operations using native SQL with the
     * given batch size and the {@link EntityManager} bean looked up through
     * {@code SpringUtil}.
     *
     * @param batchSize    the number of records per statement
     * @param tableFields  the table name (key) and its ordered column names (value)
     * @param updateFields the columns to update when a duplicate key is hit
     * @param records      the list of records to be inserted or updated
     * @param consumer     the callback that binds the parameters of each record
     * @param <T>          the type of the records
     * @return the sum of the row counts reported by each executed statement
     */
    public static <T> int batchInsertOrUpdate(int batchSize,
                                              @NonNull KeyValue<String, LinkedHashSet<String>> tableFields,
                                              @NonNull LinkedHashSet<String> updateFields,
                                              @NonNull List<T> records,
                                              @NonNull BatchConsumer<? super T> consumer) {
        EntityManager entityManager = SpringUtil.getBean(EntityManager.class);
        return batchExecuteUpdate(batchSize, entityManager, tableFields, updateFields, records, consumer);
    }

    /**
     * Executes batch insert-or-update operations using native SQL with the
     * default batch size and the given entity manager.
     *
     * @param entityManager the entity manager used to create the native queries
     * @param tableFields   the table name (key) and its ordered column names (value)
     * @param updateFields  the columns to update when a duplicate key is hit
     * @param records       the list of records to be inserted or updated
     * @param consumer      the callback that binds the parameters of each record
     * @param <T>           the type of the records
     * @return the sum of the row counts reported by each executed statement
     */
    public static <T> int batchInsertOrUpdate(EntityManager entityManager,
                                              @NonNull KeyValue<String, LinkedHashSet<String>> tableFields,
                                              @NonNull LinkedHashSet<String> updateFields,
                                              @NonNull List<T> records,
                                              @NonNull BatchConsumer<? super T> consumer) {
        return batchExecuteUpdate(DEFAULT_BATCH_SIZE, entityManager, tableFields, updateFields, records, consumer);
    }

    /**
     * Splits the records into batches and executes one native statement per batch.
     * <p>
     * When {@code updateFields} is null or empty a plain multi-row insert is
     * generated; otherwise an {@code on duplicate key update} clause is appended.
     * An empty record list short-circuits to {@code 0} before any validation.
     *
     * @param batchSize     the number of records per statement, from 1 to 500 inclusive
     * @param entityManager the entity manager for creating and executing queries
     * @param tableFields   the table name (key) and its ordered column names (value)
     * @param updateFields  the columns to update on duplicate key (optional)
     * @param records       the list of records to be written
     * @param consumer      the callback that binds the parameters of each record
     * @param <T>           the type of the records
     * @return the sum of the row counts reported by each executed statement
     * @throws IllegalArgumentException if the entity manager is null or the batch size is out of range
     */
    private static <T> int batchExecuteUpdate(int batchSize,
                                              EntityManager entityManager,
                                              @NonNull KeyValue<String, LinkedHashSet<String>> tableFields,
                                              @Nullable LinkedHashSet<String> updateFields,
                                              @NonNull List<T> records,
                                              @NonNull BatchConsumer<? super T> consumer) {
        if (records.isEmpty()) {
            log.debug("no records to process. the records list is empty.");
            return 0;
        }

        Assert.notNull(entityManager, "the entity manager must not be null.");
        // The upper bound is inclusive so that the check agrees with its message.
        Assert.isTrue(batchSize > 0 && batchSize <= MAX_BATCH_SIZE, "the batch size must be between 1 and 500.");

        int totalRows = 0;
        boolean insertOnly = CollUtil.isEmpty(updateFields);

        // Split the records into batches based on the specified batch size.
        List<List<T>> recordBatches = CollUtil.split(records, batchSize);
        for (List<T> batchRecords : recordBatches) {
            // The SQL is regenerated per batch because the last batch may be
            // smaller and therefore needs fewer placeholder groups.
            String preparedStatementSql = insertOnly
                    ? generateBatchInsertSql(tableFields, batchRecords.size())
                    : generateBatchInsertOrUpdateSql(tableFields, updateFields, batchRecords.size());

            Query nativeQuery = entityManager.createNativeQuery(preparedStatementSql);

            // Positional parameters start at 1 and keep counting across all
            // records of the batch, since they share a single statement.
            AtomicInteger position = new AtomicInteger(1);
            QueryParameterBuilder parameterBuilder = QueryParameterBuilder.create(nativeQuery, position);
            for (T record : batchRecords) {
                consumer.accept(parameterBuilder, record);
            }

            totalRows += nativeQuery.executeUpdate();
        }
        return totalRows;
    }

    /**
     * Generates the batch insert SQL statement.
     *
     * <p>
     * Example SQL statement:
     * <pre>
     * {@code insert into table_name ( field_1, field_2 ) values ( value_1, value_2 ), (value_3, value_4); }
     * </pre>
     * </p>
     *
     * @param tableFields the table name (key) and its ordered column names (value)
     * @param batchSize   the number of rows (placeholder groups) in the statement
     * @return the batch insert SQL statement
     */
    private static String generateBatchInsertSql(@NonNull KeyValue<String, LinkedHashSet<String>> tableFields,
                                                 int batchSize) {
        String preparedStatementSql =
                generateInsertStatement(tableFields.getKey(), tableFields.getValue(), batchSize);
        if (log.isDebugEnabled()) {
            log.debug("[batch insert] prepared {} records sql: {}", batchSize, preparedStatementSql);
        }
        return preparedStatementSql;
    }

    /**
     * Generates the SQL statement for batch insert with on duplicate key update.
     *
     * @param tableFields  the table name (key) and its ordered column names (value)
     * @param updateFields the columns to be updated in case of a duplicate key
     * @param batchSize    the number of rows (placeholder groups) in the statement
     * @return the SQL statement for batch insert with on duplicate key update
     * @throws IllegalArgumentException if the updateFields collection is empty
     */
    private static String generateBatchInsertOrUpdateSql(@NonNull KeyValue<String, LinkedHashSet<String>> tableFields,
                                                         LinkedHashSet<String> updateFields,
                                                         int batchSize) {
        Assert.notEmpty(updateFields, "update field collection cannot be empty.");

        String insertStatement =
                generateInsertStatement(tableFields.getKey(), tableFields.getValue(), batchSize);

        // Each updated column takes the value that the insert attempted to write.
        String updateClause = updateFields.stream()
                .map(updateField -> updateField + " = values(" + updateField + ")")
                .collect(Collectors.joining(", "));

        String preparedStatementSql = new StringBuilder(insertStatement.length() + 100)
                .append(insertStatement)
                .append(" on duplicate key update ")
                .append(updateClause)
                .toString();

        if (log.isDebugEnabled()) {
            log.debug("[batch insert on duplicate key update] prepared {} records sql: {}",
                    batchSize, preparedStatementSql);
        }
        return preparedStatementSql;
    }

    /**
     * Builds {@code insert into <table> (<columns>) values (?, ...), (?, ...)}
     * with one placeholder group per row.
     *
     * @param tableName the target table name
     * @param fields    the ordered column names; must contain at least one column
     * @param batchSize the number of placeholder groups to emit
     * @return the insert statement text
     * @throws IllegalArgumentException if the table name is blank or the field collection is null or empty
     */
    @NotNull
    private static String generateInsertStatement(@NonNull String tableName,
                                                  @NonNull LinkedHashSet<String> fields,
                                                  int batchSize) {
        Assert.hasText(tableName, "table name cannot be empty.");
        // An empty column list would yield "insert into t () values ()", so
        // reject it up front as the message already promises.
        Assert.notEmpty(fields, "field collection cannot be empty.");

        // NOTE: identifiers cannot be bound as parameters, so the table and
        // column names are written into the SQL text directly.
        String fieldNames = String.join(", ", fields);
        String intoTemplate = String.format("insert into %s (%s) values ", tableName, fieldNames);

        // One "(?, ?, ...)" group per row, with as many markers as columns.
        String placeholders = "(" + String.join(", ", Collections.nCopies(fields.size(), "?")) + ")";

        StringBuilder builder = new StringBuilder(fields.size() * 100);
        builder.append(intoTemplate);
        for (int i = 0; i < batchSize; i++) {
            if (i > 0) {
                builder.append(", ");
            }
            builder.append(placeholders);
        }
        return builder.toString();
    }
}
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论