MyTetra Share
Делитесь знаниями!
Скрипт Димы
Время создания: 25.09.2020 10:46
Раздел: INFO - JOB - CUBA - GroovyScripts
Запись: wwwlir/Tetra/master/base/16010019956jzioy57tv/text.html на raw.githubusercontent.com
import groovy.sql.Sql

import java.sql.Connection
import java.sql.DriverManager
import java.sql.SQLException
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicInteger

import static java.lang.System.nanoTime

ConnectionPool pool = new ConnectionPool()
Sql sql = new Sql(pool.getConnection())
cleanDB(sql)
validateDB(sql)
sql.execute('CREATE TABLE copy_rtneo_accrual AS SELECT document_number, document_date, bill_id, contract_position_id FROM rtneo_accrual WITH NO DATA ')
def proc = Runtime.getRuntime().availableProcessors()
println "Available processors: ${proc}"
Watch.startProgram = nanoTime()
Watch.inserting = nanoTime()
sql.eachRow('select distinct c.id from rtneo_contragent c join rtneo_accrual a on c.id = a.contragent_id') {
    Queues.contragents.add(it[0])
}
ExecutorService executorService = Executors.newFixedThreadPool(proc)
2.times { executorService.execute(new BillAccrualProducer(pool: pool)) }
4.times { executorService.execute(new BillAccrualConsumer(pool: pool)) }
Sync.latchProducer.await()
executorService.shutdown()
Sync.latchConsumer.await()
Watch.inserting = nanoTime() - Watch.inserting
mergTables(sql)
testing(sql)
printResult()
sql.execute('DROP TABLE copy_rtneo_accrual')


def cleanDB(Sql sql) {
    println 'Start clean DB'
    try {
        sql.execute('update rtneo_accrual set bill_id = null where bill_id is not null')
        sql.execute('delete from rtneo_bill')
        sql.execute('DROP table copy_rtneo_accrual')
        println 'Clean DB Complete!'
    } catch (SQLException e) {
        println 'Clean DB ERROR'
    }
}

def mergTables(sql) {
    Watch.merging = nanoTime()
    println "Start merge Tables"
    sql.execute('UPDATE rtneo_accrual a SET bill_id = ac.bill_id FROM copy_rtneo_accrual ac ' +
            'WHERE a.document_number = ac.document_number and a.document_date = ac.document_date and a.contract_position_id = ac.contract_position_id')
    println 'Merge Complete'
    Watch.merging = nanoTime() - Watch.merging
}

def printResult() {
    println 'Время работы программы: ' + ((nanoTime() - Watch.startProgram) / 1000000000) + ' s'
    println 'Время inserting: ' + (Watch.inserting / 1000000000) + ' s'
    println 'Время слияния таблиц: ' + (Watch.merging / 1000000000) + ' s'
    println 'Время тестирования 10000 строк по Random: ' + (Watch.testing / 1000000000) + ' s'
    println 'Количество обработанных контрагентов: ' + Counts.contragents.get()
    println 'Количество записей подготовлено для вставки в таблицу Bill: ' + Counts.preparedRowsToBill.get()
    println 'Количество записей подготовлено для вставки в таблицу CopyAccrual: ' + Counts.preparedRowsToCopyAccrual.get()
    println 'Количество записей вставлено в таблицу Bill: ' + Counts.insertRowsToBill.get()
    println 'Количество записей вставлено в таблицу CopyAccrual: ' + Counts.insertRowsToCopyAccrual.get()
}

