본문 바로가기
카테고리 없음

schedule하여 정해진 process 실행하는 코드

by 문자메일 2020. 2. 5.

#!/usr/bin/python
#-*- coding: utf-8 -*-

import pandas as pd
import numpy as np
import re
import time
from multiprocessing import Pool
import multiprocessing
import os
import schedule


###############
#MainAction 추출하는 function
#
# 제네릭하게 사용할 수 있는 방법 생각해 볼 것
#
def extract_mainAction(_df):
    """Summarize the MAIN_ACT_TP column as counts and percentages.

    Builds a two-column frame (absolute count, percent share) sorted by
    frequency, prints it, and writes it to
    <FOLDER_PATH>/extracted_mainAction.xlsx.
    """
    action_col = _df["MAIN_ACT_TP"]
    # Frequency table in descending order; NaN entries are dropped.
    counts = action_col.value_counts(normalize=False, sort=True,
                                     ascending=False, bins=None, dropna=True)
    # Same table as a fraction of all rows, scaled to percent.
    percents = action_col.value_counts(normalize=True, sort=True,
                                       ascending=False, bins=None, dropna=True) * 100
    # Side-by-side: one row per action type, columns = (count, percent).
    summary = pd.concat([counts, percents], axis=1)
    print(summary)
    summary.to_excel(FOLDER_PATH + "extracted_mainAction" + ".xlsx")
###############
###############
def make_NE_tagging_corpous(df):
    """Build NE-tagged corpus columns from raw utterances.

    For each row, parses the NAMED_ENT cell (a string shaped like
    '[{"TYPE":"text"}, {"TYPE2":"text2"}]' — TODO confirm against the log
    producer) and rewrites the lowercased, hyphen-stripped USR_UTTER into
    two new columns:
      * "ne"  — each entity surface text replaced by "<type>"
      * "ne1" — each entity surface text replaced by "<text:type>"
    Rows that fail to parse fall back to the raw USR_UTTER value.

    The input frame is modified in place and also returned (so it works
    both directly and through parallelize_dataframe / Pool.map).
    """
    temp_dataframe = df
    # Initialize as strings: the original used the int 1, which triggers
    # pandas' incompatible-dtype deprecation once strings are assigned.
    temp_dataframe["ne"] = ""
    temp_dataframe["ne1"] = ""
    # Content between the outer square brackets of the NAMED_ENT cell.
    SEARCH_PATTERN1 = re.compile(r"\[(.+)\]", re.IGNORECASE)
    # Hoisted out of the loop: type and surface-text extractors for one
    # {"key":"value"} item.
    KEY_PATTERN = re.compile(r"{\"(.+)\":\".+\"}")
    VALUE_PATTERN = re.compile(r"{\".+\":\"(.+)\"}")
    count = 0
    for idx, _row in temp_dataframe.iterrows():
        try:
            USR_UTTER = df["USR_UTTER"][idx].lower()
            # Hyphens become spaces so entity text matches the utterance.
            USR_UTTER_SUB = USR_UTTER.replace('-', ' ')
            # Progress log every 10k processed rows.
            if (count % 10000 == 0):
                print ('parent process : ' + str(os.getpid() ) + " df.index : " + str(df.index) + ", 처리 문장 갯수(1만개기준출력) : " + str(count) )

            original_text = USR_UTTER_SUB
            tag_original_text = USR_UTTER_SUB
            text = df["NAMED_ENT"][idx].lower()
            # Strip the outer brackets, then split '{..},{..}' into items.
            text = ''.join(SEARCH_PATTERN1.findall(text))
            text = text.split(",")
            for i in text:
                temp1 = ''.join(KEY_PATTERN.findall(i))    # entity type
                temp2 = ''.join(VALUE_PATTERN.findall(i))  # entity surface text
                temp1 = re.sub(r'[\"\']', '', temp1)
                temp2 = re.sub(r'[\"\']', '', temp2)
                # Only the part left of any ':' in the value is used.
                temp2 = temp2.split(':')
                if temp2[0] != "":
                    tag_original_text = tag_original_text.replace(temp2[0], "<" + temp2[0] + ":" + temp1 + ">")
                    original_text = original_text.replace(temp2[0], "<" + temp1 + ">")
            count = count + 1
        except Exception as e:
            # Best-effort fallback: keep the untouched utterance.
            original_text = temp_dataframe["USR_UTTER"][idx]
            tag_original_text = temp_dataframe["USR_UTTER"][idx]
            print("except")
            print(e)
        # .loc avoids the chained-assignment (SettingWithCopy) pattern the
        # original used, which is unreliable / removed in modern pandas.
        temp_dataframe.loc[idx, "ne"] = original_text
        temp_dataframe.loc[idx, "ne1"] = tag_original_text

    return temp_dataframe

###############
###############
    
