ScoreWalker/scorewalker-utils/KMeans/L3Centers.py
2025-03-13 00:13:53 -06:00

316 lines
9.3 KiB
Python

# We do all our imports at the top of our program.
import argparse
import os
import sys
import DoctypeCenter
# Display name of the program; used as the argparse ``prog`` and in the console header.
program_name = 'Program name'
# Brief, one-line description of what the program does.
program_description = 'Brief description.'
# Command-line parser for the program (built without the automatic -h/--help option).
parser = argparse.ArgumentParser(
    prog=program_name,
    description=program_description,
    add_help=False,
)
# Default values for optional settings live here so they are all in one place
# and can be changed without searching through the script.
global_opt = 'default value'
# ANSI-colored console tags for errors, warnings and success messages.
red_error = '\033[91mError:\033[0m'
yellow_warning = '\033[93mWARNING:\033[0m'
blue_okay = '\033[94mOK\033[0m'
# Colored program name followed by a dashed rule on the next line.
program_header = '\033[95m{}\033[0m\n-----------------------'.format(program_name)
# Default confirmation prompt.
decision_message = ' Is this okay? (Y/N): '
# Module-level statistics placeholders.
AVG_PERCENT_MATCH = 0.0
TOTAL_NUM_DOCS = 0
def clean_doc_file_name(full_file_name,
                        old_root=r'e:\tmp\daveg-xfer',
                        new_root=r'C:\Users\chris\Documents\Code\Tests\KMeans'):
    """Rewrite a recorded document path so it points at the local data copy.

    Args:
        full_file_name: Path string as it appears in the map/distance files.
        old_root: Substring to replace; defaults to the root the files were
            originally recorded with.
        new_root: Replacement text; defaults to the local test directory.

    Returns:
        ``full_file_name`` with every occurrence of ``old_root`` replaced by
        ``new_root``. Names that do not contain ``old_root`` are returned
        unchanged.
    """
    return full_file_name.replace(old_root, new_root)
def load_map_file(map_file):
    """Load the document-name <-> document-id mapping file.

    Each non-blank line has the form ``<document path>:<integer id>``. The
    line is split at the LAST colon because the path part may itself contain
    colons (e.g. a drive letter).

    Args:
        map_file: Path of the mapping file to read.

    Returns:
        A ``(name_to_id, id_to_name)`` tuple of dicts. Document names are
        passed through ``clean_doc_file_name`` before being stored.

    Raises:
        ValueError: If a non-blank line has no ``:`` separator or its id part
            is not an integer.
    """
    result = {}
    ids_to_files = {}
    with open(map_file) as reader:
        for line_no, raw_line in enumerate(reader, start=1):
            line = raw_line.rstrip('\r\n')
            # Blank lines (e.g. a trailing newline at the end of the file)
            # carry no mapping and would otherwise fail the int() conversion.
            if not line.strip():
                continue
            raw_name, sep, raw_id = line.rpartition(':')
            if not sep:
                raise ValueError(
                    'Malformed map line %d in %s: %r' % (line_no, map_file, line))
            doc_name = clean_doc_file_name(raw_name)
            doc_id = int(raw_id)
            result[doc_name] = doc_id
            ids_to_files[doc_id] = doc_name
    return result, ids_to_files
def _store_doc_scores(result, doc_name, scores, doctype):
    """Store one document's scores (converted to distances) under its file name."""
    # An empty name means no '#' header has been read yet: nothing to store.
    if doc_name:
        result[doc_name] = {'scores': convert_scores(scores), 'doctype': doctype}


