+
Skip to content

Adding bulk inserts or updates for PostgreSQL #226

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jan 2, 2021
5 changes: 5 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ subprojects { project ->
name = "Antony Denyer"
email = "git@antonydenyer.co.uk"
}
developer {
id = "pedrod"
name = "Pedro Domingues"
email = "pedro.domingues.pt@gmail.com"
}
}
scm {
url = "https://github.com/kotlin-orm/ktorm.git"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright 2018-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.ktorm.support.postgresql

import org.ktorm.database.Database
import org.ktorm.dsl.AssignmentsBuilder
import org.ktorm.dsl.KtormDsl
import org.ktorm.dsl.batchInsert
import org.ktorm.expression.ColumnAssignmentExpression
import org.ktorm.expression.ColumnExpression
import org.ktorm.expression.SqlExpression
import org.ktorm.expression.TableExpression
import org.ktorm.schema.BaseTable
import org.ktorm.schema.Column

/**
 * Bulk insert expression, represents a bulk insert statement in PostgreSQL.
 *
 * For example: `insert into table (column1, column2) values (?, ?), (?, ?), (?, ?)... ON
 * CONFLICT (...) DO NOTHING/UPDATE SET ...`.
 *
 * @property table the table to be inserted.
 * @property assignments column assignments of the bulk insert statement, one inner list per inserted row.
 * @property conflictTarget the index columns on which the conflict may happen; when empty, the PostgreSQL
 * formatter writes no `on conflict` clause at all.
 * @property updateAssignments the column assignments applied when a key conflict occurs; when empty (the default),
 * the PostgreSQL formatter writes `do nothing` instead of `do update set ...`.
 */
public data class BulkInsertExpression(
    val table: TableExpression,
    val assignments: List<List<ColumnAssignmentExpression<*>>>,
    val conflictTarget: List<ColumnExpression<*>>,
    val updateAssignments: List<ColumnAssignmentExpression<*>> = emptyList(),
    override val isLeafNode: Boolean = false,
    override val extraProperties: Map<String, Any> = emptyMap()
) : SqlExpression()

/**
 * Build a bulk insert expression from the given closure, execute it, and return the affected row count.
 *
 * The usage is almost the same as [batchInsert], but instead of relying on JDBC batch operations this function
 * generates a single SQL statement using PostgreSQL's multi-row insert syntax. For this reason, its performance
 * is much better than [batchInsert].
 *
 * The generated SQL is like: `insert into table (column1, column2) values (?, ?), (?, ?), (?, ?)... ON
 * CONFLICT (...) DO NOTHING/UPDATE SET ...`.
 *
 * Usage:
 *
 * ```kotlin
 * database.bulkInsert(Employees) {
 *     item {
 *         set(it.id, 1)
 *         set(it.name, "vince")
 *         set(it.job, "engineer")
 *         set(it.salary, 1000)
 *         set(it.hireDate, LocalDate.now())
 *         set(it.departmentId, 1)
 *     }
 *     item {
 *         set(it.id, 5)
 *         set(it.name, "vince")
 *         set(it.job, "engineer")
 *         set(it.salary, 1000)
 *         set(it.hireDate, LocalDate.now())
 *         set(it.departmentId, 1)
 *     }
 *
 *     onDuplicateKey(Employees.id) {
 *         // Or leave this empty to simply ignore without updating (do nothing)
 *         set(it.salary, it.salary + 900)
 *     }
 * }
 * ```
 *
 * @since 3.3.0
 * @param table the table to be inserted.
 * @param block the DSL block, extension function of [BulkInsertStatementBuilder],
 * used to construct the expression.
 * @return the affected row count.
 * @see batchInsert
 */
public fun <T : BaseTable<*>> Database.bulkInsert(
    table: T,
    block: BulkInsertStatementBuilder<T>.() -> Unit
): Int {
    // Run the caller's DSL block against a fresh builder to collect rows and conflict handling.
    val statement = BulkInsertStatementBuilder(table)
    statement.block()

    val tableExpression = table.asExpression()
    val conflictTarget = statement.conflictColumns.map { column -> column.asExpression() }

    return executeUpdate(
        BulkInsertExpression(
            table = tableExpression,
            assignments = statement.assignments,
            conflictTarget = conflictTarget,
            updateAssignments = statement.updateAssignments
        )
    )
}

/**
 * DSL builder for bulk insert statements.
 *
 * Collects the rows added through [item] and the conflict handling configured through [onDuplicateKey].
 */
@KtormDsl
public class BulkInsertStatementBuilder<T : BaseTable<*>>(internal val table: T) {
    internal val assignments = ArrayList<List<ColumnAssignmentExpression<*>>>()
    internal val conflictColumns = ArrayList<Column<*>>()
    internal val updateAssignments = ArrayList<ColumnAssignmentExpression<*>>()

    /**
     * Add the assignments of a new row to the bulk insert.
     *
     * Every row must assign the same column names, in the same order, as the first row added.
     *
     * @param block the closure that sets the column values of the row.
     * @throws IllegalArgumentException if the assigned column names differ from those of the first row.
     */
    public fun item(block: AssignmentsBuilder.(T) -> Unit) {
        val rowBuilder = PostgreSqlAssignmentsBuilder().also { it.block(table) }

        // The first row defines the column list; later rows are only compared by column name.
        val sameColumns = assignments.isEmpty() ||
            assignments[0].map { a -> a.column.name } == rowBuilder.assignments.map { a -> a.column.name }
        require(sameColumns) { "Every item in a batch operation must be the same." }

        assignments += rowBuilder.assignments
    }

    /**
     * Specify the update assignments while any key conflict exists.
     *
     * Each call appends to the previously registered update assignments and conflict columns.
     *
     * @param columns the columns on which a conflict is detected.
     * @param block the closure that sets the columns to update on conflict; it may be left empty.
     */
    public fun onDuplicateKey(vararg columns: Column<*>, block: AssignmentsBuilder.(T) -> Unit) {
        val updateBuilder = PostgreSqlAssignmentsBuilder().also { it.block(table) }

        updateAssignments.addAll(updateBuilder.assignments)
        conflictColumns.addAll(columns)
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.ktorm.support.postgresql

import org.ktorm.database.Database
import org.ktorm.dsl.batchInsert
import org.ktorm.schema.BaseTable
import java.lang.reflect.InvocationTargetException

Expand Down Expand Up @@ -70,3 +71,51 @@ internal val Database.Companion.global: Database get() {
public fun <T : BaseTable<*>> T.insertOrUpdate(block: InsertOrUpdateStatementBuilder.(T) -> Unit): Int =
    Database.global.insertOrUpdate(this, block)

/**
 * Build a bulk insert-or-update expression for this table from the given closure, execute it against
 * `Database.global`, and return the affected row count.
 *
 * The usage is almost the same as [batchInsert], but instead of relying on JDBC batch operations this function
 * generates a single SQL statement using PostgreSQL's bulk insert (with on conflict) syntax. For this reason,
 * its performance is much better than [batchInsert].
 *
 * The generated SQL is like: `insert into table (column1, column2) values (?, ?), (?, ?), (?, ?)... ON
 * CONFLICT (...) DO NOTHING/UPDATE SET ...`.
 *
 * Usage:
 *
 * ```kotlin
 * Employees.bulkInsert {
 *     item {
 *         set(it.id, 1)
 *         set(it.name, "vince")
 *         set(it.job, "engineer")
 *         set(it.salary, 1000)
 *         set(it.hireDate, LocalDate.now())
 *         set(it.departmentId, 1)
 *     }
 *     item {
 *         set(it.id, 5)
 *         set(it.name, "vince")
 *         set(it.job, "engineer")
 *         set(it.salary, 1000)
 *         set(it.hireDate, LocalDate.now())
 *         set(it.departmentId, 1)
 *     }
 *
 *     onDuplicateKey(Employees.id) {
 *         // Or leave this empty to simply ignore without updating (do nothing)
 *         set(it.salary, it.salary + 900)
 *     }
 * }
 * ```
 *
 * @param block the DSL block, extension function of [BulkInsertStatementBuilder],
 * used to construct the expression.
 * @return the affected row count.
 * @see batchInsert
 */
public fun <T : BaseTable<*>> T.bulkInsert(block: BulkInsertStatementBuilder<T>.() -> Unit): Int =
    Database.global.bulkInsert(this, block)
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public open class PostgreSqlFormatter(
override fun visit(expr: SqlExpression): SqlExpression {
val result = when (expr) {
is InsertOrUpdateExpression -> visitInsertOrUpdate(expr)
is BulkInsertExpression -> visitBulkInsert(expr)
else -> super.visit(expr)
}

Expand Down Expand Up @@ -129,35 +130,83 @@ public open class PostgreSqlFormatter(
return expr
}

/**
 * Write the SQL of a [BulkInsertExpression]: the multi-row insert part first, then the conflict handling part.
 *
 * @param expr the bulk insert expression to format.
 * @return the same [expr] instance, unchanged.
 */
protected open fun visitBulkInsert(expr: BulkInsertExpression): BulkInsertExpression {
    val quotedTableName = expr.table.name.quoted

    generateMultipleInsertSQL(quotedTableName, expr.assignments)
    generateOnConflictSQL(expr.conflictTarget, expr.updateAssignments)

    return expr
}

/**
 * Write the SQL of an [InsertOrUpdateExpression] by treating it as a multi-row insert with exactly one row,
 * followed by the conflict handling part.
 *
 * @param expr the insert-or-update expression to format.
 * @return the same [expr] instance, unchanged.
 */
protected open fun visitInsertOrUpdate(expr: InsertOrUpdateExpression): InsertOrUpdateExpression {
    val quotedTableName = expr.table.name.quoted
    // A single-row insert is the one-element case of the multi-row generator.
    val singleRow = listOf(expr.assignments)

    generateMultipleInsertSQL(quotedTableName, singleRow)
    generateOnConflictSQL(expr.conflictTarget, expr.updateAssignments)

    return expr
}

private fun generateMultipleInsertSQL(
quotedTableName: String,
assignmentsList: List<List<ColumnAssignmentExpression<*>>>
) {
if (assignmentsList.isEmpty()) {
throw IllegalStateException("The insert expression has no values to insert")
}

writeKeyword("insert into ")
write("${expr.table.name.quoted} (")
for ((i, assignment) in expr.assignments.withIndex()) {

write("$quotedTableName (")
assignmentsList.first().forEachIndexed { i, assignment ->
if (i > 0) write(", ")
checkColumnName(assignment.column.name)
write(assignment.column.name.quoted)
}
writeKeyword(") values (")
visitExpressionList(expr.assignments.map { it.expression as ArgumentExpression })
writeKeyword(")")
writeKeyword(" values ")

assignmentsList.forEachIndexed { i, assignments ->
if (i > 0) write(", ")
writeKeyword("( ")
visitExpressionList(assignments.map { it.expression as ArgumentExpression })
writeKeyword(")")
}

removeLastBlank()
writeKeyword(") on conflict (")
for ((i, column) in expr.conflictTarget.withIndex()) {
}

private fun generateOnConflictSQL(
conflictTarget: List<ColumnExpression<*>>,
updateAssignments: List<ColumnAssignmentExpression<*>>
) {
if (conflictTarget.isEmpty()) {
// We are just performing an Insert operation, so any conflict will interrupt the query with an error
return
}

writeKeyword(" on conflict (")
conflictTarget.forEachIndexed { i, column ->
if (i > 0) write(", ")
checkColumnName(column.name)
write(column.name.quoted)
}
writeKeyword(") do update set ")
for ((i, assignment) in expr.updateAssignments.withIndex()) {
if (i > 0) {
removeLastBlank()
write(", ")
writeKeyword(") do ")

if (updateAssignments.isNotEmpty()) {
writeKeyword("update set ")
updateAssignments.forEachIndexed { i, assignment ->
if (i > 0) {
removeLastBlank()
write(", ")
}
checkColumnName(assignment.column.name)
write("${assignment.column.name.quoted} ")
write("= ")
visit(assignment.expression)
}
checkColumnName(assignment.column.name)
write("${assignment.column.name.quoted} ")
write("= ")
visit(assignment.expression)
} else {
writeKeyword("nothing")
}
return expr
}
}

Expand Down Expand Up @@ -218,7 +267,8 @@ public open class PostgreSqlExpressionVisitor : SqlExpressionVisitor() {
if (table === expr.table
&& assignments === expr.assignments
&& conflictTarget === expr.conflictTarget
&& updateAssignments === expr.updateAssignments) {
&& updateAssignments === expr.updateAssignments
) {
return expr
} else {
return expr.copy(
Expand Down
Loading
点击 这是indexloc提供的php浏览器服务,不要输入任何密码和下载