ScoreWalker/scorewalker-utils/KMeans/Tokenizer.py
2025-03-13 00:13:53 -06:00

184 lines
7.1 KiB
Python

# We do all our imports at the top of our program.
import argparse
import json
import os
import collections
import ConsoleUtils
# Give the program a name.
program_name = 'Tokenizer'
# Describe what the program does briefly.
program_description = 'Creates the custom tokenized files for running a distance calculation on.'
# The argument parser for the program (add_help=False: -h is registered manually in __main__).
parser = argparse.ArgumentParser(prog=program_name, description=program_description, add_help=False)
# Error and Warning console values (ANSI escape codes for colored terminal output):
red_error = '\033[91mError:\033[0m'
yellow_warning = '\033[93mWARNING:\033[0m'
blue_okay = '\033[94mOK\033[0m'
# Banner printed at program start-up (magenta program name over a rule).
program_header = '\033[95m%s\033[0m\n-----------------------' % program_name
# Default decision message.
decision_message = ' Is this okay? (Y/N): '
# Constants: JSON keys used in the dictionary file and token records.
dictionary_tag = 'dictionary'
dict_count_tag = 'tokenCounts'
dict_mapping_tag = 'mapping'
tkn_mapped_tag = 'mappedVal'
file_name_tag = 'name'
file_path_tag = 'path'
file_txt_tag = 'text'
# Loads the un-mapped tokens from all the token files in tkn_files (makes sure a file exists
# before trying to read it).
def load_tokens_from_files(tkn_files):
    """Read every existing file in tkn_files and return {path: [tokens]}.

    tkn_files: iterable of file paths to read.
    Returns a dict mapping each existing path to its token list, produced by
    splitting the whole file text on single spaces (matching how the .tkn
    files were written). Missing files are silently skipped, as before.
    """
    print('{:^70}'.format('Loading un-mapped tokens from files...'))
    result = {}
    tokenized_count = 0
    total_to_tokenize = len(tkn_files)
    for file in tkn_files:
        tokenized_count += 1
        # Report progress for every file (not just existing ones) so the bar
        # still reaches 100% when some listed files are missing.
        ConsoleUtils.print_progress_bar(tokenized_count, total_to_tokenize, 50, 70)
        if os.path.exists(file):
            # 'r' instead of 'r+': we only read, so write permission on the
            # token files is no longer required.
            with open(file, 'r') as reader:
                result[file] = reader.read().split(' ')
    return result
# Gets the mapped tokens for all the files in tkn_files based on the mapping in tkn_maps.
def get_tkns_for_files(tkn_files, tkn_maps):
    """Map every loaded token file through tkn_maps.

    tkn_files: dict of {source .tkn path: raw token list}.
    tkn_maps:  dict of {token: mapped value}.
    Returns {output .lev path: generated token/count string}.
    """
    print('{:^70}'.format('Mapping tokens from files...'))
    total_files = len(tkn_files)
    mapped_output = {}
    for progress, (source_path, tokens) in enumerate(tkn_files.items(), start=1):
        ConsoleUtils.print_progress_bar(progress, total_files, 50, 70)
        # The output file sits beside its input, with a .lev extension.
        mapped_output[source_path.replace('.tkn', '.lev')] = generate_tokens(tokens, tkn_maps)
    return mapped_output
# Generates the mapped token values and counts into a nice string to write to a file. It is important to maintain order
# since part of our calculation is to order the tokens by a rule before calculating the distance.
def generate_tokens(token_file, tkn_maps):
    """Build the two-line '.lev' payload for one token file.

    token_file: ordered list of raw tokens from a .tkn file.
    tkn_maps:   dict of {token: mapped value}.
    Returns a two-line string: line one is each distinct token's mapped
    value, line two its occurrence count, both in first-seen order and with
    every value followed by a single space (including a trailing space on
    each line — the established file format).
    Raises KeyError if a token has no entry in tkn_maps.
    """
    # Count occurrences while preserving first-seen order; order matters
    # because the distance step orders tokens by a rule downstream.
    tkn_count = collections.OrderedDict()
    for word in token_file:
        tkn_count[word] = tkn_count.get(word, 0) + 1
    # str.join over generators instead of repeated '+=': the original loop
    # rebuilt both strings every iteration (quadratic for large files).
    result_tkns = ''.join('%s ' % tkn_maps[key] for key in tkn_count)
    result_cnts = ''.join('%d ' % value for value in tkn_count.values())
    return '%s\n%s' % (result_tkns, result_cnts)
# Loads the dictionary of token mappings, weights, and values into a list of dictionaries.
def load_dict(dict_path):
    """Parse the JSON dictionary file at dict_path.

    Returns a (token counts, token mapping) tuple read from the
    dict_count_tag and dict_mapping_tag entries of the JSON document.
    """
    with open(dict_path) as dict_file:
        dict_json = json.load(dict_file)
    return dict_json[dict_count_tag], dict_json[dict_mapping_tag]
