使用java把10G文本导入mysql(并转码为csv)
使用java把10G文本导入mysql(并转码为csv)
第一次遇到10G的文本需要导入,所以一开始想直接按行导入,结果读取了txt发现有1亿8千万行,算下来要导入半个月了,属于千万级数据,不建议逐行操作。最后发现mysql有LOAD DATA LOCAL INFILE语句可以非常快速的导入txt文档,10G的文档经过测试只用2个小时。本次导入分为三步:
(1)把10G文件切割成100个100M的文件。
(2)把100个txt文件转换成csv文件(要求转码,跟导入没有关系)
(3)把100个txt导入mysql
说明:其中只有1和3是负责导入mysql的,如果不需要转换为csv只看下面的一和三即可。下文的四和五是处理路径问题的类和主函数的类。
一、切割文件
CutText类中的方法splitFile(String filePath, String cutPath, int fileCount);有三个参数filePath代表txt文件路径,cutPath代表切割后放置的文件夹,fileCount表示要切割的份数,自行调整。(我觉得分割后每个文件100M比较合适,导入100M的文件时间还算可以)
CutText源代码:
package text;import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;/*** 生成大文件* @author Administrator**/
public class CutText {final static String TXTPATH = "D:\\TEMP\\test.txt";final static String CUTPATH = "D:\\TEMP\\temp";public static void splitFile(String filePath, String cutPath, int fileCount) throws IOException {System.out.println("Start cutting files");int lastPos = filePath.lastIndexOf("/");String newPath = cutPath + filePath.substring(lastPos,filePath.lastIndexOf("."));new File(cutPath).mkdir();FileInputStream fis = new FileInputStream(filePath);FileChannel inputChannel = fis.getChannel();final long fileSize = inputChannel.size();long average = fileSize / fileCount;//平均值long bufferSize = 200; //缓存块大小,自行调整ByteBuffer byteBuffer = ByteBuffer.allocate(Integer.valueOf(bufferSize + "")); // 申请一个缓存区long startPosition = 0; //子文件开始位置long endPosition = average < bufferSize ? 0 : average - bufferSize;//子文件结束位置for (int i = 0; i < fileCount; i++) {if (i + 1 != fileCount) {int read = inputChannel.read(byteBuffer, endPosition);// 读取数据readW:while (read != -1) {byteBuffer.flip();//切换读模式byte[] array = byteBuffer.array();for (int j = 0; j < array.length; j++) {byte b = array[j];if (b == 10 || b == 13) { //判断\n\rendPosition += j;break readW;}}endPosition += bufferSize;byteBuffer.clear(); //重置缓存块指针read = inputChannel.read(byteBuffer, endPosition);}System.out.println("已切割:" + 100 * (float) i / (float) fileCount + "%");}else{endPosition = fileSize; //最后一个文件直接指向文件末尾}FileOutputStream fos = new FileOutputStream(newPath + "_" + i + filePath.substring(filePath.length()-4));FileChannel outputChannel = fos.getChannel();inputChannel.transferTo(startPosition, endPosition - startPosition, outputChannel);//通道传输文件数据outputChannel.close();fos.close();startPosition = endPosition + 1;endPosition += average;}inputChannel.close();fis.close();System.out.println("已切割:100%");System.out.println("Document cutting completed");}public static void main(String[] args) throws Exception {long startTime = System.currentTimeMillis();splitFile(TXTPATH, CUTPATH, 100);long endTime = System.currentTimeMillis();System.out.println("耗费时间: " + (endTime - startTime) + " ms");}}
二、txt文件转换为csv文件
TxtToCSV类中的方法allFileIO(String filePath, String cutPath, int fileCount)有三个参数filePath代表源txt文件路径仅仅用于获取文件名,cutPath转码文件的位置,fileCount表示要转码的文件数(与上面保持一致)。allFileIO调用largeFileIO转码。另一个方法largeFileIO(String inputFile)是转码函数,参数inputFile是txt位置,转码后的csv与txt放在同一目录。
TxtToCSV源代码:
package text;import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.sql.SQLException;import path.PathString;public class TxtToCSV {//1M 415mspublic static void largeFileIO(String inputFile) throws SQLException {String outputFile = inputFile.substring(0, inputFile.lastIndexOf(".")) + ".csv";//为输出csv做准备FileOutputStream out = null;OutputStreamWriter osw = null;BufferedWriter bw = null;try {//准备输出到csvFile finalCSVFile = new File(outputFile);out = new FileOutputStream(finalCSVFile, true);osw = new OutputStreamWriter(out, "UTF-8");// 手动加上BOM标识osw.write(new String(new byte[] { (byte) 0xEF, (byte) 0xBB, (byte) 0xBF }));bw = new BufferedWriter(osw);//准备读取文件BufferedInputStream bis = new BufferedInputStream(new FileInputStream(new File(inputFile)));BufferedReader in = new BufferedReader(new InputStreamReader(bis, "utf-8"), 10 * 1024 * 1024);// 10M缓存while (in.ready()) {String line = in.readLine(); //逐行读取String[] myDatashuzu = line.split(","); //分割字符串//输出到csvtry {bw.append(myDatashuzu[0] + "," + myDatashuzu[1] + "," + myDatashuzu[2] + "," + myDatashuzu[3] + "," + myDatashuzu[4] + "," + myDatashuzu[5] + "," + myDatashuzu[6] + "," + myDatashuzu[7] + "," + myDatashuzu[8] + "\n");} catch (Exception e) {// TODO: handle exceptioncontinue;}}closeAll(bw,osw,out,in);} catch (IOException ex) {ex.printStackTrace();}}public static void allFileIO(String filePath, String cutPath, int fileCount) throws SQLException {String fileName = PathString.getFileName(PathString.getPath(filePath));String fileKind = PathString.getFileKind(PathString.getPath(filePath));for (int i = 0; i < fileCount; i++) {String txtPath = cutPath + "/" + fileName + "_" + i + "." + fileKind;largeFileIO(txtPath);System.out.println("已转码:" + 100 * (float) i / (float) fileCount + "%");}System.out.println("已转码:100%");}public static void closeAll(BufferedWriter bw, OutputStreamWriter osw, FileOutputStream out, BufferedReader in){if (bw != null) {try {bw.close();bw = null;} catch (IOException e) {e.printStackTrace();}}if (osw != null) {try {osw.close();osw = null;} catch (IOException e) {e.printStackTrace();}}if (out != null) {try {out.close();out = null;} catch (IOException e) {e.printStackTrace();}}if (in != null) {try {in.close();} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();}} }
}
三、txt文件导入mysql
TxtLoadToMySQL类中的方法
toMySQL(String filePath, String cutPath, int fileCount)
有三个参数filePath代表源txt文件路径,cutPath切割后文件的位置,fileCount表示要转码的文件数(与上面保持一致)。其核心是这句mysql语句
LOAD DATA LOCAL INFILE 'filePath' INTO TABLE table_name FIELDS TERMINATED BY ',' (column_name_one, column_name_two, column_name_three, column_name_four, column_name_five);
这句语句中的filePath就是txt位置如:D:/TEMP/test.txt,table_name是表名,BY ','表示源txt文件中以,作为各列分隔符,column_name_one是列名。
这条命令需要mysql开启本地文件load,导入无效请看最后的说明
TxtLoadToMySQL源代码:
package mysql;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;public class TxtLoadToMySQL{private static final String DBDRIVER = "com.mysql.jdbc.Driver"; //驱动程序名private static final String DBURL = "jdbc:mysql://localhost:3306/dashuju?useSSL=false"; //URL指向要访问的数据库名mydataprivate static final String DBUSER = "root"; //MySQL配置时的用户名private static final String DBPASSWORD = "123456"; //MySQL配置时的密码public static void toMySQL(String filePath, String cutPath, int fileCount) {System.out.println("Start importing the database");Connection conn = null;PreparedStatement pstmt = null;String sqlStart = "LOAD DATA LOCAL INFILE '";String sqlEnd = "' INTO TABLE ten_g FIELDS TERMINATED BY ',' (zero, one,two,three,four,five,six,seven,eight);";String fileStart = cutPath + filePath.substring(filePath.lastIndexOf("/"),filePath.lastIndexOf(".")) + "_";String fileEnd = filePath.substring(filePath.lastIndexOf("."));try {Class.forName(DBDRIVER); //注册驱动conn = DriverManager.getConnection(DBURL,DBUSER,DBPASSWORD); //获得连接对象for (int i = 0; i < fileCount; i++) {String sql = sqlStart + fileStart + i + fileEnd + sqlEnd;pstmt = conn.prepareStatement(sql);pstmt.execute();System.out.println("已导入:" + 100 * (float) i / (float) fileCount + "%");}pstmt.close();conn.close();System.out.println("已导入:100%");} catch (Exception e) {// TODO: handle exception}System.out.println("Database import completed");}public static void main(String[] args) {String filePath = "D:/TEMP/test.txt";String cutPath = "D:/TEMP/temp";toMySQL(filePath, cutPath, 2);}
}
四、处理路径
getPath把\替换为/,因为LOAD DATA中的路径要使用/或者\\,而转为/可以在LOAD DATA与java中使用,所以转为/比较方便。
getFileName获取文件名如D:/TEMP/test.txt返回test。
getFileKind获取文件类型如D:/TEMP/test.txt返回txt。
package path;public class PathString {public static String getPath(String pathString) {pathString = pathString.replace("\\", "/");return pathString;}public static String getFileName(String pathString) {String nameString = pathString.substring(pathString.lastIndexOf("/") + 1, pathString.lastIndexOf("."));return nameString;}public static String getFileKind(String pathString) {String kindString = pathString.substring(pathString.lastIndexOf(".") + 1);return kindString;}// public static void main(String[] args) {
// System.out.println(getFileKind(getPath("D:\\TEMP\\test3.txt")));
// }
}
五、主函数
每一句做的事情注释比较清楚,不再解释,如果不需要哪一句注释即可。程序实际测试10G文件切割并转码用时30min,导入数据库用时2h,占用储存切割文件+csv=20G,数据库文件25G,加源文件总共占用55G,总用时2.5h。
package main;
import java.io.IOException;
import java.sql.SQLException;import mysql.TxtLoadToMySQL;
import path.PathString;
import text.CutText;
import text.TxtToCSV;public class TxtToMysqlMain {/*** 主函数* 切割10M并插入数据库用时 14281 ms* 切割10M并插入数据库并转码切割文件为csv用时 16580 ms* 切割10M并插入数据库并转码10M文件为csv用时 19625 ms* 10G估计用时为10*1024/10*16580 = 16977920 ms = 16978s = 4.71h* 预计使用储存空间为 X*2.5 + X*3 + X = 6.5*X*/public static void main(String[] args) throws IOException, SQLException {long startTime = System.currentTimeMillis(); //获取程序开始执行时间//格式化路径,filePath源文件路径,cutPath切割文件存放路径(缓存目录),fileCount切割份数String filePath = PathString.getPath("D:\\TEMP\\gps1.txt");String cutPath = PathString.getPath("D:\\TEMP\\temp");int fileCount = 100;CutText.splitFile(filePath, cutPath, fileCount); //执行文件切割TxtToCSV.allFileIO(filePath, cutPath, fileCount); //执行切割后txt转换为csvTxtLoadToMySQL.toMySQL(filePath, cutPath, fileCount); //执行插入数据库// TxtToCSV.largeFileIO(filePath); //源文件(10Gtxt)转换为csvlong endTime2 = System.currentTimeMillis();//获取程序结束执行时间System.out.println("耗费时间: " + (endTime2 - startTime) + " ms");}
}
重要说明:
(1)程序需要mysql开启本地文件load
在mysql里执行
SHOW VARIABLES LIKE 'local_infile';
SET GLOBAL local_infile = 1;
(2)如果mysql报错The server time zone value ‘Öйú±ê׼ʱ¼ä’
点击这里
(3)如果mysql显示
WARN: Establishing SSL connection without server's identity verification is not recommended.
是因为当前版本的MySQL要求使用SSL,给连接mysql的url加参数
useSSL=false即可
jdbc:mysql://localhost:3306/dashuju?useSSL=false;
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
