Rate This Document
Findability
Accuracy
Completeness
Readability

data_process.py File

The DLRM data preprocessing file data_process.py has the following content:

from __future__ import absolute_import, division, print_function, unicode_literals 
import os.path 
from io import StringIO 
from os import path 
import os 
import time 
import numpy as np 
from  multiprocessing import Pool,shared_memory 
import warnings 
warnings.filterwarnings('ignore') 
 
# Show process errors. 
def showerror(err): 
    print("show error:",err) 
 
# Based on the shared memory, multiple processes collect statistics on the classification of each column at the same time. 
# Map the character string in each column as a unique identifier integer. 
def speedup_col(out_name,in_name,shape,col,d_path,split): 
    pid = os.getpid() 
    os.sched_setaffinity(pid, [col]) 
    ssm = shared_memory.SharedMemory(name=out_name) 
    out = np.ndarray(shape=shape,dtype="i4",buffer=ssm.buf) 
     
    ssm_in = shared_memory.SharedMemory(name=in_name) 
    mat = np.ndarray(shape=shape,dtype="U8",buffer=ssm_in.buf) 
    convertDict = {} 
    count = 0 
    for i in range(mat.shape[0]): 
        # add to convertDict and increment count 
        if mat[i,col] not in convertDict: 
            convertDict[mat[i,col]] = count 
            count += 1 
        out[i,col] = convertDict[mat[i,col]] 
     
    return (count,col) 
def processKaggleCriteoAdData(split, d_path,o_filename,datas): 
    # Combine the split data. 
    for i, data in enumerate(datas): 
        if i == 0: 
            X_cat = data["X_cat"] 
            X_int = data["X_int"] 
            y = data["y"] 
        else: 
            X_cat = np.concatenate((X_cat, data["X_cat"])) 
            X_int = np.concatenate((X_int, data["X_int"])) 
            y = np.concatenate((y, data["y"])) 
    shape =X_cat.shape 
    counts = [0 for _ in range(shape[1])] 
    pool = Pool(shape[1]) 
    results = [] 
    out_X_cat = np.zeros(shape=shape,dtype='i4') 
    out_name = f"out_X_cat" 
    in_name = f"in_X_cat" 
    X_cat_bytes = X_cat.nbytes 
    # Create the shared memory. 
    ssm = shared_memory.SharedMemory(name=out_name,create=True, size=out_X_cat.nbytes) 
    ssm_in = shared_memory.SharedMemory(name=in_name,create=True, size=X_cat.nbytes) 
    del out_X_cat 
    new_X_cat = np.ndarray(shape=shape,dtype="U8",buffer=ssm_in.buf) 
    new_X_cat[:] = X_cat 
    del X_cat 
    X_cat = new_X_cat 
    # Enable the process pool. Each process processes a column of data. 
    for j in range(shape[1]): 
        res = pool.apply_async(speedup_col,args=(out_name,in_name,shape,j,d_path,split,),error_callback=showerror) 
        results.append(res) 
    pool.close() 
    pool.join() 
    for res in results: 
        count, col= res.get() 
        counts[col] = count 
    X_int[X_int < 0] = 0 
    X_cat = np.ndarray(shape=shape,dtype="i4",buffer=ssm.buf) 
    np.savez( 
        str(d_path) + str(o_filename) + ".npz", 
        X_cat=X_cat, 
        X_int=X_int, 
        y=y, 
        counts=counts, 
    ) 
     
    print("counts: ",counts) 
    print("X_cat shape: ",X_cat.shape) 
    print("X_int shape: ",X_int.shape) 
    print("Step 2 done !!!") 
    return (ssm,ssm_in) 
 
# Each process processes n rows of data, splits sparse features, dense features, and tags, and saves them to the corresponding NumPy arrays. 
def speedup(i,info,num_data_in_split): 
    type = np.dtype( 
        [("label", ("i4", 1)), ("int_feature", ("i4", 13)), ("cat_feature", ("U8", 26))] 
    ) 
    pid = os.getpid() 
    os.sched_setaffinity(pid, [i]) 
    one = shared_memory.SharedMemory(name="one") 
    two = shared_memory.SharedMemory(name="two") 
    three = shared_memory.SharedMemory(name="three") 
    y = np.ndarray(shape=(num_data_in_split),dtype="i4",buffer=one.buf) 
    X_int = np.ndarray(shape=(num_data_in_split, 13),dtype="i4",buffer=two.buf) 
    X_cat = np.ndarray(shape=(num_data_in_split, 26),dtype="U8",buffer=three.buf) 
    for cut in  info: 
        line,index = cut 
        data = np.genfromtxt(StringIO(line), dtype=type, delimiter="\t") 
        y[index] = data["label"] 
        X_int[index] = data["int_feature"] 
        X_cat[index] = data["cat_feature"] 
