Java学习者论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

手机号码,快捷登录

恭喜Java学习者论坛(https://www.javaxxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,购买链接:点击进入购买VIP会员
JAVA高级面试进阶视频教程Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程

Go语言视频零基础入门到精通

Java架构师3期(课件+源码)

Java开发全终端实战租房项目视频教程

SpringBoot2.X入门到高级使用教程

大数据培训第六期全套视频教程

深度学习(CNN RNN GAN)算法原理

Java亿级流量电商系统视频教程

互联网架构师视频教程

年薪50万Spark2.0从入门到精通

年薪50万!人工智能学习路线教程

年薪50万!大数据从入门到精通学习路线年薪50万!机器学习入门到精通视频教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程 MySQL入门到精通教程
查看: 377|回复: 0

[JDBC学习]java(多线程)实现高性能数据同步

[复制链接]
  • TA的每日心情
    开心
    2021-3-12 23:18
  • 签到天数: 2 天

    [LV.1]初来乍到

    发表于 2014-10-11 03:09:16 | 显示全部楼层 |阅读模式
    需要将生产环境上Infoxmix里的数据原封不动的Copy到另一台 Oracle数据库服务器上,然后对Copy后的数据作些漂白处理。为了将人为干预的因素降到最低,在系统设计时采用java代码对数据作Copy,思路 如图:
       

       

       
       

       
         首 先在代码与生产库间建立一个Connection,将读取到的数据放在ResultSet对象,然后再与开发库建立一个Connection。从 ResultSet取出数据后通过TestConnection插入到开发库,以此来实现Copy。代码写完后运行程序,速度太慢了,一秒钟只能Copy 一千条数据,生产库上有上亿条数据,按照这个速度同步完要到猴年马月呀,用PreparedStatement批处理速度也没有提交多少。我想能不能用多 线程处理,多个人干活总比一个人干活速度要快。
       
         假设生产库有1万条数据,我开5个线程,每个线程分2000条数据,同时向开发库里插数据,Oracle支持高并发这样的话速度至少会提高好多倍,按照这 个思路重新进行了编码,批处理设置为1万条一提交,统计插入数量的变量使用 java.util.concurrent.atomic.AtomicLong,程序一运行,传输速度飞快CPU利用率在70%~90%,现在一秒钟可 以拷贝50万条记录,没过几分钟上亿条数据一条不落地全部Copy到目标库。
       

       
    在查询的时候我用了如下语句
       

        String queryStr = "SELECT * FROM xx";
    ResultSet coreRs = PreparedStatement.executeQuery(queryStr);[/code]
       
    实习生问如果xx表里有上千万条记录,你全部查询出来放到ResultSet, 那内存不溢出了么?Java在设计的时候已经考虑到这个问题了,并没有查询出所有的数据,而是只查询了一部分数据放到ResultSet,数据“用完”它 会自动查询下一批数据,你可以用setFetchSize(int rows)方法设置一个建议值给ResultSet,告诉它每次从数据库Fetch多少条数据。但我不赞成,因为JDBC驱动会根据实际情况自动调整 Fetch的数量。另外性能也与网线的带宽有直接的关系。
       
    相关代码
       

       

        package com.dlbank.domain;  
      
    import java.sql.Connection;  
    import java.sql.PreparedStatement;  
    import java.sql.ResultSet;  
    import java.sql.Statement;  
    import java.util.List;  
    import java.util.concurrent.atomic.AtomicLong;  
      
    import org.apache.log4j.Logger;  
      
    /**
    *<p>title: 数据同步类 </p>   
    *<p>Description: 该类用于将生产核心库数据同步到开发库</p>   
    *@author Tank Zhang  
    */  
    public class CoreDataSyncImpl implements CoreDataSync {  
          
        private List<String> coreTBNames; //要同步的核心库表名  
        private ConnectionFactory connectionFactory;  
        private Logger log = Logger.getLogger(getClass());  
          
        private AtomicLong currentSynCount = new AtomicLong(0L); //当前已同步的条数  
          
        private int syncThreadNum;  //同步的线程数  
      
        @Override  
        public void syncData(int businessType) throws Exception {  
             
            for (String tmpTBName : coreTBNames) {  
                log.info("开始同步核心库" + tmpTBName + "表数据");  
                // 获得核心库连接  
                Connection coreConnection = connectionFactory.getDMSConnection(4);  
                Statement coreStmt = coreConnection.createStatement();  
                //为每个线程分配结果集  
                ResultSet coreRs = coreStmt.executeQuery("SELECT count(*) FROM "+tmpTBName);  
                coreRs.next();  
                //总共处理的数量  
                long totalNum = coreRs.getLong(1);  
                //每个线程处理的数量  
                long ownerRecordNum =(long) Math.ceil((totalNum / syncThreadNum));   
                log.info("共需要同步的数据量:"+totalNum);  
                log.info("同步线程数量:"+syncThreadNum);  
                log.info("每个线程可处理的数量:"+ownerRecordNum);  
                // 开启五个线程向目标库同步数据  
                for(int i=0; i < syncThreadNum; i ++){  
                    StringBuilder sqlBuilder = new StringBuilder();  
                    //拼装后SQL示例  
                    //Select * From dms_core_ds Where id between 1 And 657398  
                    //Select * From dms_core_ds Where id between 657399 And 1314796  
                    //Select * From dms_core_ds Where id between 1314797 And 1972194  
                    //Select * From dms_core_ds Where id between 1972195 And 2629592  
                    //Select * From dms_core_ds Where id between 2629593 And 3286990  
                    //..  
                    sqlBuilder.append("Select * From ").append(tmpTBName)  
                            .append(" Where id between " ).append(i * ownerRecordNum +1)  
                            .append( " And ")  
                            .append((i * ownerRecordNum + ownerRecordNum));  
                    Thread workThread = new Thread(  
                            new WorkerHandler(sqlBuilder.toString(),businessType,tmpTBName));  
                    workThread.setName("SyncThread-"+i);  
                    workThread.start();  
                }  
                while (currentSynCount.get() < totalNum);  
                //休眠一会儿让数据库有机会commit剩余的批处理(只针对JUnit单元测试,
                            //因为单元测试完成后会关闭虚拟器,使线程里的代码没有机会作提交操作);  
                //Thread.sleep(1000 * 3);  
                log.info( "核心库"+tmpTBName+"表数据同步完成,共同步了" + currentSynCount.get() + "条数据");  
            }  
        }// end for loop  
          
        public void setCoreTBNames(List<String> coreTBNames) {  
            this.coreTBNames = coreTBNames;  
        }  
      
        public void setConnectionFactory(ConnectionFactory connectionFactory) {  
            this.connectionFactory = connectionFactory;  
        }  
          
        public void setSyncThreadNum(int syncThreadNum) {  
            this.syncThreadNum = syncThreadNum;  
        }  
          
        //数据同步线程  
        final class WorkerHandler implements Runnable {  
            ResultSet coreRs;  
            String queryStr;  
            int businessType;  
            String targetTBName;  
            public WorkerHandler(String queryStr,int businessType,String targetTBName) {  
                this.queryStr = queryStr;  
                this.businessType = businessType;  
                this.targetTBName = targetTBName;  
            }  
            @Override  
            public void run() {  
                try {  
                    //开始同步  
                    launchSyncData();  
                } catch(Exception e){  
                    log.error(e);  
                    e.printStackTrace();  
                }  
            }  
            //同步数据方法  
            void launchSyncData() throws Exception{  
                // 获得核心库连接  
                Connection coreConnection = connectionFactory.getDMSConnection(4);  
                Statement coreStmt = coreConnection.createStatement();  
                // 获得目标库连接  
                Connection targetConn = connectionFactory.getDMSConnection(businessType);  
                targetConn.setAutoCommit(false);// 设置手动提交  
                PreparedStatement targetPstmt =
                             targetConn.prepareStatement("INSERT INTO " + targetTBName+" VALUES (?,?,?,?,?)");  
                ResultSet coreRs = coreStmt.executeQuery(queryStr);  
                log.info(Thread.currentThread().getName()+""s Query SQL::"+queryStr);  
                int batchCounter = 0; //累加的批处理数量  
                while (coreRs.next()) {  
                    targetPstmt.setString(1, coreRs.getString(2));  
                    targetPstmt.setString(2, coreRs.getString(3));  
                    targetPstmt.setString(3, coreRs.getString(4));  
                    targetPstmt.setString(4, coreRs.getString(5));  
                    targetPstmt.setString(5, coreRs.getString(6));  
                    targetPstmt.addBatch();  
                    batchCounter++;  
                    currentSynCount.incrementAndGet();//递增  
                    if (batchCounter % 10000 == 0) { //1万条数据一提交  
                        targetPstmt.executeBatch();  
                        targetPstmt.clearBatch();  
                        targetConn.commit();  
                    }  
                }  
                //提交剩余的批处理  
                targetPstmt.executeBatch();  
                targetPstmt.clearBatch();  
                targetConn.commit();  
                //释放连接   
                connectionFactory.release(targetConn, targetPstmt,coreRs);  
            }  
        }  
    }  [/code]
       

       

       

       
         
         
          
          

            
          

            
          
         
       

      


    源码下载:http://file.javaxxz.com/2014/10/11/030915859.zip
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|手机版|Java学习者论坛 ( 声明:本站资料整理自互联网,用于Java学习者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

    GMT+8, 2024-5-2 15:14 , Processed in 0.421453 second(s), 50 queries .

    Powered by Discuz! X3.4

    © 2001-2017 Comsenz Inc.

    快速回复 返回顶部 返回列表