spring data jpa批量插入或更新
1. batchconsumer
package com.demo.common.hibernate.batch;
import com.demo.common.hibernate.querydsl.queryparameterbuilder;
/**
* 批量数据消费者接口,用于设置 sql 参数并执行操作。
*
* @param <t> 记录类型的泛型
* @author xm.z
*/
@functionalinterface
public interface batchconsumer<t> {
/**
* 设置 sql 参数并执行操作。
*
* @param builder 参数构建对象
* @param record 要处理的记录
*/
void accept(queryparameterbuilder builder, t record);
}2. queryparameterbuilder
package com.demo.common.hibernate.querydsl;
import lombok.accesslevel;
import lombok.getter;
import lombok.extern.slf4j.slf4j;
import org.hibernate.jpa.typedparametervalue;
import org.hibernate.type.*;
import org.springframework.util.assert;
import javax.persistence.query;
import java.math.bigdecimal;
import java.time.localdate;
import java.time.localdatetime;
import java.time.localtime;
import java.util.date;
import java.util.concurrent.atomic.atomicinteger;
/**
* queryparameterbuilder
* <p>
* a utility class for building parameters for query.
*
* @author xm.z
*/
@slf4j
@getter
public class queryparameterbuilder {
/**
* the native query object to be used for parameter setting
*/
private final query nativequery;
/**
* the counter for parameter position
*/
@getter(value = accesslevel.none)
private final atomicinteger position;
/**
* the current date and time when the queryparameterbuilder instance is created
*/
private final localdatetime now;
/**
* private constructor to initialize queryparameterbuilder
*/
private queryparameterbuilder(query nativequery, atomicinteger position) {
this.nativequery = nativequery;
this.position = position;
this.now = localdatetime.now();
}
/**
* retrieves the current position of the parameter.
*
* @return the current position of the parameter.
*/
public integer obtaincurrentposition() {
return position.get();
}
/**
* create an instance of queryparameterbuilder.
*
* @param nativequery the native query object
* @param position the parameter position counter
* @return queryparameterbuilder instance
*/
public static queryparameterbuilder create(query nativequery, atomicinteger position) {
assert.notnull(nativequery, "native query must not be null");
assert.notnull(position, "position must not be null");
return new queryparameterbuilder(nativequery, position);
}
/**
* set a parameter of type long.
*
* @param value the long value for the parameter
* @return the current queryparameterbuilder instance
*/
public queryparameterbuilder setparameter(long value) {
return this.setparameter(standardbasictypes.long, value);
}
/**
* set a parameter of type integer.
*
* @param value the integer value for the parameter
* @return the current queryparameterbuilder instance
*/
public queryparameterbuilder setparameter(integer value) {
return this.setparameter(standardbasictypes.integer, value);
}
/**
* set a parameter of type bigdecimal.
*
* @param value the bigdecimal value for the parameter
* @return the current queryparameterbuilder instance
*/
public queryparameterbuilder setparameter(bigdecimal value) {
return this.setparameter(standardbasictypes.big_decimal, value);
}
/**
* set a parameter of type string.
*
* @param value the string value for the parameter
* @return the current queryparameterbuilder instance
*/
public queryparameterbuilder setparameter(string value) {
return this.setparameter(standardbasictypes.string, value);
}
/**
* set a parameter of type boolean.
*
* @param value the boolean value for the parameter
* @return the current queryparameterbuilder instance
*/
public queryparameterbuilder setparameter(boolean value) {
return this.setparameter(standardbasictypes.boolean, value);
}
/**
* set a parameter of type date.
*
* @param value the date value for the parameter
* @return the current queryparameterbuilder instance
*/
public queryparameterbuilder setparameter(date value) {
return this.setparameter(standardbasictypes.date, value);
}
/**
* set a parameter of type localdate.
*
* @param value the localdate value for the parameter
* @return the current queryparameterbuilder instance
*/
public queryparameterbuilder setparameter(localdate value) {
return this.setparameter(localdatetype.instance, value);
}
/**
* set a parameter of type localtime.
*
* @param value the localtime value for the parameter
* @return the current queryparameterbuilder instance
*/
public queryparameterbuilder setparameter(localtime value) {
return this.setparameter(localtimetype.instance, value);
}
/**
* set a parameter of type localdatetime.
*
* @param value the localdatetime value for the parameter
* @return the current queryparameterbuilder instance
*/
public queryparameterbuilder setparameter(localdatetime value) {
return this.setparameter(localdatetimetype.instance, value);
}
/**
* add or include a query condition to the native query object and set the parameter value.
*
* @param type the parameter type
* @param value the parameter value
* @return the current queryparameterbuilder instance
*/
public queryparameterbuilder setparameter(type type, object value) {
return this.setparameter(position.getandincrement(), type, value);
}
/**
* add or include a query condition to the native query object and set the parameter value at the specified position.
*
* @param position the position of the parameter in the query
* @param type the parameter type
* @param value the parameter value
* @return the current queryparameterbuilder instance
*/
public queryparameterbuilder setparameter(int position, type type, object value) {
typedparametervalue typedparametervalue = new typedparametervalue(type, value);
if (log.isdebugenabled()) {
log.debug("setting parameter at position {}: {}", position, typedparametervalue);
}
nativequery.setparameter(position, typedparametervalue);
return this;
}
}3. keyvalue
package com.demo.common.model;
import io.swagger.v3.oas.annotations.media.schema;
import lombok.allargsconstructor;
import lombok.data;
import lombok.noargsconstructor;
import java.io.serializable;
/**
* 用于表示键值对的通用类
*
* @param <k> 键的类型
* @param <v> 值的类型
* @author xm.z
*/
@data
@noargsconstructor
@allargsconstructor
public class keyvalue<k, v> implements serializable {
private static final long serialversionuid = 1l;
/**
* 键
*/
@schema(title = "键")
private k key;
/**
* 值
*/
@schema(title = "值")
private v value;
}4. sqlutil
package com.demo.common.hibernate.util;
import com.demo.common.hibernate.batch.batchconsumer;
import com.demo.common.hibernate.querydsl.queryparameterbuilder;
import com.demo.common.model.keyvalue;
import cn.hutool.extra.spring.springutil;
import cn.hutool.core.collection.collutil;
import lombok.extern.slf4j.slf4j;
import org.jetbrains.annotations.notnull;
import org.springframework.lang.nonnull;
import org.springframework.lang.nullable;
import org.springframework.util.assert;
import javax.persistence.entitymanager;
import javax.persistence.query;
import java.util.collections;
import java.util.linkedhashset;
import java.util.list;
import java.util.concurrent.atomic.atomicinteger;
import java.util.stream.collectors;
/**
* sqlutil
*
* @author xm.z
*/
@slf4j
@suppresswarnings("all")
public class sqlutil {
/**
* default batch insert size.
*/
public static final int default_batch_size = 100;
/**
* private constructor.
*/
private sqlutil() {
}
/**
* batch insert records into the database.
*
* @param tablefields the table fields information
* @param records the list of records to be inserted
* @param consumer the consumer function interface for customizing the insert behavior
* @param <t> the type of records
* @return the number of records successfully inserted
*/
public static <t> int batchinsert(@nonnull keyvalue<string, linkedhashset<string>> tablefields,
@nonnull list<t> records, @nonnull batchconsumer<? super t> consumer) {
return batchinsert(default_batch_size, tablefields, records, consumer);
}
/**
* perform batch insert operation with the specified batch size.
*
* @param batchsize the size of each batch for insertion
* @param tablefields the key-value pair representing the table fields
* @param records the list of records to be inserted
* @param consumer the batch consumer for processing each batch of records
* @param <t> the type of records
* @return the total number of records successfully inserted
*/
public static <t> int batchinsert(int batchsize, @nonnull keyvalue<string, linkedhashset<string>> tablefields,
@nonnull list<t> records, @nonnull batchconsumer<? super t> consumer) {
entitymanager entitymanager = springutil.getbean(entitymanager.class);
return batchexecuteupdate(batchsize, entitymanager, tablefields, null, records, consumer);
}
/**
* batch insert records into the database.
*
* @param entitymanager the entity manager
* @param tablefields the table fields information
* @param records the list of records to be inserted
* @param consumer the consumer function interface for customizing the insert behavior
* @param <t> the type of records
* @return the number of records successfully inserted
*/
public static <t> int batchinsert(entitymanager entitymanager,
@nonnull keyvalue<string, linkedhashset<string>> tablefields,
@nonnull list<t> records, @nonnull batchconsumer<? super t> consumer) {
return batchexecuteupdate(default_batch_size, entitymanager, tablefields, null, records, consumer);
}
/**
* executes batch insert or update operations on the database using native sql with a default batch size.
*
* @param tablefields key-value pair representing the table name and its fields
* @param updatefields set of fields to be updated if a record with matching primary key exists
* @param records the list of records to be inserted or updated
* @param consumer functional interface for accepting batch consumer operations
* @param <t> the type of the records to be inserted or updated
* @return the total number of rows affected by the batch operation
*/
public static <t> int batchinsertorupdate(@nonnull keyvalue<string, linkedhashset<string>> tablefields,
@nonnull linkedhashset<string> updatefields,
@nonnull list<t> records, @nonnull batchconsumer<? super t> consumer) {
return batchinsertorupdate(default_batch_size, tablefields, updatefields, records, consumer);
}
/**
* executes batch insert or update operations on the database using native sql with a parameterized batch size.
*
* @param batchsize the size of each batch for insertion
* @param tablefields key-value pair representing the table name and its fields
* @param updatefields set of fields to be updated if a record with matching primary key exists
* @param records the list of records to be inserted or updated
* @param consumer functional interface for accepting batch consumer operations
* @param <t> the type of the records to be inserted or updated
* @return the total number of rows affected by the batch operation
*/
public static <t> int batchinsertorupdate(int batchsize, @nonnull keyvalue<string, linkedhashset<string>> tablefields,
@nonnull linkedhashset<string> updatefields,
@nonnull list<t> records, @nonnull batchconsumer<? super t> consumer) {
entitymanager entitymanager = springutil.getbean(entitymanager.class);
return batchexecuteupdate(batchsize, entitymanager, tablefields, updatefields, records, consumer);
}
/**
* executes batch insert or update operations on the database using native sql with a default batch size.
*
* @param entitymanager the entity manager
* @param tablefields key-value pair representing the table name and its fields
* @param updatefields set of fields to be updated if a record with matching primary key exists
* @param records the list of records to be inserted or updated
* @param consumer functional interface for accepting batch consumer operations
* @param <t> the type of the records to be inserted or updated
* @return the total number of rows affected by the batch operation
*/
public static <t> int batchinsertorupdate(entitymanager entitymanager,
@nonnull keyvalue<string, linkedhashset<string>> tablefields,
@nonnull linkedhashset<string> updatefields,
@nonnull list<t> records, @nonnull batchconsumer<? super t> consumer) {
return batchexecuteupdate(default_batch_size, entitymanager, tablefields, updatefields, records, consumer);
}
/**
* executes batch updates on the database using native sql with a parameterized batch size.
*
* @param batchsize the size of each batch for inserting records
* @param entitymanager the entity manager for creating and executing queries
* @param tablefields key-value pair representing the table name and its fields
* @param updatefields set of fields to be updated if a record with matching primary key exists (optional)
* @param records the list of records to be inserted
* @param consumer functional interface for accepting batch consumer operations
* @param <t> the type of the records to be inserted
* @return the total number of rows affected by the batch operation
*/
private static <t> int batchexecuteupdate(int batchsize, entitymanager entitymanager,
@nonnull keyvalue<string, linkedhashset<string>> tablefields,
@nullable linkedhashset<string> updatefields,
@nonnull list<t> records, @nonnull batchconsumer<? super t> consumer) {
if (records.isempty()) {
log.debug("no records to process. the records list is empty.");
return 0;
}
assert.notnull(entitymanager, "the entity manager must not be null.");
assert.istrue(batchsize > 0 && batchsize < 500, "the batch size must be between 1 and 500.");
atomicinteger totalrows = new atomicinteger(0);
// split the records into batches based on the specified batch size
list<list<t>> recordbatches = collutil.split(records, batchsize);
for (list<t> batchrecords : recordbatches) {
atomicinteger position = new atomicinteger(1);
// generate the appropriate sql statement for the batch
string preparedstatementsql = collutil.isempty(updatefields) ?
generatebatchinsertsql(tablefields, batchrecords.size()) :
generatebatchinsertorupdatesql(tablefields, updatefields, batchrecords.size());
// create a query instance for executing native sql statements
query nativequery = entitymanager.createnativequery(preparedstatementsql);
// create a parameter builder instance using queryparameterbuilder
queryparameterbuilder parameterbuilder = queryparameterbuilder.create(nativequery, position);
for (t record : batchrecords) {
// set parameters for the prepared statement
consumer.accept(parameterbuilder, record);
}
// execute the sql statement and accumulate the affected rows
totalrows.addandget(nativequery.executeupdate());
}
// return the total number of affected rows
return totalrows.get();
}
/**
* generate batch insert sql statement.
*
* <p>
* this method generates an sql statement for batch insertion into a specified table with the provided fields.
* example sql statement:
* <pre>
* {@code insert into table_name ( field_1, field_2 ) values ( value_1, value_2 ), (value_3, value_4); }
* </pre>
* </p>
*
* @param tablefields the key-value pair representing the table name and its associated field set
* @param batchsize the batch size for insertion
* @return the batch insert sql statement
*/
private static string generatebatchinsertsql(@nonnull keyvalue<string, linkedhashset<string>> tablefields, int batchsize) {
string preparedstatementsql = generateinsertstatement(tablefields.getkey(), tablefields.getvalue(), batchsize);
if (log.isdebugenabled()) {
log.debug("[batch insert] prepared {} records sql: {}", batchsize, preparedstatementsql);
}
return preparedstatementsql;
}
/**
* generates sql statement for batch insert with on duplicate key update.
*
* @param tablefields key-value pair representing table name and its corresponding fields.
* @param updatefields fields to be updated in case of duplicate key.
* @param batchsize number of records to be inserted in a single batch.
* @return sql statement for batch insert with on duplicate key update.
* @throws illegalargumentexception if updatefields collection is empty.
*/
private static string generatebatchinsertorupdatesql(@nonnull keyvalue<string, linkedhashset<string>> tablefields,
linkedhashset<string> updatefields, int batchsize) {
assert.notempty(updatefields, "update field collection cannot be empty.");
// generate the insert statement
string insertstatement = generateinsertstatement(tablefields.getkey(), tablefields.getvalue(), batchsize);
// initialize stringbuilder with initial capacity
stringbuilder builder = new stringbuilder(insertstatement.length() + 100);
// append insert statement
builder.append(insertstatement).append(" on duplicate key update ");
// append update clause
string updateclause = updatefields.stream()
.map(updatefield -> updatefield + " = values(" + updatefield + ")")
.collect(collectors.joining(", "));
builder.append(updateclause);
string preparedstatementsql = builder.tostring();
if (log.isdebugenabled()) {
log.debug("[batch insert on duplicate key update] prepared {} records sql: {}", batchsize, preparedstatementsql);
}
return preparedstatementsql;
}
@notnull
private static string generateinsertstatement(@nonnull string tablename, @nonnull linkedhashset<string> fields, int batchsize) {
assert.hastext(tablename, "table name cannot be empty.");
assert.notnull(fields, "field collection cannot be empty.");
// set a reasonable initial capacity
stringbuilder builder = new stringbuilder(fields.size() * 100);
// concatenate field names
string fieldnames = string.join(", ", fields);
string intotemplate = string.format("insert into %s (%s) values ", tablename, fieldnames);
// generate placeholders
string placeholders = "(" + string.join(", ", collections.ncopies(fields.size(), "?")) + ")";
// construct the insert statement
builder.append(intotemplate);
for (int i = 0; i < batchsize; i++) {
if (i > 0) {
builder.append(", ");
}
builder.append(placeholders);
}
return builder.tostring();
}
}总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论