def load_docs(file_name, result, id_mapping, name_mapping):
    """Parse one distance file and merge its documents into ``result``.

    File format, as read by this function:
      * a line starting with ``#`` opens a new document; the rest of the line
        is the document's file name;
      * a line starting with ``*`` has the form ``*<doc id>:<score>`` and gives
        the score between the current document and the document with that id;
      * blank lines and any other lines are ignored.

    Only scores against documents of the same doctype as the current document
    are kept. Every document, including the last one in the file, is stored
    under its cleaned file name with its scores converted to distances, so
    all entries of ``result`` share one key type and one value scale.

    Args:
        file_name: Path of the distance file to read.
        result: Dict to add entries to; it is mutated and also returned.
        id_mapping: Dict mapping document id -> document file name.
        name_mapping: Dict mapping document file name -> document id. Kept for
            interface compatibility; entries are keyed by file name, so it is
            not consulted.

    Returns:
        ``result``, with one ``{'scores': {...}, 'doctype': str}`` entry per
        document found in the file.
    """
    current_doctype = ''
    current_file_name = ''
    current_file_scores = {}
    num_skipped = 0
    num_scores = 0
    with open(file_name) as reader:
        for raw_line in reader:
            if not raw_line.strip():
                continue
            # Strip only the line terminator so a final line without a
            # trailing newline does not lose its last character.
            line = raw_line.rstrip('\r\n')
            if line.startswith('#'):
                # A new header closes the previous document.
                _store_doc_scores(result, current_file_name,
                                  current_file_scores, current_doctype)
                current_file_name = clean_doc_file_name(line[1:])
                current_file_scores = {}
                current_doctype = get_doctype_from_name(current_file_name)
            elif line.startswith('*'):
                line_data = line[1:].split(':')
                if len(line_data) != 2:
                    continue
                try:
                    doc_id = int(line_data[0])
                    tmp_doctype = get_doctype_from_id(doc_id, id_mapping)
                    tmp_name = id_mapping[doc_id]
                    if tmp_doctype == current_doctype:
                        current_file_scores[tmp_name] = float(line_data[1])
                        num_scores += 1
                    else:
                        num_skipped += 1
                except (ValueError, KeyError):
                    # Non-numeric id/score or an id missing from the map:
                    # ignore this score line and keep going.
                    continue
    # The last document has no following header to close it.
    _store_doc_scores(result, current_file_name, current_file_scores, current_doctype)
    print('Skipped %d scores and saved %d scores' % (num_skipped, num_scores))
    return result
def load_unmapped_docs(dist_files, map_ids, map_names):
    """Load every distance file in ``dist_files`` into one combined dict.

    Args:
        dist_files: Iterable of distance-file paths.
        map_ids: Dict mapping document id -> document file name.
        map_names: Dict mapping document file name -> document id.

    Returns:
        The dict produced by feeding each file, in order, through
        ``load_docs``; empty when ``dist_files`` is empty.
    """
    merged = {}
    for path in dist_files:
        print('Loading file %s' % path)
        merged = load_docs(path, merged, map_ids, map_names)
    return merged
def get_files_from_dir(directory):
    """Find the mapping file and the distance files in ``directory``.

    Args:
        directory: Directory to scan (not recursive).

    Returns:
        A ``(map_file, dist_files)`` tuple. ``dist_files`` lists the full
        paths of all entries ending in ``.dist``, in ``os.listdir`` order.
        ``map_file`` is the full path of the last listed entry ending in
        ``Maps.txt``, or ``''`` when there is none.
    """
    mapping_path = ''
    distance_paths = []
    for entry in os.listdir(directory):
        full_path = os.path.join(directory, entry)
        if entry.endswith('.dist'):
            distance_paths.append(full_path)
            continue
        if entry.endswith('Maps.txt'):
            mapping_path = full_path
    return mapping_path, distance_paths
def get_doctype_from_name(file):
    """Return the doctype of a document, i.e. the name of its parent folder.

    The path is expected to use backslash separators, e.g.
    ``...\\<doctype>\\<document>``.

    Args:
        file: Document path string.

    Returns:
        The path component directly before the file name, or ``''`` when the
        path has no backslash at all. The rest of this module treats ``''``
        as "no doctype", so a bare file name is reported that way rather than
        as the name with its last character cut off.
    """
    parent, sep, _ = file.rpartition('\\')
    if not sep:
        return ''
    # Last component of the parent path; the whole parent if it has no
    # further separator.
    return parent.rpartition('\\')[2]
def get_doctype_from_id(docId, id_map):
    """Return the doctype of the document with id ``docId``.

    Args:
        docId: Document id to look up.
        id_map: Dict mapping document id -> document file name.

    Returns:
        The doctype derived from the mapped file name.

    Raises:
        KeyError: If ``docId`` is not in ``id_map``.
    """
    return get_doctype_from_name(id_map[docId])
def distance_from_score(score):
    """Convert a similarity score into a distance.

    Args:
        score: Numeric score.

    Returns:
        ``(10 - score) + 1`` as a float, so higher scores give smaller
        distances.
    """
    inverted = 10 - score
    return float(inverted + 1.0)
def convert_scores(scores_dict):
    """Replace every score in ``scores_dict`` with its distance, in place.

    Args:
        scores_dict: Dict mapping a document key -> score.

    Returns:
        The same dict object, with each value passed through
        ``distance_from_score``.
    """
    for key in list(scores_dict):
        scores_dict[key] = distance_from_score(scores_dict[key])
    return scores_dict