def testing(Sql sql) {
    println 'Testing start'
    Watch.testing = nanoTime()
    // Тест на null bill_id в Accrual
//    def count = sql.rows('select * from rtneo_accrual where bill_id is null').size()
//    assert count == 0
    // Тест всели записи вставились
    Set listBillIdFromBill = []
    Set listBillIdFromAccrual = []
    sql.eachRow('select id from rtneo_bill') {
        listBillIdFromBill << it.getProperty('id')
    }
    sql.eachRow('select distinct bill_id from rtneo_accrual') {
        listBillIdFromAccrual << it.getProperty('bill_id')
    }

    def iter = listBillIdFromBill.iterator()
    while (iter.hasNext()) {
        def bill_id = iter.next()
        if (listBillIdFromAccrual.contains(bill_id)) {
            iter.remove()
        }
    }
    listBillIdFromBill.each {
        println it
    }
    // тест на валидность что все билы вставились туда куда нужно
    def listContragent = []
    sql.eachRow('select contragent_id as id from rtneo_bill limit 10000') {
        listContragent << it.getProperty('id')
    }
    Random random = new Random()

    for (i in 0..<10000) {
        def contragent_id = listContragent[random.nextInt(10000)]
        def bill = sql.rows('select id as bill_id, contragent_id, period_ as period, contract_id, document_number, document_date, volume as sum_volume, price, ' +
                'sum_ as sum from rtneo_bill ' +
                'where contragent_id = :contragent_id', [contragent_id: contragent_id]).get(0)

        def bill_id = bill.get('bill_id')

        def accrual = sql.rows('select a.contragent_id, a.period, cp.contract_id as contract_id, a.document_number, a.document_date, sum(a.amount) as sum_volume, price, ' +
                'sum(a.total_sum) as sum ' +
                'from rtneo_accrual a JOIN rtneo_contract_position cp on cp.id = a.contract_position_id ' +
                'where a.bill_id = :bill_id ' +
                'group by a.contragent_id, a.period, cp.id, a.document_number, a.document_date, price', [bill_id: bill_id]).get(0)

        assert bill.get('contragent_id').equals(accrual.get('contragent_id'))
        assert bill.get('period').equals(accrual.get('period'))
        assert bill.get('contract_id').equals(accrual.get('contract_id'))
        assert bill.get('document_number').equals(accrual.get('document_number'))
        assert bill.get('document_date').equals(accrual.get('document_date'))
        assert bill.get('sum_volume').equals(accrual.get('sum_volume'))
        assert bill.get('price').equals(accrual.get('price'))
        assert bill.get('sum').equals(accrual.get('sum'))
    }
    Watch.testing = nanoTime() - Watch.testing
    println 'Test Complete'

}

def validateDB(Sql sql) {
    List<String> errors = []
    // Проверка document_date на NULL
    sql.eachRow('select id from rtneo_accrual where document_date is null') {
        errors << 'Table RTNEO_ACCRUAL, row document_date IS NULL, id_accrual: ' + it.getProperty('id')
    }
    // Сделать валидацию по полям СУММ
    // В случае наличия ошибок печатаем их и завершаем работу
    if (!errors.isEmpty()) {
        errors.each { println it }
        System.exit(0)
    }
}

class BillAccrualProducer implements Runnable {
    ConnectionPool pool

