C#如何讀取Txt大數據并更新到數據庫詳解
環(huán)境
- Sqlserver 2016
- .net 4.5.2
目前測試數據 1300 萬條,大約需要 3-4 分鐘。(限制一次讀取的條數和線程數是為了節省服務器資源,如果調得太大,服務器上的其它應用可能就跑不了了。)SqlServerDBHelper 為數據庫幫助類,沒有什么特別的處理。配置連接串時記得把連接池開起來。
另外,以下代碼中每次寫入都會創建一個新的連接。之前試過反復使用同一個連接:130 次寫入中大約有 20 多次數據庫會出問題,并且需要的時間是 7-8 分鐘左右。
配置文件: xxx.json(下面的讀取代碼實際加載的是程序目錄下的 config\ImportTxt.json)
[ {
/*連接字符串 */
"ConnStr": "",
"FilePath": "讀取的文件地址",
/*數據庫表名稱 */
"TableName": "寫入的數據庫表名",
/*導入前執行的語句 */
"ExecBeforeSql": "",
/*導入后執行的語句 */
"ExecAfterSql": "",
/*映射關系:TxtName 為 txt 表頭中的列名,DBName 為對應的數據庫字段名 */
"Mapping": [
{
"DBName": "XXX",
"TxtName": "DDD"
}
],
/*過濾數據的正則 當前只實現了小數據一次性讀完的檢查*/
"FilterRegex": [],
/*檢查數據合法性(從數據庫獲取字段屬性進行驗證) */
"CheckData": false,
/*列分隔符*/
"Separator": "\t",
/*表頭的行數*/
"HeaderRowsNum": 1
}
]
讀取代碼:注意 ConfigurationManager.AppSettings["fr"](文件超過多少 MB 時改為分批讀取,未配置時默認 100)和 ConfigurationManager.AppSettings["frpage"](分批讀取時每批的行數,未配置時默認 50000)需要自己配置好
// Load the import definitions (one entry per source file / target table) from the JSON config.
string configPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "config\\ImportTxt.json");
List<dynamic> dt = JsonConvert.DeserializeObject<List<dynamic>>(File.ReadAllText(configPath));
// DeserializeObject returns null for an empty or literal "null" document, so guard before reading Count.
int configCount = dt == null ? 0 : dt.Count;
LogUtil.Info("開始讀取txt數(shù)據(jù),讀取配置:" + configCount + "條");
if (configCount == 0)
{
    return;
}
// Start one import task per config entry, then block until every import has finished.
List<Task> li = new List<Task>();
foreach (dynamic row in dt)
{
    LogUtil.Info("開始處理數(shù)據(jù):" + JsonConvert.SerializeObject(row));
    li.Add(ProcessRow(row));
}
Task.WaitAll(li.ToArray());
LogUtil.Info("數(shù)據(jù)讀取完畢");
/// <summary>
/// Imports one configured text file into its target table. All work runs inside
/// <c>Task.Run</c>; the returned task completes when the import has finished and faults
/// if an exception escapes the import.
/// </summary>
/// <remarks>
/// The source file is first copied into a "tmp" folder under the application base directory.
/// If the copy is larger than the <c>fr</c> app setting (in MB, default 100) it is streamed
/// line by line in batches of <c>frpage</c> rows (default 50000); otherwise it is loaded in
/// one go through <see cref="LoadDataTableFromTxt"/> and written inside a transaction.
/// </remarks>
/// <param name="row">
/// One entry of the JSON config. The members read here are CheckData, TableName, ConnStr,
/// ExecBeforeSql, ExecAfterSql, HeaderRowsNum, Separator, Mapping, FilterRegex and FilePath.
/// </param>
/// <returns>A task representing the running import.</returns>
public async Task ProcessRow(dynamic row)
{
    await Task.Run(() =>
    {
        DataTable Data = null;
        string error = "", ConnStr, TableName, ExecBeforeSql, ExecAfterSql;
        Boolean IsCheck = Convert.ToBoolean(row["CheckData"]);
        TableName = Convert.ToString(row.TableName);
        // NOTE(review): ConnStr is read from the config but never handed to SQLServerDBHelper
        // in this file — confirm whether the helper is expected to pick it up some other way.
        ConnStr = Convert.ToString(row.ConnStr);
        ExecBeforeSql = Convert.ToString(row.ExecBeforeSql);
        ExecAfterSql = Convert.ToString(row.ExecAfterSql);
        int HeaderRowsNum = Convert.ToInt32(row.HeaderRowsNum);
        string Separator = Convert.ToString(row.Separator);

        // File size (in MB) above which the file is streamed in batches instead of loaded at once.
        int fr = 0;
        if (!int.TryParse(ConfigurationManager.AppSettings["fr"], out fr))
        {
            fr = 100;
        }
        // Computed as long so a large MB setting cannot overflow Int32.
        long thresholdBytes = (long)fr * 1024 * 1024;

        // Number of data rows per batch when streaming.
        int page = 0;
        if (!int.TryParse(ConfigurationManager.AppSettings["frpage"], out page))
        {
            page = 50000;
        }

        // Mapping: txt header name -> database column name.
        Dictionary<string, string> dic = new Dictionary<string, string>();
        foreach (var dyn in row.Mapping)
        {
            dic.Add(Convert.ToString(dyn.TxtName), Convert.ToString(dyn.DBName));
        }

        List<string> regex = new List<string>();
        foreach (string item in row["FilterRegex"])
        {
            regex.Add(item);
        }

        // Work on a local copy of the source file.
        string cpath = Convert.ToString(row["FilePath"]);
        string rootPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "tmp");
        Directory.CreateDirectory(rootPath); // no-op when the folder already exists
        string fpath = Path.Combine(rootPath, Path.GetFileName(cpath));
        File.Copy(cpath, fpath, true);
        LogUtil.Info("拷文件到本地已經(jīng)完成.從本地讀取數(shù)據(jù)操作");

        // After this many accepted batches the loop waits for all writer threads to finish.
        int threadCount = Environment.ProcessorCount * 3;
        FileInfo fi = new FileInfo(fpath);

        if (fi.Length > thresholdBytes)
        {
            // The batch path builds its columns from the header; without one there is no table to fill.
            if (HeaderRowsNum < 1)
            {
                throw new InvalidOperationException("HeaderRowsNum must be at least 1 when a file is read in batches.");
            }

            long sumCount = 0;
            int headRow = 0;
            List<Thread> li_th = new List<Thread>();
            int ij = 0; // number of batches accepted by ProcessPath so far

            // 'using' guarantees the reader is closed even when a write or a parse step throws.
            using (StreamReader sr = new StreamReader(fi.OpenRead()))
            {
                LogUtil.Info("生成StreamReader成功 ");
                while (sr.Peek() > -1)
                {
                    string rowstr = sr.ReadLine();

                    if (headRow < HeaderRowsNum)
                    {
                        // Header line: (re)build the column list from it.
                        Data = new DataTable();
                        foreach (string scol in rowstr.Split(new string[] { Separator }, StringSplitOptions.RemoveEmptyEntries))
                        {
                            Data.Columns.Add(scol.Trim(), typeof(string));
                        }
                        headRow++;
                        continue;
                    }

                    // With a multi-line header every record also spans that many physical lines;
                    // join them with a space before splitting into fields.
                    if (headRow > 1)
                    {
                        for (int i = 1; i < headRow && sr.Peek() > -1; i++)
                        {
                            rowstr += " " + sr.ReadLine();
                        }
                    }
                    Data.Rows.Add(rowstr.Split(new string[] { Separator }, StringSplitOptions.RemoveEmptyEntries));

                    // Keep collecting until the batch is full or the file ends.
                    if (Data.Rows.Count < page && sr.Peek() > -1)
                    {
                        continue;
                    }

                    bool last = (sr.Peek() == -1);
                    sumCount += Data.Rows.Count;

                    int acceptedBefore = ij;
                    ProcessPath(Data, page, sr, ref ij, TableName, ExecBeforeSql, ExecAfterSql, dic, IsCheck, li_th);
                    // ProcessPath writes the first accepted batch itself, synchronously. If that batch
                    // is also the last one, writing Data again below would insert every row twice.
                    bool writtenAsFirstBatch = acceptedBefore == 0 && ij == 1;

                    if ((ij > 0 && (ij % threadCount) == 0) || last)
                    {
                        LogUtil.Info("完成一批次當(dāng)前共寫數(shù)據(jù): " + sumCount);

                        // Wait for every writer thread started so far.
                        foreach (Thread worker in li_th)
                        {
                            worker.Join();
                        }
                        li_th.Clear();

                        // The final page is written only after all other writers are done,
                        // because it also runs ExecAfterSql.
                        if (last)
                        {
                            if (writtenAsFirstBatch)
                            {
                                // Rows are already in the table; only the post-import statement remains.
                                if (!string.IsNullOrWhiteSpace(ExecAfterSql))
                                {
                                    using (SQLServerDBHelper afterDb = new SQLServerDBHelper())
                                    {
                                        LogUtil.Info(TableName + "執(zhí)行后Sql :" + ExecAfterSql);
                                        afterDb.ExecuteNonQuery(ExecAfterSql);
                                    }
                                }
                            }
                            else
                            {
                                // NOTE(review): this also runs when ProcessPath rejected the last batch
                                // during validation (unchanged from the original) — confirm that is intended.
                                WriteTODB(TableName, Data, ExecBeforeSql, ExecAfterSql, dic, false, true);
                            }
                            LogUtil.Info("最后一次寫入完成");
                        }
                        LogUtil.Info(" 線程退出開始新的循環(huán)...");
                    }

                    // ProcessPath works on its own copy, so the rows can be dropped for the next batch.
                    Data.Clear();
                }
            }
        }
        else
        {
            using (SQLServerDBHelper sdb = new SQLServerDBHelper())
            {
                sdb.OpenConnection();

                // Pass the open helper along so validation has a connection to read the table schema with.
                Data = LoadDataTableFromTxt(fpath, ref error, Separator, HeaderRowsNum, regex, IsCheck, dic, TableName, sdb);

                // The ErrorMsg column only exists when validation got far enough to add it, and a
                // row is in error only when the column holds a non-empty message.
                if (IsCheck && string.IsNullOrWhiteSpace(error) && Data.Columns.Contains("ErrorMsg"))
                {
                    DataRow[] rows = Data.Select("ErrorMsg is not null AND ErrorMsg <> ''");
                    if (rows.Length > 0)
                    {
                        LogUtil.Info($"讀取{TableName} 數(shù)據(jù)出錯 : {JsonConvert.SerializeObject(rows)}");
                        return;
                    }
                }

                LogUtil.Info($"讀取{TableName} 的txt數(shù)據(jù)完成.共讀取數(shù)據(jù):{Data.Rows.Count}條");
                if (Data.Rows.Count == 0 || !string.IsNullOrWhiteSpace(error))
                {
                    if (!string.IsNullOrWhiteSpace(error))
                    {
                        LogUtil.Info("讀取數(shù)據(jù)出錯,地址:" + Convert.ToString(row["FilePath"]) + " \r\n 錯誤:" + error);
                    }
                    return;
                }

                // Single-shot import: everything (before SQL, bulk copy, after SQL) in one transaction.
                sdb.BgeinTransaction();
                try
                {
                    WriteTODB(TableName, Data, ExecBeforeSql, ExecAfterSql, dic, sdb: sdb);
                    sdb.CommitTransaction();
                    LogUtil.Info(TableName + "數(shù)據(jù)更新完畢 !!");
                }
                catch (Exception ex)
                {
                    LogUtil.Info(TableName + " 更新數(shù)據(jù)出錯,錯誤:" + ex.Message + " \r\n 堆棧:" + ex.StackTrace);
                    sdb.RollbackTransaction();
                }
            }
        }

        // Kept from the original: force a collection once the import of this file is done.
        GC.Collect();
    });
}
/// <summary>
/// Handles one batch of rows read by the streaming import: optionally validates it, then
/// writes a private copy of it to the database.
/// </summary>
/// <remarks>
/// The first accepted batch is written synchronously with the "first" flag so that
/// <paramref name="ExecBeforeSql"/> runs before any data goes in. Later batches are written
/// on their own background thread (added to <paramref name="li_th"/>) as long as the reader
/// still has data; a later batch that is the last one in the file is not written here.
/// </remarks>
/// <param name="Data">Rows of the current batch; copied before any validation touches it.</param>
/// <param name="page">Configured batch size (not used by this method).</param>
/// <param name="sr">The reader of the source file; only peeked to see whether more data follows.</param>
/// <param name="ij">Count of accepted batches; incremented when the batch passes validation.</param>
/// <param name="TableName">Target table.</param>
/// <param name="ExecBeforeSql">Statement to run before the first write.</param>
/// <param name="ExecAfterSql">Statement to run after the last write.</param>
/// <param name="dic">Mapping from txt column name to database column name.</param>
/// <param name="IsCheck">Whether to validate the batch with <see cref="CheckData"/> first.</param>
/// <param name="li_th">Receives every writer thread started here so the caller can wait for it.</param>
private void ProcessPath(DataTable Data, int page, StreamReader sr, ref int ij, string TableName, string ExecBeforeSql, string ExecAfterSql, Dictionary<string, string> dic, bool IsCheck, List<Thread> li_th)
{
    // Snapshot the batch up front: the caller clears Data as soon as this method returns,
    // while the writer thread may still be running.
    PoolModel p = new PoolModel
    {
        TableName = TableName,
        ExecBeforeSql = ExecBeforeSql,
        ExecAfterSql = ExecAfterSql,
        dic = dic,
        Data = Data.Copy()
    };

    if (IsCheck)
    {
        string error;
        using (SQLServerDBHelper sdb = new SQLServerDBHelper())
        {
            error = CheckData(Data, TableName, dic, sdb);
        }

        // CheckData can return an error before it has added the ErrorMsg column, and a row is
        // only in error when that column holds a non-empty message.
        DataRow[] rows = Data.Columns.Contains("ErrorMsg")
            ? Data.Select("ErrorMsg is not null AND ErrorMsg <> ''")
            : new DataRow[0];
        if (rows.Length > 0 || !string.IsNullOrWhiteSpace(error))
        {
            LogUtil.Info($"讀取{TableName} 數(shù)據(jù)出錯 : {JsonConvert.SerializeObject(rows)}\r\n錯誤: " + error);
            return;
        }
    }

    ij++;
    if (ij == 1)
    {
        // First batch: write on the calling thread so ExecBeforeSql has completed before
        // any parallel writer starts.
        WriteTODB(p.TableName, p.Data, p.ExecBeforeSql, p.ExecAfterSql, p.dic, true, false);
        LogUtil.Info("首次寫入完成");
    }
    else if (sr.Peek() > -1)
    {
        Thread t = new Thread(d =>
        {
            // Only PoolModel instances are ever passed to Start below.
            PoolModel c = (PoolModel)d;
            try
            {
                WriteTODB(c.TableName, c.Data, c.ExecBeforeSql, c.ExecAfterSql, c.dic, false, false);
            }
            catch (ThreadAbortException)
            {
                LogUtil.Error("線程退出.................");
            }
            catch (Exception ex)
            {
                // A failed batch aborts the whole application after logging the offending data.
                LogUtil.Error(c.TableName + "寫入數(shù)據(jù)失敗:" + ex.Message + "\r\n堆棧:" + ex.StackTrace + "\r\n 數(shù)據(jù): " + JsonConvert.SerializeObject(c.Data));
                ExitApp();
                return;
            }
        });
        t.IsBackground = true;
        t.Start(p);
        li_th.Add(t);
    }
}
/// <summary>
/// Requests application shutdown by calling <c>Application.Exit()</c>.
/// Used by the writer threads when a batch fails to import.
/// </summary>
public void ExitApp() => Application.Exit();
/// <summary>
/// Bulk-copies <paramref name="Data"/> into <paramref name="TableName"/>, optionally running
/// the configured SQL statements before and after the copy.
/// </summary>
/// <param name="TableName">Target table (also used as a prefix in log messages).</param>
/// <param name="Data">Rows to write.</param>
/// <param name="ExecBeforeSql">Statement executed before the copy when <paramref name="first"/> is true and it is not blank.</param>
/// <param name="ExecAfterSql">Statement executed after the copy when <paramref name="last"/> is true and it is not blank.</param>
/// <param name="dic">Column mapping handed to the helper's BulkCopy.</param>
/// <param name="first">Whether this is the first write of the import.</param>
/// <param name="last">Whether this is the last write of the import.</param>
/// <param name="sdb">
/// Helper to use. When null a new helper is created for this call and disposed before
/// returning; a helper supplied by the caller is left open.
/// </param>
public void WriteTODB(string TableName, DataTable Data, string ExecBeforeSql, string ExecAfterSql, Dictionary<string, string> dic, bool first = true, bool last = true, SQLServerDBHelper sdb = null)
{
    bool ownsHelper = sdb == null;
    if (ownsHelper)
    {
        sdb = new SQLServerDBHelper();
    }

    try
    {
        if (first && !string.IsNullOrWhiteSpace(ExecBeforeSql))
        {
            LogUtil.Info(TableName + "執(zhí)行前Sql :" + ExecBeforeSql);
            sdb.ExecuteNonQuery(ExecBeforeSql);
        }

        sdb.BulkCopy(Data, TableName, dic);

        if (last && !string.IsNullOrWhiteSpace(ExecAfterSql))
        {
            LogUtil.Info(TableName + "執(zhí)行后Sql :" + ExecAfterSql);
            sdb.ExecuteNonQuery(ExecAfterSql);
        }

        LogUtil.Info(TableName + "本次執(zhí)行完成 ");
    }
    finally
    {
        // Release the helper we created even when one of the SQL calls throws.
        if (ownsHelper)
        {
            sdb.Dispose();
        }
    }
}
/// <summary>
/// Validates every mapped column of <paramref name="dt"/> against the definition of the
/// target table and records per-row problems in an "ErrorMsg" column.
/// </summary>
/// <remarks>
/// Values are trimmed in place. Rows without problems get a null ErrorMsg, so callers can
/// select failing rows with "ErrorMsg is not null". The column is added on first use and
/// reused on later calls for the same table.
/// </remarks>
/// <param name="dt">Data to validate; its columns are named after the txt header.</param>
/// <param name="dbTableName">Table whose column definitions are read from the database.</param>
/// <param name="dic">Mapping from txt column name (key) to database column name (value).</param>
/// <param name="sdb">Helper used to query the table definition.</param>
/// <returns>
/// An empty string when validation ran (row-level errors are in ErrorMsg), otherwise a
/// message describing why validation could not be performed.
/// </returns>
public string CheckData(DataTable dt, string dbTableName, Dictionary<string, string> dic, SQLServerDBHelper sdb)
{
    if (string.IsNullOrWhiteSpace(dbTableName))
    {
        return "表名不能為空!";
    }
    if (dic == null || dic.Count == 0)
    {
        return "映射關(guān)系數(shù)據(jù)不存在!";
    }

    List<string> errorMsg = new List<string>();
    List<string> Cols = new List<string>();
    foreach (KeyValuePair<string, string> c in dic)
    {
        if (!dt.Columns.Contains(c.Key))
        {
            errorMsg.Add(c.Key);
        }
        Cols.Add(c.Key);
    }
    if (errorMsg.Count > 0)
    {
        return "數(shù)據(jù)列不完整,請與映射表的數(shù)據(jù)列數(shù)量保持一致!列:" + string.Join(",", errorMsg);
    }

    // Row-level messages go into this column. It may already exist when the same table is
    // validated again (the streaming import reuses its DataTable for every batch).
    if (!dt.Columns.Contains("ErrorMsg"))
    {
        dt.Columns.Add(new DataColumn("ErrorMsg", typeof(string)));
    }

    string sql = @"--獲取SqlServer中表結(jié)構(gòu)
SELECT syscolumns.name as ColName,systypes.name as DBType,syscolumns.isnullable,
syscolumns.length
FROM syscolumns, systypes
WHERE syscolumns.xusertype = systypes.xusertype
AND syscolumns.id = object_id(@tb) ; ";
    DataSet ds = sdb.GetDataSet(sql, new SqlParameter[] { new SqlParameter("@tb", dbTableName) });

    // Index the definitions of the mapped columns by name. Unmapped columns are skipped so an
    // unsupported type on a column we never write to cannot abort the validation.
    HashSet<string> mappedDbColumns = new HashSet<string>(dic.Values);
    var dic_Def = ds.Tables[0].AsEnumerable()
        .Where(d => mappedDbColumns.Contains(Convert.ToString(d["ColName"])))
        .ToDictionary(c => Convert.ToString(c["ColName"]), d =>
        {
            string old = Convert.ToString(d["DBType"]).ToUpper();
            return new
            {
                ColName = Convert.ToString(d["ColName"]),
                DBType = GetCSharpType(old),
                SqlType = old,
                IsNullble = Convert.ToBoolean(d["isnullable"]),
                Length = Convert.ToInt32(d["length"])
            };
        });

    // The mapping does not depend on the row, so verify it once up front.
    foreach (string colName in Cols)
    {
        if (!dic_Def.ContainsKey(dic[colName]))
        {
            return "Excel列名:" + colName + " 映射數(shù)據(jù)表字段:" + dic[colName] + "在當(dāng)前數(shù)據(jù)表中不存在!";
        }
    }

    // TIME values are validated by parsing them as the time part of today's date.
    string datePrefix = DateTime.Now.ToString("yyyy-MM-dd") + " ";

    foreach (DataRow row in dt.Rows)
    {
        errorMsg.Clear();
        foreach (string colName in Cols)
        {
            var info = dic_Def[dic[colName]];

            // Test for a missing value before converting to string; afterwards it would be
            // indistinguishable from an empty string.
            if (row.IsNull(colName))
            {
                if (!info.IsNullble)
                {
                    errorMsg.Add("列" + colName + "不能為空!");
                }
                continue;
            }

            // Strip surrounding whitespace and store the cleaned value back.
            string text = Convert.ToString(row[colName]).Trim();
            row[colName] = text;

            if (info.DBType == "String")
            {
                if (info.SqlType == "TIME")
                {
                    // No length check for TIME (time part of a date, e.g. 17:12:30.0000).
                    DateTime parsed;
                    if (!DateTime.TryParse(datePrefix + text, out parsed))
                    {
                        errorMsg.Add("列" + colName + "填寫的數(shù)據(jù)無效應(yīng)為日期的時間部分如:17:30:12");
                    }
                }
                else
                {
                    // syscolumns.length is a storage size in bytes: -1 for (max) types, a fixed
                    // pointer size for text/ntext, and two bytes per character for nchar/nvarchar.
                    bool unbounded = info.Length < 0 || info.SqlType == "TEXT" || info.SqlType == "NTEXT";
                    int maxChars = (info.SqlType == "NCHAR" || info.SqlType == "NVARCHAR") ? info.Length / 2 : info.Length;
                    if (!unbounded && text.Length > maxChars)
                    {
                        errorMsg.Add("列" + colName + "長度超過配置長度:" + maxChars);
                    }
                }
            }
            else
            {
                Type t = Type.GetType("System." + info.DBType);
                try
                {
                    // Round-trip through the target type so the stored text is in that type's
                    // canonical form (e.g. thousands separators accepted by the parser are dropped).
                    row[colName] = Convert.ChangeType(text, t);
                }
                catch (Exception)
                {
                    errorMsg.Add("列" + colName + "填寫的數(shù)據(jù)" + text + "無效應(yīng)為" + info.SqlType + "類型.");
                }
            }
        }

        // Null means "row is valid"; an empty string would still match "ErrorMsg is not null".
        row["ErrorMsg"] = errorMsg.Count > 0 ? (object)string.Join(" || ", errorMsg) : DBNull.Value;
    }
    return "";
}
/// <summary>
/// wm 2018-11-28 13:37
/// Maps a common SQL Server type name to the name of the matching .NET type
/// (the part after "System.").
/// </summary>
/// <param name="old">Upper-case SQL Server type name, e.g. "NVARCHAR".</param>
/// <returns>The .NET type name, e.g. "String", "Int32", "DateTime".</returns>
/// <exception cref="NotSupportedException">The type name is not one of the supported types.</exception>
private string GetCSharpType(string old)
{
    switch (old)
    {
        case "TINYINT":
            return "Byte";
        case "INT":
        case "SMALLINT":
            return "Int32";
        case "BIGINT":
            // bigint does not fit into Int32.
            return "Int64";
        case "DECIMAL":
        case "FLOAT":
        case "NUMERIC":
        case "MONEY":
            return "Decimal";
        case "BIT":
            return "Boolean";
        case "TEXT":
        case "NTEXT":
        case "CHAR":
        case "NCHAR":
        case "VARCHAR":
        case "NVARCHAR":
        case "TIME":
            return "String";
        case "DATE":
        case "DATETIME":
        case "DATETIME2":
        case "SMALLDATETIME":
            return "DateTime";
        default:
            // Report the type that was actually passed in.
            throw new NotSupportedException("GetCSharpType數(shù)據(jù)類型" + old + "無法識別!");
    }
}
/// <summary>
/// Bundle of everything one background writer thread needs to write a single batch;
/// passed as the thread's start argument.
/// </summary>
public class PoolModel
{
    /// <summary>Name of the target table.</summary>
    public string TableName { get; set; }