def get_doctype_coverage(cluster_size, matches_len):
    """Compute, and print, which percentage of a doctype a document matched.

    Args:
        cluster_size: Total number of documents in the doctype.
        matches_len: Number of documents matched.

    Returns:
        ``matches_len`` as a percentage of ``cluster_size`` (float).

    Raises:
        ZeroDivisionError: If ``cluster_size`` is 0.
    """
    coverage = float(float(matches_len) * 100.0 / float(cluster_size))
    print(' {} matched {:.2f}% of doctype'.format(matches_len, coverage))
    return coverage
def calc_center(cluster, distance_dict):
    """Pick the cluster member with the smallest sum of squared distances.

    Args:
        cluster: Dict whose keys are the cluster members (values are unused).
        distance_dict: Dict mapping member -> {other document: distance}.

    Returns:
        The first member whose sum of squared distances is the smallest seen,
        or ``''`` when the cluster is empty or no sum is below
        ``sys.maxsize``.
    """
    best_member = ''
    best_total = sys.maxsize
    for candidate, _ in cluster.items():
        total = sum(dist ** 2 for dist in distance_dict[candidate].values())
        # Strict comparison: on ties the earlier member stays the center.
        if total < best_total:
            best_member, best_total = candidate, total
    return best_member
def get_list_by_doctype(doctype, distance_dict):
    """List the keys of all documents that belong to ``doctype``.

    Args:
        doctype: Doctype name to select. The empty string means "no doctype"
            and never matches anything.
        distance_dict: Dict mapping document key -> entry with a
            ``'doctype'`` item.

    Returns:
        Keys, in dict order, of the entries whose ``'doctype'`` equals
        ``doctype``. Entries that have no ``'doctype'`` item or cannot be
        subscripted are skipped.
    """
    # The empty doctype never matches, so there is no need to scan.
    if doctype == '':
        return []
    result = []
    for key, item in distance_dict.items():
        try:
            matches = item['doctype'] == doctype
        except (KeyError, TypeError, IndexError):
            # Entry without doctype information: skip it.
            continue
        if matches:
            result.append(key)
    return result
def get_all_doctypes(distances):
    """Return the distinct doctypes present in ``distances``.

    Args:
        distances: Dict mapping document key -> entry with a ``'doctype'``
            item.

    Returns:
        List of distinct doctypes in order of first appearance.

    Raises:
        KeyError: If an entry has no ``'doctype'`` item.
    """
    # dict.fromkeys de-duplicates in linear time while keeping first-seen
    # order (dicts preserve insertion order).
    return list(dict.fromkeys(value['doctype'] for value in distances.values()))
def get_doctype_lists(distance_dict):
    """Group document keys by doctype.

    Args:
        distance_dict: Dict mapping document key -> entry with a
            ``'doctype'`` item.

    Returns:
        Dict mapping each doctype (in order of first appearance) to the list
        of document keys of that doctype. The empty doctype ``''``, when
        present, maps to an empty list because it means "no doctype".

    Raises:
        KeyError: If an entry has no ``'doctype'`` item.
    """
    result = {}
    # Single pass over the documents instead of one full scan per doctype.
    for key, item in distance_dict.items():
        doctype = item['doctype']
        members = result.setdefault(doctype, [])
        if doctype != '':
            members.append(key)
    return result
def get_all_of_doctype(doctype, distance_dict):
    """Collect the scores of all documents that belong to ``doctype``.

    Args:
        doctype: Doctype name to select. The empty string means "no doctype"
            and never matches anything.
        distance_dict: Dict mapping document key -> entry with ``'doctype'``
            and ``'scores'`` items.

    Returns:
        Dict mapping document key -> that entry's ``'scores'`` for every
        entry of the requested doctype. Entries missing either item, or that
        cannot be subscripted, are skipped.
    """
    # The empty doctype never matches, so there is no need to scan.
    if doctype == '':
        return {}
    result = {}
    for key, item in distance_dict.items():
        try:
            if item['doctype'] == doctype:
                result[key] = item['scores']
        except (KeyError, TypeError, IndexError):
            # Incomplete entry: skip it.
            continue
    return result
def filter_docs_by_coverage(doctype_list, distances, coverage_percent):
    """Keep the documents that have distances to enough of their doctype.

    Args:
        doctype_list: Document keys belonging to one doctype.
        distances: Dict mapping document key -> {other document: distance}.
        coverage_percent: Minimum coverage (in percent) a document needs.

    Returns:
        The members of ``doctype_list``, in order, that are present in
        ``distances`` and whose number of distance entries, as a percentage
        of the doctype size, is at least ``coverage_percent``.
    """
    doctype_size = len(doctype_list)
    # Members without any distance information are dropped before the
    # coverage check; the check itself prints one line per member.
    return [
        doc
        for doc in doctype_list
        if doc in distances
        and get_doctype_coverage(doctype_size, len(distances[doc])) >= coverage_percent
    ]
