The data_process.py File
The contents of the DLRM data preprocessing file data_process.py are shown below. The script reads the Criteo Kaggle train.txt, splits it into seven day-sized chunks using a multiprocessing pool backed by shared memory (Step 1), then maps the strings in each sparse feature column to unique integer IDs in parallel and saves the merged result as kaggle_processed.npz (Step 2).
from __future__ import absolute_import, division, print_function, unicode_literals
import os.path
from io import StringIO
from os import path
import os
import time
import numpy as np
from multiprocessing import Pool, shared_memory
import warnings
warnings.filterwarnings('ignore')

# Error callback for worker processes: print the error
def showerror(err):
    print("show error:", err)

# Using shared memory, multiple processes count the categories of each column in parallel
# and map the strings in each column to unique integer IDs
def speedup_col(out_name, in_name, shape, col, d_path, split):
    pid = os.getpid()
    os.sched_setaffinity(pid, [col])
    ssm = shared_memory.SharedMemory(name=out_name)
    out = np.ndarray(shape=shape, dtype="i4", buffer=ssm.buf)
    ssm_in = shared_memory.SharedMemory(name=in_name)
    mat = np.ndarray(shape=shape, dtype="U8", buffer=ssm_in.buf)
    convertDict = {}
    count = 0
    for i in range(mat.shape[0]):
        # add to convertDict and increment count
        if mat[i, col] not in convertDict:
            convertDict[mat[i, col]] = count
            count += 1
        out[i, col] = convertDict[mat[i, col]]
    return (count, col)

def processKaggleCriteoAdData(split, d_path, o_filename, datas):
    # Concatenate the per-day splits back into one data set
    for i, data in enumerate(datas):
        if i == 0:
            X_cat = data["X_cat"]
            X_int = data["X_int"]
            y = data["y"]
        else:
            X_cat = np.concatenate((X_cat, data["X_cat"]))
            X_int = np.concatenate((X_int, data["X_int"]))
            y = np.concatenate((y, data["y"]))
    shape = X_cat.shape
    counts = [0 for _ in range(shape[1])]
    pool = Pool(shape[1])
    results = []
    out_X_cat = np.zeros(shape=shape, dtype='i4')
    out_name = f"out_X_cat"
    in_name = f"in_X_cat"
    X_cat_bytes = X_cat.nbytes
    # Create shared memory blocks for the string input and the integer output
    ssm = shared_memory.SharedMemory(name=out_name, create=True, size=out_X_cat.nbytes)
    ssm_in = shared_memory.SharedMemory(name=in_name, create=True, size=X_cat.nbytes)
    del out_X_cat
    new_X_cat = np.ndarray(shape=shape, dtype="U8", buffer=ssm_in.buf)
    new_X_cat[:] = X_cat
    del X_cat
    X_cat = new_X_cat
    # Start the process pool; each process handles one column of data
    for j in range(shape[1]):
        res = pool.apply_async(speedup_col, args=(out_name, in_name, shape, j, d_path, split,),
                               error_callback=showerror)
        results.append(res)
    pool.close()
    pool.join()
    for res in results:
        count, col = res.get()
        counts[col] = count
    X_int[X_int < 0] = 0
    X_cat = np.ndarray(shape=shape, dtype="i4", buffer=ssm.buf)
    np.savez(
        str(d_path) + str(o_filename) + ".npz",
        X_cat=X_cat,
        X_int=X_int,
        y=y,
        counts=counts,
    )
    print("counts: ", counts)
    print("X_cat shape: ", X_cat.shape)
    print("X_int shape: ", X_int.shape)
    print("Step 2 done !!!")
    return (ssm, ssm_in)

# Each process handles n rows of data, splitting the sparse features, dense features,
# and labels into the corresponding numpy arrays
def speedup(i, info, num_data_in_split):
    type = np.dtype(
        [("label", ("i4", 1)), ("int_feature", ("i4", 13)), ("cat_feature", ("U8", 26))]
    )
    pid = os.getpid()
    os.sched_setaffinity(pid, [i])
    one = shared_memory.SharedMemory(name="one")
    two = shared_memory.SharedMemory(name="two")
    three = shared_memory.SharedMemory(name="three")
    y = np.ndarray(shape=(num_data_in_split), dtype="i4", buffer=one.buf)
    X_int = np.ndarray(shape=(num_data_in_split, 13), dtype="i4", buffer=two.buf)
    X_cat = np.ndarray(shape=(num_data_in_split, 26), dtype="U8", buffer=three.buf)
    for cut in info:
        line, index = cut
        data = np.genfromtxt(StringIO(line), dtype=type, delimiter="\t")
        y[index] = data["label"]
        X_int[index] = data["int_feature"]
        X_cat[index] = data["cat_feature"]

def getKaggleCriteoAdData(datafile="", o_filename=""):
    d_path = "./kaggle_data/"
    # determine if intermediate data path exists
    if path.isdir(str(d_path)):
        print("Saving intermediate data files at %s" % (d_path))
    else:
        os.mkdir(str(d_path))
        print("Created %s for storing intermediate data files" % (d_path))
    # determine if data file exists (train.txt)
    if path.exists(str(datafile)):
        print("Reading data from path=%s" % (str(datafile)))
    else:
        print(
            "Path of Kaggle Display Ad Challenge Dataset is invalid; please download from https://labs.criteo.com/2014/09/kaggle-contest-dataset-now-available-academic-use/"
        )
        exit(0)
    # count number of datapoints in training set
    total_count = 0
    with open(str(datafile)) as f:
        for _ in f:
            total_count += 1
    print("Total number of datapoints:", total_count)
    # determine length of split over 7 days
    split = 1
    num_data_per_split, extras = divmod(total_count, 7)
    # initialize data to store
    if extras > 0:
        num_data_in_split = num_data_per_split + 1
        extras -= 1
    else:
        num_data_in_split = num_data_per_split
    y = np.zeros(num_data_in_split, dtype="i4")
    X_int = np.zeros((num_data_in_split, 13), dtype="i4")
    X_cat = np.zeros((num_data_in_split, 26), dtype="U8")
    cpus = os.cpu_count()
    y_size = y.nbytes
    X_int_size = X_int.nbytes
    X_cat_size = X_cat.nbytes
    one = shared_memory.SharedMemory(name="one", create=True, size=y_size)
    two = shared_memory.SharedMemory(name="two", create=True, size=X_int_size)
    three = shared_memory.SharedMemory(name="three", create=True, size=X_cat_size)
    pool = Pool(cpus)
    count = 0
    process_id = 0
    batch_size = 10000
    info = []
    datas = []
    if split == 1:
        # load training data
        start = time.time()
        with open(str(datafile)) as f:
            for i, line in enumerate(f):
                # store a day's worth of data and reinitialize data
                if i == (count + num_data_in_split):
                    pool.close()
                    pool.join()
                    y = np.ndarray(shape=(num_data_in_split), dtype="i4", buffer=one.buf)
                    X_int = np.ndarray(shape=(num_data_in_split, 13), dtype="i4", buffer=two.buf)
                    X_cat = np.ndarray(shape=(num_data_in_split, 26), dtype="U8", buffer=three.buf)
                    datas.append({
                        'X_int': X_int.copy(),
                        'X_cat': X_cat.copy(),
                        'y': y.copy()
                    })
                    print("\nSaved day_{0} in datas!".format(split))
                    split += 1
                    count += num_data_in_split
                    if extras > 0:
                        num_data_in_split = num_data_per_split + 1
                        extras -= 1
                    else:
                        num_data_in_split = num_data_per_split
                    pool = Pool(cpus)
                index = i - count
                if ((index + 1) == num_data_in_split) or ((index % batch_size) == 0):
                    pool.apply_async(speedup, args=(process_id % cpus, info, num_data_in_split),
                                     error_callback=showerror)
                    info = []
                    info.append((line, index))
                    process_id += 1
                else:
                    info.append((line, index))
        # flush the last batch and save the final split
        pool.apply_async(speedup, args=(process_id % cpus, info, num_data_in_split),
                         error_callback=showerror)
        pool.close()
        pool.join()
        y = np.ndarray(shape=(num_data_in_split), dtype="i4", buffer=one.buf)
        X_int = np.ndarray(shape=(num_data_in_split, 13), dtype="i4", buffer=two.buf)
        X_cat = np.ndarray(shape=(num_data_in_split, 26), dtype="U8", buffer=three.buf)
        datas.append({
            'X_int': X_int.copy(),
            'X_cat': X_cat.copy(),
            'y': y.copy()
        })
        print("\nSaved day_{0} in datas!".format(split))
        ssms = [one, three, two]
        for ssm in ssms:
            ssm.close()
            ssm.unlink()
        print(f"Spent {int(time.time() - start)}s in the first step!")
    else:
        print("Using existing %skaggle_day_*.npz files" % str(d_path))
    print("Step 1 done !!!")
    return datas

import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--file', default='train.txt', choices=['split.txt', 'train.txt'])
args = parser.parse_args()
split = 7
d_path = "./kaggle_data/"
o_filename = "kaggle_processed"
start = time.time()
try:
    datas = getKaggleCriteoAdData(args.file, o_filename)
    ssms = processKaggleCriteoAdData(split, d_path, o_filename, datas)
    for ssm in ssms:
        ssm.close()
        ssm.unlink()
except Exception as e:
    print("Something went wrong:", e)
print(f"All done, total time: {int(time.time() - start)}s")
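After the script is run (for example, python data_process.py --file train.txt with the Criteo train.txt in the working directory), processKaggleCriteoAdData writes the merged result to ./kaggle_data/kaggle_processed.npz via np.savez. As a quick sanity check, the archive can be loaded back with NumPy. The sketch below is illustrative only; the path simply follows the d_path and o_filename values hard-coded at the bottom of the script, and the reported shapes depend on the dataset you processed.

import numpy as np

# Load the archive written by np.savez in processKaggleCriteoAdData.
# Path assumes the script defaults: d_path="./kaggle_data/", o_filename="kaggle_processed".
data = np.load("./kaggle_data/kaggle_processed.npz")

X_cat = data["X_cat"]    # integer-encoded sparse (categorical) features, shape (N, 26)
X_int = data["X_int"]    # dense integer features, shape (N, 13)
y = data["y"]            # click labels, shape (N,)
counts = data["counts"]  # number of distinct categories found in each sparse column

print("X_cat:", X_cat.shape, "X_int:", X_int.shape, "y:", y.shape)
print("categories per column:", counts)

The counts array is what a DLRM training script would typically use to size the embedding table for each of the 26 sparse feature columns.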