    @Override
    void run() {
        Connection connection = pool.getConnection()
        println 'BillAccrualProducer RUN Thread: ' + this + ', with ' + connection
        Sql sql = new Sql(connection)
        try {
            while (Queues.contragents.peek() != null) {
                def contragentId = Queues.contragents.poll()
                if (contragentId == null) {
                    continue
                }
                sql.eachRow('select a.contragent_id, a.period, c.id as contract_id, a.document_number, a.document_date, sum(amount) as amount_sum, ' +
                        'a.price, sum(a.total_sum) as total_sum, cp.relevance, cp.id as contract_position_id ' +
                        'from (select * from rtneo_accrual where delete_ts is null and period between \'01/01/2019\' and \'31/12/2020\' and contragent_id = :id) a ' +
                        'LEFT JOIN RTNEO_CONTRACT_POSITION cp ON a.contract_position_id = cp.id ' +
                        'LEFT JOIN RTNEO_CONTRACT c ON cp.contract_id = c.id ' +
                        'group by a.contragent_id, a.period, c.id, a.document_number, a.document_date, a.price, cp.relevance, cp.id ' +
                        'having c.id is not null', [id: contragentId]) { it ->
                    def id = UUID.randomUUID()
                    def version = 1
                    def contragent_id = it.getProperty('contragent_id')
                    def period = it.getProperty('period')
                    def contract_id = it.getProperty('contract_id')
                    def document_number = it.getProperty('document_number')
                    def document_date = it.getProperty('document_date')
                    def amount_sum = it.getProperty('amount_sum')
                    def price = it.getProperty('price')
                    def total_sum = it.getProperty('total_sum')
                    def relevance = it.getProperty('relevance')
                    def contract_position_id = it.getProperty('contract_position_id')

                    Queues.billQueries.offer("INSERT INTO rtneo_bill (id, version, contragent_id, period_, contract_id, document_number, document_date, volume, price, sum_, relevance) " +
                            "VALUES ('${id}',${version},'${contragent_id}','${period}','${contract_id}',${document_number},'${document_date}',${amount_sum},${price},${total_sum}, ${relevance})")
                    Counts.preparedRowsToBill.andIncrement

                    Queues.accrualQueries.offer("INSERT INTO copy_rtneo_accrual (bill_id, document_number, document_date, contract_position_id) " +
                            "VALUES ('${id}', ${document_number}, '${document_date}', '${contract_position_id}')")
                    Counts.preparedRowsToCopyAccrual.andIncrement
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace()
        } finally {
            Counts.contragents.andIncrement
            pool.returnConnection(connection)
            Sync.latchProducer.countDown()
            println 'BillAccrualProducer STOP Thread: ' + this
        }
    }
}

class BillAccrualConsumer implements Runnable {
    ConnectionPool pool
    Sql sql
    LinkedList list = new LinkedList()
    int count = 0

    @Override
    void run() {
        println 'BillAccrualConsumer RUN, Thread: ' + this
        Connection connection = pool.getConnection()
        sql = new Sql(connection)
        boolean availableBillQueues = true
        boolean availableAccrualQueues = true
        try {
            while ((availableBillQueues || availableAccrualQueues) || !Thread.currentThread().isInterrupted()) {
                if (Queues.billQueries.peek() != null) {
                    def query = Queues.billQueries.poll()
                    if (query != null) {
                        list.push(query)
                        Counts.insertRowsToBill.andIncrement
                        availableBillQueues = true
                    }
                } else {
                    availableBillQueues = false
                }
                if (Queues.accrualQueries.peek() != null) {
                    def query = Queues.accrualQueries.poll()
                    if (query != null) {
                        list.push(query)
                        Counts.insertRowsToCopyAccrual.andIncrement
                        availableAccrualQueues = true
                    }
                } else {
                    availableAccrualQueues = false
                }
                if (list.size() >= 99) {
                    executeQueries()
                    list = new LinkedList()
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace()
        } finally {
            executeQueries()
            pool.returnConnection(connection)
            Sync.latchConsumer.countDown()
            println 'BillAccrualConsumer STOP, Thread: ' + this
        }
    }

    void executeQueries() {
        if (!list.isEmpty()) {
            sql.withTransaction {
                count += sql.withBatch(100) { stmt ->
                    list.forEach(query -> stmt.addBatch(query as String))
                }.size()
            }
        }
    }
}

class ConnectionPool {
    LinkedList<Connection> pool = new LinkedList<>()

    ConnectionPool() {
        Class.forName('org.postgresql.Driver')
        for (i in 0..<20) {
            Connection connection = DriverManager.getConnection('jdbc:postgresql://localhost:5432/rtneo', 'cuba', 'cuba')
            if (connection != null) {
                pool.add(connection)
            }
        }
    }

    synchronized def getConnection() {
        while (pool.isEmpty()) {
            wait()
        }
        return pool.pop()
    }

    synchronized def returnConnection(Connection connection) {
        pool.push(connection)
        notifyAll()
    }
}

class Counts {
    static AtomicInteger contragents = new AtomicInteger(0)
    static AtomicInteger preparedRowsToBill = new AtomicInteger()
    static AtomicInteger preparedRowsToCopyAccrual = new AtomicInteger()
    static AtomicInteger insertRowsToBill = new AtomicInteger(0)
    static AtomicInteger insertRowsToCopyAccrual = new AtomicInteger(0)
}

class Watch {
    static long startProgram = 0
    static long merging = 0
    static long inserting = 0
    static long testing = 0
}

class Queues {
    static ConcurrentLinkedQueue contragents = new ConcurrentLinkedQueue()
    static ConcurrentLinkedDeque billQueries = new ConcurrentLinkedDeque()
    static ConcurrentLinkedDeque accrualQueries = new ConcurrentLinkedDeque()
}

class Sync {
    static CountDownLatch latchConsumer = new CountDownLatch(2)
    static CountDownLatch latchProducer = new CountDownLatch(4)
}
Так же в этом разделе:
 
MyTetra Share v.0.65
Яндекс индекс цитирования