|
|||||||
Скрипт Димы
Время создания: 25.09.2020 10:46
Раздел: INFO - JOB - CUBA - GroovyScripts
Запись: wwwlir/Tetra/master/base/16010019956jzioy57tv/text.html на raw.githubusercontent.com
|
|||||||
|
|||||||
import groovy.sql.Sql
import java.sql.Connection
import java.sql.DriverManager
import java.sql.SQLException
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicInteger
import static java.lang.System.nanoTime
ConnectionPool pool = new ConnectionPool()
Sql sql = new Sql(pool.getConnection())
cleanDB(sql)
validateDB(sql)
sql.execute('CREATE TABLE copy_rtneo_accrual AS SELECT document_number, document_date, bill_id, contract_position_id FROM rtneo_accrual WITH NO DATA ')
def proc = Runtime.getRuntime().availableProcessors()
println "Available processors: ${proc}"
Watch.startProgram = nanoTime()
Watch.inserting = nanoTime()
sql.eachRow('select distinct c.id from rtneo_contragent c join rtneo_accrual a on c.id = a.contragent_id') {
Queues.contragents.add(it[0])
}
ExecutorService executorService = Executors.newFixedThreadPool(proc)
2.times { executorService.execute(new BillAccrualProducer(pool: pool)) }
4.times { executorService.execute(new BillAccrualConsumer(pool: pool)) }
Sync.latchProducer.await()
executorService.shutdown()
Sync.latchConsumer.await()
Watch.inserting = nanoTime() - Watch.inserting
mergTables(sql)
testing(sql)
printResult()
sql.execute('DROP TABLE copy_rtneo_accrual')
def cleanDB(Sql sql) {
println 'Start clean DB'
try {
sql.execute('update rtneo_accrual set bill_id = null where bill_id is not null')
sql.execute('delete from rtneo_bill')
sql.execute('DROP table copy_rtneo_accrual')
println 'Clean DB Complete!'
} catch (SQLException e) {
println 'Clean DB ERROR'
}
}
def mergTables(sql) {
Watch.merging = nanoTime()
println "Start merge Tables"
sql.execute('UPDATE rtneo_accrual a SET bill_id = ac.bill_id FROM copy_rtneo_accrual ac ' +
'WHERE a.document_number = ac.document_number and a.document_date = ac.document_date and a.contract_position_id = ac.contract_position_id')
println 'Merge Complete'
Watch.merging = nanoTime() - Watch.merging
}
def printResult() {
println 'Время работы программы: ' + ((nanoTime() - Watch.startProgram) / 1000000000) + ' s'
println 'Время inserting: ' + (Watch.inserting / 1000000000) + ' s'
println 'Время слияния таблиц: ' + (Watch.merging / 1000000000) + ' s'
println 'Время тестирования 10000 строк по Random: ' + (Watch.testing / 1000000000) + ' s'
println 'Количество обработанных контрагентов: ' + Counts.contragents.get()
println 'Количество записей подготовлено для вставки в таблицу Bill: ' + Counts.preparedRowsToBill.get()
println 'Количество записей подготовлено для вставки в таблицу CopyAccrual: ' + Counts.preparedRowsToCopyAccrual.get()
println 'Количество записей вставлено в таблицу Bill: ' + Counts.insertRowsToBill.get()
println 'Количество записей вставлено в таблицу CopyAccrual: ' + Counts.insertRowsToCopyAccrual.get()
}
def testing(Sql sql) {
println 'Testing start'
Watch.testing = nanoTime()
// Тест на null bill_id в Accrual
// def count = sql.rows('select * from rtneo_accrual where bill_id is null').size()
// assert count == 0
// Тест всели записи вставились
Set listBillIdFromBill = []
Set listBillIdFromAccrual = []
sql.eachRow('select id from rtneo_bill') {
listBillIdFromBill << it.getProperty('id')
}
sql.eachRow('select distinct bill_id from rtneo_accrual') {
listBillIdFromAccrual << it.getProperty('bill_id')
}
def iter = listBillIdFromBill.iterator()
while (iter.hasNext()) {
def bill_id = iter.next()
if (listBillIdFromAccrual.contains(bill_id)) {
iter.remove()
}
}
listBillIdFromBill.each {
println it
}
// тест на валидность что все билы вставились туда куда нужно
def listContragent = []
sql.eachRow('select contragent_id as id from rtneo_bill limit 10000') {
listContragent << it.getProperty('id')
}
Random random = new Random()
for (i in 0..<10000) {
def contragent_id = listContragent[random.nextInt(10000)]
def bill = sql.rows('select id as bill_id, contragent_id, period_ as period, contract_id, document_number, document_date, volume as sum_volume, price, ' +
'sum_ as sum from rtneo_bill ' +
'where contragent_id = :contragent_id', [contragent_id: contragent_id]).get(0)
def bill_id = bill.get('bill_id')
def accrual = sql.rows('select a.contragent_id, a.period, cp.contract_id as contract_id, a.document_number, a.document_date, sum(a.amount) as sum_volume, price, ' +
'sum(a.total_sum) as sum ' +
'from rtneo_accrual a JOIN rtneo_contract_position cp on cp.id = a.contract_position_id ' +
'where a.bill_id = :bill_id ' +
'group by a.contragent_id, a.period, cp.id, a.document_number, a.document_date, price', [bill_id: bill_id]).get(0)
assert bill.get('contragent_id').equals(accrual.get('contragent_id'))
assert bill.get('period').equals(accrual.get('period'))
assert bill.get('contract_id').equals(accrual.get('contract_id'))
assert bill.get('document_number').equals(accrual.get('document_number'))
assert bill.get('document_date').equals(accrual.get('document_date'))
assert bill.get('sum_volume').equals(accrual.get('sum_volume'))
assert bill.get('price').equals(accrual.get('price'))
assert bill.get('sum').equals(accrual.get('sum'))
}
Watch.testing = nanoTime() - Watch.testing
println 'Test Complete'
}
def validateDB(Sql sql) {
List<String> errors = []
// Проверка document_date на NULL
sql.eachRow('select id from rtneo_accrual where document_date is null') {
errors << 'Table RTNEO_ACCRUAL, row document_date IS NULL, id_accrual: ' + it.getProperty('id')
}
// Сделать валидацию по полям СУММ
// В случае наличия ошибок печатаем их и завершаем работу
if (!errors.isEmpty()) {
errors.each { println it }
System.exit(0)
}
}
class BillAccrualProducer implements Runnable {
ConnectionPool pool
@Override
void run() {
Connection connection = pool.getConnection()
println 'BillAccrualProducer RUN Thread: ' + this + ', with ' + connection
Sql sql = new Sql(connection)
try {
while (Queues.contragents.peek() != null) {
def contragentId = Queues.contragents.poll()
if (contragentId == null) {
continue
}
sql.eachRow('select a.contragent_id, a.period, c.id as contract_id, a.document_number, a.document_date, sum(amount) as amount_sum, ' +
'a.price, sum(a.total_sum) as total_sum, cp.relevance, cp.id as contract_position_id ' +
'from (select * from rtneo_accrual where delete_ts is null and period between \'01/01/2019\' and \'31/12/2020\' and contragent_id = :id) a ' +
'LEFT JOIN RTNEO_CONTRACT_POSITION cp ON a.contract_position_id = cp.id ' +
'LEFT JOIN RTNEO_CONTRACT c ON cp.contract_id = c.id ' +
'group by a.contragent_id, a.period, c.id, a.document_number, a.document_date, a.price, cp.relevance, cp.id ' +
'having c.id is not null', [id: contragentId]) { it ->
def id = UUID.randomUUID()
def version = 1
def contragent_id = it.getProperty('contragent_id')
def period = it.getProperty('period')
def contract_id = it.getProperty('contract_id')
def document_number = it.getProperty('document_number')
def document_date = it.getProperty('document_date')
def amount_sum = it.getProperty('amount_sum')
def price = it.getProperty('price')
def total_sum = it.getProperty('total_sum')
def relevance = it.getProperty('relevance')
def contract_position_id = it.getProperty('contract_position_id')
Queues.billQueries.offer("INSERT INTO rtneo_bill (id, version, contragent_id, period_, contract_id, document_number, document_date, volume, price, sum_, relevance) " +
"VALUES ('${id}',${version},'${contragent_id}','${period}','${contract_id}',${document_number},'${document_date}',${amount_sum},${price},${total_sum}, ${relevance})")
Counts.preparedRowsToBill.andIncrement
Queues.accrualQueries.offer("INSERT INTO copy_rtneo_accrual (bill_id, document_number, document_date, contract_position_id) " +
"VALUES ('${id}', ${document_number}, '${document_date}', '${contract_position_id}')")
Counts.preparedRowsToCopyAccrual.andIncrement
}
}
} catch (InterruptedException e) {
e.printStackTrace()
} finally {
Counts.contragents.andIncrement
pool.returnConnection(connection)
Sync.latchProducer.countDown()
println 'BillAccrualProducer STOP Thread: ' + this
}
}
}
class BillAccrualConsumer implements Runnable {
ConnectionPool pool
Sql sql
LinkedList list = new LinkedList()
int count = 0
@Override
void run() {
println 'BillAccrualConsumer RUN, Thread: ' + this
Connection connection = pool.getConnection()
sql = new Sql(connection)
boolean availableBillQueues = true
boolean availableAccrualQueues = true
try {
while ((availableBillQueues || availableAccrualQueues) || !Thread.currentThread().isInterrupted()) {
if (Queues.billQueries.peek() != null) {
def query = Queues.billQueries.poll()
if (query != null) {
list.push(query)
Counts.insertRowsToBill.andIncrement
availableBillQueues = true
}
} else {
availableBillQueues = false
}
if (Queues.accrualQueries.peek() != null) {
def query = Queues.accrualQueries.poll()
if (query != null) {
list.push(query)
Counts.insertRowsToCopyAccrual.andIncrement
availableAccrualQueues = true
}
} else {
availableAccrualQueues = false
}
if (list.size() >= 99) {
executeQueries()
list = new LinkedList()
}
}
} catch (InterruptedException e) {
e.printStackTrace()
} finally {
executeQueries()
pool.returnConnection(connection)
Sync.latchConsumer.countDown()
println 'BillAccrualConsumer STOP, Thread: ' + this
}
}
void executeQueries() {
if (!list.isEmpty()) {
sql.withTransaction {
count += sql.withBatch(100) { stmt ->
list.forEach(query -> stmt.addBatch(query as String))
}.size()
}
}
}
}
class ConnectionPool {
LinkedList<Connection> pool = new LinkedList<>()
ConnectionPool() {
Class.forName('org.postgresql.Driver')
for (i in 0..<20) {
Connection connection = DriverManager.getConnection('jdbc:postgresql://localhost:5432/rtneo', 'cuba', 'cuba')
if (connection != null) {
pool.add(connection)
}
}
}
synchronized def getConnection() {
while (pool.isEmpty()) {
wait()
}
return pool.pop()
}
synchronized def returnConnection(Connection connection) {
pool.push(connection)
notifyAll()
}
}
class Counts {
static AtomicInteger contragents = new AtomicInteger(0)
static AtomicInteger preparedRowsToBill = new AtomicInteger()
static AtomicInteger preparedRowsToCopyAccrual = new AtomicInteger()
static AtomicInteger insertRowsToBill = new AtomicInteger(0)
static AtomicInteger insertRowsToCopyAccrual = new AtomicInteger(0)
}
class Watch {
static long startProgram = 0
static long merging = 0
static long inserting = 0
static long testing = 0
}
class Queues {
static ConcurrentLinkedQueue contragents = new ConcurrentLinkedQueue()
static ConcurrentLinkedDeque billQueries = new ConcurrentLinkedDeque()
static ConcurrentLinkedDeque accrualQueries = new ConcurrentLinkedDeque()
}
class Sync {
static CountDownLatch latchConsumer = new CountDownLatch(2)
static CountDownLatch latchProducer = new CountDownLatch(4)
} |
|||||||
Так же в этом разделе:
|
|||||||
|
|||||||
|