def parallelize_dataframe(df, func, num_cores=None):
    """Apply *func* to the frame in parallel and reassemble the result.

    The frame is split into *num_cores* chunks, each chunk is mapped
    through a worker process, and the pieces are concatenated in order.

    Args:
        df: input DataFrame.
        func: picklable callable taking and returning a DataFrame chunk.
        num_cores: worker/chunk count; defaults to the module-level
            ``numCores`` (backward compatible with the old signature).

    Returns:
        The concatenated result DataFrame.
    """
    if num_cores is None:
        num_cores = numCores
    df_split = np.array_split(df, num_cores)
    # Context manager ensures the pool is torn down even if func raises
    # (the original leaked the pool on any exception in pool.map).
    with Pool(num_cores) as pool:
        result = pd.concat(pool.map(func, df_split))
    return result
##################
##################
def sort_desc_mostSaidCorpus(_df):
    """Rank MainAction/NE-pattern combinations and keep the top ~90%.

    Each row contributes the key "<MAIN_ACT_TP>-<ne>" to a frequency
    table.  Per MainAction the patterns are sorted by count and rows are
    kept until the running sum covers 90% of that action's total (the
    original comment said 95%, but the code has always used 0.9 — kept
    as-is to preserve behavior).  The result is written to
    <FOLDER_PATH>/all_file_NE_comb_list.xlsx.
    """
    df = _df
    print(df.columns)
    corpus_dict = {}
    corpus_list = []
    for idx, s in df.iterrows():
        try:
            ne_corpus = df['MAIN_ACT_TP'][idx] + "-" + df['ne'][idx]
            if ne_corpus in corpus_dict:
                corpus_dict[ne_corpus] += 1
            else:
                corpus_dict[ne_corpus] = 1
        # Narrowed from a bare except: only non-str cells (NaN/int) can
        # make the concatenation fail.
        except TypeError:
            print("Except : can only concatenate str (not int) to str")

    # NOTE(review): split("-") assumes MAIN_ACT_TP never contains a
    # hyphen ("ne" cannot — hyphens were mapped to spaces upstream);
    # confirm against the action-type vocabulary.
    for key in corpus_dict.keys():
        temp = key.split("-")
        temp.append(corpus_dict[key])
        corpus_list.append(temp)
    all_file_NE_comb_output = pd.DataFrame(corpus_list, columns=["MainAction", "NE Combination(Output in order)", "appear Count"])

    unique_mainAction_list = all_file_NE_comb_output['MainAction'].unique()
    outputFrame = pd.DataFrame(columns=("MainAction", "NE Combination(Output in order)", "appear Count"))
    num = 0
    for mainAction in unique_mainAction_list:
        newFrame = all_file_NE_comb_output[all_file_NE_comb_output['MainAction'] == mainAction]
        newFrame = newFrame.sort_values(by=['appear Count'], ascending=[False])
        # Walk the sorted patterns until the cumulative count reaches the
        # 90% threshold for this action.
        totalAppearCount = newFrame['appear Count'].sum()
        endLoopCount = totalAppearCount * 0.9
        sumAppearCount = 0
        index = 0
        while sumAppearCount < endLoopCount:
            sumAppearCount = sumAppearCount + newFrame['appear Count'].iloc[index]
            index = index + 1
            print("totalAppearCount : " + str(totalAppearCount) + ", sumAppearCount : " + str(sumAppearCount) + ", endLoopCount : " + str(endLoopCount))
        num = num + 1
        print(newFrame.columns)
        print(index)
        print(str(num) + ". " + mainAction)
        # Reset so positional slicing [0:index] works regardless of the
        # original index labels.
        newFrame = newFrame.reset_index(drop=True)
        newFrame = newFrame[0:index]
        print(newFrame)
        # DataFrame.append was removed in pandas 2.0 — pd.concat is the
        # supported equivalent (same non-unique index behavior).
        outputFrame = pd.concat([outputFrame, newFrame])
        print(outputFrame)

    outputFrame.to_excel(FOLDER_PATH + "all_file_NE_comb_list" + ".xlsx")
    print("Process Finish!!!!")
##################

##################
def preprocessor(globalDF):
    """Filter the raw log frame down to the rows/columns of interest.

    Steps:
      1. Drop the columns in WILL_DELETE_COLUMNS_LIST (missing ones are
         ignored instead of the old bare try/except).
      2. Drop rows whose USR_UTTER is NaN.
      3. Keep only FILTER_VALUE_X_DEVICE_COUNTRY devices whose platform
         starts with 'W18' or 'W19'.

    The filtered frame is saved to <FOLDER_PATH>/result_num.xlsx and
    returned.
    """
    start_time = time.time()
    # errors='ignore' makes the drop tolerant of already-removed columns,
    # replacing the silent bare-except the original used.
    globalDF = globalDF.drop(columns=WILL_DELETE_COLUMNS_LIST, errors='ignore')

    # Remove rows without an utterance (only USR_UTTER NaNs matter here).
    globalDF = globalDF.dropna(subset=['USR_UTTER'])

    # Country filter, then platform-prefix filter.  startswith accepts a
    # tuple of prefixes, so one pass replaces the two OR'd masks.
    globalDF = globalDF[globalDF['X_DEVICE_COUNTRY'] == FILTER_VALUE_X_DEVICE_COUNTRY]
    globalDF = globalDF[globalDF['X_DEVICE_PLATFORM'].str.startswith(('W18', 'W19'))]

    globalDF.to_excel(FOLDER_PATH + "result_num" + ".xlsx")
    print(globalDF.index)
    print("소요 시간(초) : " + str(time.time() - start_time))
    return globalDF