# Loads the tokens files from the file containing a list of token files which are \n delimited.
def load_tkn_files(file):
    """Read the newline-delimited list of .tkn file paths from *file*.

    Each line has its newline removed and any mapped-drive prefix 'Z:\\'
    rewritten to the UNC prefix '\\\\mount_eng\\eng\\' so paths resolve even
    when the drive mapping is absent.
    Returns the list of (possibly rewritten) paths.
    """
    # 'r' instead of 'r+': the list file is only read, never written, so
    # write permission should not be required.
    with open(file, 'r') as tkn_file:
        return [line.replace('\n', '').replace('Z:\\', '\\\\mount_eng\\eng\\')
                for line in tkn_file]
# This is the main function of the program.
def main(dict_file, tkn_file, auto_overwrite):
    """Run the tokenizer end to end.

    dict_file:      path to the JSON dictionary (token counts + mapping).
    tkn_file:       path to the newline-delimited list of .tkn files.
    auto_overwrite: when True, existing output files are overwritten
                    without asking the user.
    """
    # Load the information from the dictionary (total token counts and token mapping)
    tkn_count, tkn_map = load_dict(dict_file)
    tkn_file_list = load_tkn_files(tkn_file)
    # Load the token files from the file with the list of token files
    tkn_files = load_tokens_from_files(tkn_file_list)
    # Get all the tokens mapped and get their counts to save as output.
    output_files = get_tkns_for_files(tkn_files, tkn_map)
    # Write all the output files out!
    print('{:^70}'.format('Saving files...'))
    num_files_written = 0
    num_files_to_write = len(output_files)
    for path, text in output_files.items():
        num_files_written += 1
        # BUG FIX: check for an existing file BEFORE opening it. The old code
        # opened with 'w+' first, which truncated the file, so the overwrite
        # prompt came too late (the data was already gone) and — because
        # open() creates the file — the warning fired for every file.
        if os.path.exists(path) and not auto_overwrite:
            # Ask for permission; yes_or_no() exits the program on 'no', so
            # nothing else is needed here.
            print('\r%s File exists, it will be overwritten.' % yellow_warning)
            yes_or_no(decision_message)
        # 'w' suffices (we never read back); the with-block closes the file,
        # so the old redundant output.close() is gone.
        with open(path, 'w') as output:
            output.write(text)
        ConsoleUtils.print_progress_bar(num_files_written, num_files_to_write, 50, 70)
# Checks the arguments to make sure they are valid.
def check_args(dictionary, tkn_file):
    """Verify that both input files exist.

    Prints an error for each missing file; if anything is missing, the
    parser help is shown and the program exits with status -1.
    """
    has_fatal_error = False
    # Validate both paths with one loop; the label slots into the message.
    for label, path in (('dictionary', dictionary), ('token', tkn_file)):
        if not os.path.exists(path):
            print('%s The passed %s file does not exist (%s)' % (red_error, label, path))
            has_fatal_error = True
    if has_fatal_error:
        parser.print_help()
        print('Exiting...')
        exit(-1)
# Will ask the user to input yes or no and if they input yes the program will continue to execute. If however, they
# input no, the program will exit with status 0. 0 status is used here because there was no error, the user just chose
# to exit rather than continue executing.
def yes_or_no(message):
    """Prompt until the user answers yes or no.

    message: the prompt shown to the user on the first ask.
    Returns normally on 'y'/'yes'; exits with status 0 on 'n'/'no' (status 0
    because stopping was the user's choice, not an error). Any other input
    re-prompts.
    """
    # A loop instead of recursion: repeated invalid input can no longer grow
    # the call stack, and .lower() is computed once per answer.
    while True:
        decision = input(message).lower()
        if decision in ('y', 'yes'):
            return
        if decision in ('n', 'no'):
            exit(0)
        message = ' Invalid input, enter Y(es) or N(o): '
# This is where we call the main method from.
if __name__ == '__main__':
    print(program_header)
    # Split the arguments into required and optional groups for nicer help output.
    required_args = parser.add_argument_group('Required')
    optional_args = parser.add_argument_group('Optional')
    required_args.add_argument('-d', '--dictionary', required=True,
                               help='The json dictionary file containing the token counts and mappings.')
    required_args.add_argument('-f', '--tkn_file', required=True,
                               help='The txt file containing a list of tkn files in the library.')
    optional_args.add_argument('-w', '--overwrite', required=False, action='store_true',
                               help='If this flag is set, old output of this tool will be automatically overwritten.')
    # The parser was built with add_help=False, so -h/--help is registered
    # here alongside the other optionals.
    optional_args.add_argument('-h', '--help', action='help', help='Prints the help message.')
    args = parser.parse_args()
    # Are the args valid? (check_args exits on failure.)
    check_args(args.dictionary, args.tkn_file)
    # Now we can run!
    main(args.dictionary, args.tkn_file, args.overwrite)