def getKaggleCriteoAdData(datafile="", o_filename=""): 
    d_path = "./kaggle_data/" 
    # determine if intermediate data path exists 
    if path.isdir(str(d_path)): 
        print("Saving intermediate data files at %s" % (d_path)) 
    else: 
        os.mkdir(str(d_path)) 
        print("Created %s for storing intermediate data files" % (d_path)) 
    # determine if data file exists (train.txt) 
    if path.exists(str(datafile)): 
        print("Reading data from path=%s" % (str(datafile))) 
    else: 
        print( 
            "Path of Kaggle Display Ad Challenge Dataset is invalid; please download from https://labs.criteo.com/2014/09/kaggle-contest-dataset-now-available-academic-use/" 
        ) 
        exit(0) 
    # count number of datapoints in training set 
    total_count = 0 
    with open(str(datafile)) as f: 
        for _ in f: 
            total_count += 1 
    print("Total number of datapoints:", total_count) 
    # determine length of split over 7 days 
    split = 1 
    num_data_per_split, extras = divmod(total_count, 7) 
    # initialize data to store 
    if extras > 0: 
        num_data_in_split = num_data_per_split + 1 
        extras -= 1 
    y = np.zeros(num_data_in_split, dtype="i4") 
    X_int = np.zeros((num_data_in_split, 13), dtype="i4") 
    X_cat = np.zeros((num_data_in_split, 26), dtype="U8") 
     
    cpus = os.cpu_count() 
    y_size = y.nbytes 
    X_int_size = X_int.nbytes 
    X_cat_size = X_cat.nbytes 
    one = shared_memory.SharedMemory(name="one",create=True, size=y_size) 
    two = shared_memory.SharedMemory(name="two",create=True, size=X_int_size) 
    three = shared_memory.SharedMemory(name="three",create=True, size=X_cat_size) 
    pool =Pool(cpus) 
    count = 0 
    process_id = 0 
    batch_size = 10000 
    info = [] 
    datas = [] 
    if split == 1: 
        # load training data 
        start = time.time() 
        with open(str(datafile)) as f: 
             
            for i, line in enumerate(f): 
                # store day"s worth of data and reinitialize data 
                if i == (count + num_data_in_split): 
                    pool.close() 
                    pool.join() 
                    y = np.ndarray(shape=(num_data_in_split),dtype="i4",buffer=one.buf) 
                    X_int = np.ndarray(shape=(num_data_in_split, 13),dtype="i4",buffer=two.buf) 
                    X_cat = np.ndarray(shape=(num_data_in_split, 26),dtype="U8",buffer=three.buf) 
                    datas.append({ 
                        'X_int':X_int.copy(), 
                        'X_cat':X_cat.copy(), 
                        'y':y.copy() 
                    }) 
                    print("\nSaved day_{0} in datas!".format(split)) 
                    split += 1 
                    count += num_data_in_split 
                    if extras > 0: 
                        num_data_in_split = num_data_per_split + 1 
                        extras -= 1 
                    pool =Pool(cpus) 
                index = i - count 
                if  ((index+1)==num_data_in_split) or ((index%batch_size)==0): 
                    pool.apply_async(speedup,args=(process_id%cpus,info,num_data_in_split),error_callback=showerror) 
                    info = [] 
                    info.append((line,index)) 
                    process_id +=1 
                else: 
                    info.append((line,index)) 
        pool.apply_async(speedup,args=(process_id%cpus,info,num_data_in_split),error_callback=showerror) 
        pool.close() 
        pool.join() 
        y = np.ndarray(shape=(num_data_in_split),dtype="i4",buffer=one.buf) 
        X_int = np.ndarray(shape=(num_data_in_split, 13),dtype="i4",buffer=two.buf) 
        X_cat = np.ndarray(shape=(num_data_in_split, 26),dtype="U8",buffer=three.buf) 
        datas.append({ 
                'X_int':X_int.copy(), 
                'X_cat':X_cat.copy(), 
                'y':y.copy() 
        }) 
        print("\nSaved day_{0} in datas!".format(split)) 
        ssms = [one,three,two] 
        for ssm in ssms: 
            ssm.close() 
            ssm.unlink() 
        print(f"spend {int(time.time()-start)}s in first step!") 
         
    else: 
        print("Using existing %skaggle_day_*.npz files" % str(d_path)) 
    print("Step 1 done !!!") 
    return datas 
import argparse 
parser = argparse.ArgumentParser() 
parser.add_argument('--file',default='train.txt',choices=['split.txt','train.txt'],) 
args = parser.parse_args() 
split = 7 
d_path = "./kaggle_data/" 
o_filename = "kaggle_processed" 
start = time.time() 
try: 
    datas = getKaggleCriteoAdData(args.file,o_filename) 
    ssms = processKaggleCriteoAdData(split, d_path,o_filename,datas) 
    for ssm in ssms: 
        ssm.close() 
        ssm.unlink() 
     
except Exception as e: 
    print("there are something wrong:", e) 
     
print(f"All done, spend time: {int(time.time()-start) }s")