# Log columns stripped from the raw dump before analysis (see preprocessor()).
WILL_DELETE_COLUMNS_LIST=["LOG_SEQ", "SVR_REQ_TIME", "SVR_RESP_TIME", "X_DEVICE_ID", 
                          "X_COUNTRY_CP", "X_DEVICE_MODEL", "X_DEVICE_FW_VERSION", 
                          "USR_STAT", "DIALOG_ACT_TP", "DOMN_EXPERT_HIT", "SYS_ACT_TP",
                          "TV_SRCH_QUERY", "HTTP_STAT_CODE", "ERROR_CODE", 
                          "ERROR_MSG", "EXT_TIME", "EXT_NLU_RESP", "AUDIO_SRC", "NLU_ID",
                          "OPT_QUERY", "X_DEVICE_LOGICAL_ID"]
# Only logs from this country code are kept by preprocessor().
FILTER_VALUE_X_DEVICE_COUNTRY = 'US'
# Directory for both the input .txt dumps and all .xlsx outputs.
FOLDER_PATH = "/home/yeongbin/code_folder/Log/AIC/"
# Worker count read by parallelize_dataframe(); default 1 (single process).
numCores = 1 #default 1
# Daily log dump files (without the .txt extension) under FOLDER_PATH.
fileList = ["TB_ADM_TV_API_LOG_01_2020-01-03", "TB_ADM_TV_API_LOG_01_2020-01-04","TB_ADM_TV_API_LOG_01_2020-01-06", "TB_ADM_TV_API_LOG_01_2020-01-07","TB_ADM_TV_API_LOG_01_2020-01-08","TB_ADM_TV_API_LOG_01_2020-01-09","TB_ADM_TV_API_LOG_01_2020-01-10"]
# Placeholder for the main DataFrame; populated inside dailyRoutineFunction().
globalDF = None
##################

##################
def dailyRoutineFunction():
    """Daily batch job: load the log dump, preprocess, and export corpora.

    Reads the first file in fileList, filters it via preprocessor(),
    writes the MainAction summary, builds the NE-tagged corpus in
    parallel, and writes the most-said-corpus ranking.  Intended to be
    triggered by the `schedule` loop in __main__.
    """
    # Without this declaration the cpu_count() assignment below created a
    # dead LOCAL, so parallelize_dataframe() kept using the module-level
    # default of 1 and the pool never actually parallelized.
    global numCores
    start_time = time.time()
    print("start time : " + str(start_time))
    numCores = multiprocessing.cpu_count()
    print('cpu counts : %d' % numCores)

    globalDF = pd.read_csv(FOLDER_PATH + fileList[0] + ".txt", delimiter='\t', encoding='UTF8')
    print(globalDF.index)

    # NOTE(review): only fileList[0] is loaded; re-enable this loop to
    # process the full week of dumps (use pd.concat — DataFrame.append
    # was removed in pandas 2.0).
    # for i in range(1, 7):
    #     tempDF = pd.read_csv(FOLDER_PATH + fileList[i] + ".txt", delimiter='\t', encoding='UTF8')
    #     globalDF = pd.concat([globalDF, tempDF], ignore_index=True)
    #     print(globalDF.index)

    globalDF = preprocessor(globalDF)

    extract_mainAction(globalDF)
    start_time = time.time()
    dataFrame = parallelize_dataframe(globalDF[["MAIN_ACT_TP", "USR_UTTER", "NAMED_ENT"]], make_NE_tagging_corpous)
    dataFrame.to_excel(FOLDER_PATH + "make_NE_Corpus" + ".xlsx")
    print("소요 시간(초) : " + str(time.time() - start_time))
    sort_desc_mostSaidCorpus(dataFrame)
    print("Process Finish!!!")



##################
if __name__=="__main__":
    # Register the batch job to run once a day at 15:53 (local time).
    schedule.every().day.at("15:53").do(dailyRoutineFunction)
    
    # Polling loop: `schedule` only fires jobs when run_pending() is
    # called, so check once per second forever.
    while True:
        schedule.run_pending()
        time.sleep(1)
    

댓글