    /// <summary>The rows of the batch to write.</summary>
    public DataTable Data { get; set; }

    /// <summary>Statement configured to run before the import.</summary>
    public string ExecBeforeSql { get; set; }

    /// <summary>Statement configured to run after the import.</summary>
    public string ExecAfterSql { get; set; }

    /// <summary>Column mapping forwarded to the bulk copy.</summary>
    public Dictionary<string, string> dic { get; set; }
}
/// <summary>
/// wm 2018-11-28 13:32
/// Reads a delimited text file completely into a DataTable and optionally validates it.
/// With validation enabled the returned table carries an ErrorMsg column that holds the
/// reason for every row that failed. The target table must already exist in the database
/// when validation is requested.
/// </summary>
/// <param name="path">Path of the file to read.</param>
/// <param name="error">Receives problems that are not row-level validation errors; empty when there are none.</param>
/// <param name="Separator">Column separator.</param>
/// <param name="HeaderRowsNum">Number of header lines; each data record spans the same number of lines.</param>
/// <param name="Regexs">Patterns whose matches are removed from the file text before parsing.</param>
/// <param name="isCheck">Whether to validate the data against the structure of <paramref name="dbTableName"/>.</param>
/// <param name="map">Required for validation: key is the txt column name, value the database column name.</param>
/// <param name="dbTableName">Table to validate against (the table the data will be inserted into).</param>
/// <param name="sdb">Helper used for validation; when null a temporary one is created and disposed.</param>
/// <returns>
/// The parsed table. With validation it includes the ErrorMsg column (once validation got
/// far enough to add it); without validation it is returned as read.
/// </returns>
public DataTable LoadDataTableFromTxt(string path, ref string error, string Separator, int HeaderRowsNum, List<string> Regexs = null, bool isCheck = false, Dictionary<string, string> map = null, string dbTableName = "", SQLServerDBHelper sdb = null)
{
    DataTable dt = new DataTable();
    error = "";
    if (isCheck && (map == null || map.Count == 0 || string.IsNullOrWhiteSpace(dbTableName)))
    {
        error = "參數(shù)標明需要對表格數(shù)據(jù)進行校驗,但沒有指定映射表集合或數(shù)據(jù)表名.";
        return dt;
    }

    string txts = File.ReadAllText(path);

    // Strip everything matched by the configured filter patterns.
    if (Regexs != null)
    {
        foreach (string pattern in Regexs)
        {
            txts = Regex.Replace(txts, pattern, "");
        }
    }

    // Accept the same line endings as StreamReader.ReadLine, which the streaming import uses.
    // "\r\n" is listed first so it is not split into two breaks.
    string[] tts = txts.Split(new string[] { "\r\n", "\n", "\r" }, StringSplitOptions.None);
    string[] separators = new string[] { Separator };

    // -1 until the first header line has been read, 0 while more header lines follow,
    // HeaderRowsNum once the header is complete.
    int headerNum = -1;
    // Occurrence count per header name, used to report duplicated column names.
    Dictionary<string, int> col_Rep = new Dictionary<string, int>();
    bool isre = false; // true once a duplicated column name has been seen

    for (int i = 0; i < tts.Length; i++)
    {
        string s1 = tts[i];

        // Header not complete yet: this line contributes column names.
        if (headerNum < HeaderRowsNum)
        {
            foreach (string col in s1.Split(separators, StringSplitOptions.RemoveEmptyEntries))
            {
                string colName = col.Trim();
                if (col_Rep.ContainsKey(colName))
                {
                    col_Rep[colName]++;
                    isre = true;
                    continue;
                }
                col_Rep.Add(colName, 1);
                dt.Columns.Add(colName, typeof(string));
            }
            headerNum = (i == (HeaderRowsNum - 1)) ? HeaderRowsNum : 0;
            continue;
        }

        // Skip lines that are blank or consist of separators only.
        if (string.IsNullOrWhiteSpace(s1) || string.IsNullOrWhiteSpace(s1.Replace(Separator, "")))
        {
            continue;
        }

        if (isre)
        {
            error = "列:" + string.Join(",", col_Rep.Where(c => c.Value > 1).Select(c => c.Key)) + "存在重復(fù)";
            return dt;
        }

        // A record spans as many lines as the header. Join them with a space so a first line
        // that ends without a separator does not merge with the next field.
        if (headerNum > 1)
        {
            for (int j = 1; j < headerNum && (i + j) < tts.Length; j++)
            {
                s1 += " " + tts[i + j];
            }
        }

        DataRow dr = dt.NewRow();
        dr.ItemArray = s1.Split(separators, StringSplitOptions.RemoveEmptyEntries);
        dt.Rows.Add(dr);

        // Skip the extra lines consumed above; the loop's own i++ accounts for one of them.
        i += (headerNum - 1);
    }

    if (isCheck)
    {
        if (sdb != null)
        {
            error = CheckData(dt, dbTableName, map, sdb);
        }
        else
        {
            // No helper supplied: CheckData needs one to read the table definition.
            using (SQLServerDBHelper ownDb = new SQLServerDBHelper())
            {
                error = CheckData(dt, dbTableName, map, ownDb);
            }
        }
    }
    return dt;
}
總結(jié)
以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考價值,謝謝大家對腳本之家的支持。
相關(guān)文章
C#環形緩沖區(隊列)完全實現
這篇文章主要為大家詳細介紹了C#環形緩沖區(隊列)完全實現代碼,感興趣的小伙伴們可以參考一下2016-07-07
C#實現從網絡同步標準北京時間的方法
這篇文章主要介紹了C#實現從網絡同步標準北京時間的方法,涉及C#操作時間的技巧,具有一定參考借鑒價值,需要的朋友可以參考下2015-03-03
使用C#調用系統API實現內存注入的代碼
使用C#調用系統API實現內存注入的代碼,學習c#的朋友可以參考下。2011-06-06