def get_centers(doctypes, distances):
    """Compute the center document of every non-empty doctype cluster.

    Args:
        doctypes: Dict mapping doctype name -> list of document keys.
        distances: Dict mapping document key -> {other document: distance}.

    Returns:
        Dict mapping doctype name -> center document, as reported by
        ``DoctypeCenter.calc_center`` (which must return a
        ``(center, distance)`` pair, since the result is unpacked that way).
        Empty clusters and clusters whose computation fails are left out.
    """
    result = {}
    for cluster, docs in doctypes.items():
        if len(docs) == 0:
            print('Skipping empty cluster...')
            continue
        # Report and store under this loop's own cluster name and size; no
        # module-level variable is involved.
        print('Computing center of %s (%d documents).' % (cluster, len(docs)))
        try:
            center, _dist = DoctypeCenter.calc_center(docs, distances)
        except Exception:
            # Best effort: one failing cluster must not abort the others.
            print('Error with cluster: ' + cluster)
            continue
        result[cluster] = center
    return result
def make_distance_dict(doc_dict):
    """Build a plain document -> {document: distance} lookup.

    Args:
        doc_dict: Dict mapping document key -> entry with a ``'scores'`` dict.

    Returns:
        Dict mapping each document key to a copy of its scores in which the
        document's distance to itself is set to 0 (stored as the last item).
    """
    distance_lookup = {}
    for source, entry in doc_dict.items():
        row = {
            other: value
            for other, value in entry['scores'].items()
            if not other == source
        }
        # A document is at distance 0 from itself.
        row[source] = 0
        distance_lookup[source] = row
    return distance_lookup
def make_doctype_dict(doc_id_dict):
    """Group the documents' scores by doctype.

    Args:
        doc_id_dict: Dict mapping document key -> entry with ``'doctype'``
            and ``'scores'`` items.

    Returns:
        Dict mapping each non-empty doctype (in order of first appearance) to
        a ``{document key: scores}`` dict of its documents. Entries without a
        ``'doctype'`` item are ignored; an entry without ``'scores'`` still
        registers its doctype but contributes no document.
    """
    result = {}
    # Single pass over the documents instead of a full rescan per doctype.
    for key, value in doc_id_dict.items():
        try:
            doctype = value['doctype']
        except (KeyError, TypeError, IndexError):
            continue
        if doctype == '':
            continue
        group = result.setdefault(doctype, {})
        try:
            group[key] = value['scores']
        except (KeyError, TypeError, IndexError):
            continue
    return result
# Template entry point of the program; it only echoes its inputs.
def main(arg):
    """Print the required argument and the module-level optional default.

    Args:
        arg: Value to show as the required argument.
    """
    required_line = 'Required argument = %s' % arg
    print(required_line)
    optional_line = 'Optional argument = %s' % global_opt
    print(optional_line)
# Script entry point: parse the command line, load the distance data, compute
# one center per doctype and write the results to the output file.
if __name__ == '__main__':
    parser.add_argument('-o', '--output', required=True, help='The output file path.')
    parser.add_argument('-i', '--input_dir', required=True, help='The path to the input distance files.')
    args = parser.parse_args()
    out_file, test_dir = args.output, args.input_dir

    # Locate and load the input files.
    mapping_file, distance_files = get_files_from_dir(test_dir)
    maps, id_map = load_map_file(mapping_file)
    print('Map file = %s' % mapping_file)
    dist_dict = load_unmapped_docs(distance_files, id_map, maps)

    # Keep, per doctype, only the documents that cover the whole doctype.
    final_distance_dict = make_distance_dict(dist_dict)
    doctype_dict = get_doctype_lists(dist_dict)
    filtered_docs = {}
    # NOTE: deliberately a plain loop so that `doctype` and `docs` stay bound
    # at module level after it finishes.
    for doctype, docs in doctype_dict.items():
        filtered_docs[doctype] = filter_docs_by_coverage(docs, final_distance_dict, 100)

    print('Converting to a useful dict...')
    final_dict = make_doctype_dict(dist_dict)

    print('Computing centers...')
    centers = get_centers(filtered_docs, final_distance_dict)

    print('Writing centers...')
    with open(out_file, 'w+') as writer:
        # One "<doctype>: <center>" line per doctype, written in one call.
        writer.write(''.join(
            '{}: {}\n'.format(name, center_id) for name, center_id in centers.items()
        ))
    print('Done!')