316 lines
9.3 KiB
Python
316 lines
9.3 KiB
Python
# We do all our imports at the top of our program.
|
|
import argparse
|
|
import os
|
|
import sys
|
|
import DoctypeCenter
|
|
|
|
# Program name; passed to the argument parser as ``prog``.
program_name = 'Program name'

# Brief description of what the program does; passed to the parser as well.
program_description = 'Brief description.'

# The argument parser for the program. ``add_help=False`` disables the
# automatic -h/--help option.
parser = argparse.ArgumentParser(
    prog=program_name,
    description=program_description,
    add_help=False,
)

# Optional arguments get their default values here. Keeping every default in
# one place makes the script easier to read and lets a default be changed
# without searching through the rest of the code.
global_opt = 'default value'

# Console tags for errors, warnings and success, wrapped in ANSI color codes.
red_error = '\033[91mError:\033[0m'
yellow_warning = '\033[93mWARNING:\033[0m'
blue_okay = '\033[94mOK\033[0m'

# Colored program name followed by a separator rule.
program_header = (
    '\033[95m%s\033[0m\n'
    '-----------------------'
) % program_name

# Default decision message.
decision_message = ' Is this okay? (Y/N): '

# Module-level counters, initialised to zero.
AVG_PERCENT_MATCH = 0.0
TOTAL_NUM_DOCS = 0
|
|
|
|
|
|
def clean_doc_file_name(full_file_name):
    """Return *full_file_name* with the recorded root folder swapped for the local one.

    Every occurrence of the source prefix is replaced; names that do not
    contain it are returned unchanged.
    """
    recorded_root = r'e:\tmp\daveg-xfer'
    local_root = r'C:\Users\chris\Documents\Code\Tests\KMeans'
    return full_file_name.replace(recorded_root, local_root)
|
|
|
|
|
|
def load_map_file(map_file):
    """Read a ``<path>:<id>`` map file into two lookup dicts.

    Each non-blank line is split at its LAST ':' so that drive-letter colons
    inside the path are kept. The path part is passed through
    ``clean_doc_file_name`` and the id part is converted with ``int``.

    Args:
        map_file: Path of the map file to read.

    Returns:
        A ``(name_to_id, id_to_name)`` tuple of dicts.

    Raises:
        ValueError: If a non-blank line has no ':' separator or its id part
            is not an integer.
    """
    name_to_id = {}
    id_to_name = {}
    with open(map_file) as reader:
        for raw_line in reader:
            line = raw_line.rstrip('\n')
            # Blank lines (e.g. a trailing empty line) carry no mapping.
            if not line.strip():
                continue

            raw_name, separator, raw_id = line.rpartition(':')
            if not separator:
                raise ValueError(
                    'Malformed map line (expected "<path>:<id>"): %r' % line)

            doc_name = clean_doc_file_name(raw_name)
            doc_id = int(raw_id)
            name_to_id[doc_name] = doc_id
            id_to_name[doc_id] = doc_name

    return name_to_id, id_to_name
|
|
|
|
|
|
def load_docs(file_name, result, id_mapping, name_mapping):
    """Parse one distance file and add its documents to *result*.

    File layout, as read by this function:
      * a line starting with '#' opens a new document; the rest of the line
        is its file name (passed through ``clean_doc_file_name``);
      * a line starting with '*' holds ``<doc id>:<score>`` for the document
        that is currently open;
      * blank lines and any other lines are ignored.

    A score is kept only when the scored document has the same doctype as the
    open document. Every document is stored as
    ``result[file name] = {'scores': {other name: distance}, 'doctype': ...}``
    with the scores converted by ``convert_scores``.

    Changes from the previous revision: the last document of the file is now
    stored like all the others (it used to be stored under its integer id with
    unconverted scores), and no empty ``''`` record is created before the
    first header.

    Args:
        file_name: Path of the distance file.
        result: Dict to update in place; it is also returned.
        id_mapping: Dict of document id -> file name.
        name_mapping: Dict of file name -> document id. Not consulted any
            more; kept so existing callers keep working.

    Returns:
        The same *result* dict.
    """
    current_doctype = ''
    current_file_name = ''
    current_file_scores = {}
    num_skipped = 0
    num_scores = 0

    def store_current():
        # Nothing to store until the first '#' header has been seen.
        if current_file_name:
            result[current_file_name] = {
                'scores': convert_scores(current_file_scores),
                'doctype': current_doctype,
            }

    with open(file_name) as reader:
        for raw_line in reader:
            # Strip only the newline so a final line without one is not
            # truncated by a character.
            line = raw_line.rstrip('\n')
            if not line or line.isspace():
                continue

            if line.startswith('#'):
                # A new header closes the document collected so far.
                store_current()
                current_file_name = clean_doc_file_name(line[1:])
                current_file_scores = {}
                current_doctype = get_doctype_from_name(current_file_name)

            elif line.startswith('*'):
                line_data = line[1:].split(':')
                if len(line_data) != 2:
                    continue
                try:
                    doc_id = int(line_data[0])
                    tmp_doctype = get_doctype_from_id(doc_id, id_mapping)
                    tmp_name = id_mapping[doc_id]
                except (ValueError, KeyError):
                    # Non-numeric id or an id missing from the map.
                    continue

                if tmp_doctype != current_doctype:
                    num_skipped += 1
                    continue

                try:
                    score = float(line_data[1])
                except ValueError:
                    continue
                current_file_scores[tmp_name] = score
                num_scores += 1

    # The last document has no following header to trigger its storage.
    store_current()

    print('Skipped %d scores and saved %d scores' % (num_skipped, num_scores))
    return result
|
|
|
|
|
|
def load_unmapped_docs(dist_files, map_ids, map_names):
    """Load every distance file in *dist_files* into one combined dict.

    Each file is announced on stdout and then handed to ``load_docs`` together
    with the dict accumulated so far.

    Returns:
        The dict produced by the last ``load_docs`` call, or an empty dict
        when *dist_files* is empty.
    """
    combined = {}
    for path in dist_files:
        print('Loading file %s' % path)
        combined = load_docs(path, combined, map_ids, map_names)
    return combined
|
|
|
|
|
|
def get_files_from_dir(directory):
    """Find the map file and the distance files directly inside *directory*.

    Entries ending in '.dist' are collected as distance files; an entry ending
    in 'Maps.txt' is taken as the map file (the last one listed wins).

    Returns:
        ``(map_file, dist_files)`` where both hold paths joined with
        *directory*; ``map_file`` is '' when no map file was seen.
    """
    map_file = ''
    dist_files = []
    for entry in os.listdir(directory):
        full_path = os.path.join(directory, entry)
        if entry.endswith('.dist'):
            dist_files.append(full_path)
            continue
        if entry.endswith('Maps.txt'):
            map_file = full_path
    return map_file, dist_files
|
|
|
|
|
|
def get_doctype_from_name(file):
    """Return the name of the folder that directly contains *file*.

    The path is split on backslashes only, so
    ``r'C:\\docs\\invoice\\a.txt'`` gives ``'invoice'``.

    Args:
        file: Backslash-separated file path.

    Returns:
        The last folder component, or '' when the path has no backslash at
        all (previously such a name came back with its final character cut
        off, which produced a bogus doctype).
    """
    parent, separator, _ = file.rpartition('\\')
    if not separator:
        # No folder part, hence no doctype.
        return ''
    # Keep only the innermost folder of the parent path.
    return parent.rpartition('\\')[2]
|
|
|
|
|
|
def get_doctype_from_id(docId, id_map):
    """Look up *docId* in *id_map* and return the doctype of the mapped file name.

    Raises:
        KeyError: If *docId* is not a key of *id_map*.
    """
    return get_doctype_from_name(id_map[docId])
|
|
|
|
|
|
def distance_from_score(score):
    """Turn a score into a distance: ``(10 - score) + 1`` as a float.

    A score of 10 therefore maps to 1.0 and lower scores map to larger values.
    """
    inverted = 10 - score
    return float(inverted + float(1))
|
|
|
|
|
|
def convert_scores(scores_dict):
    """Replace every value of *scores_dict* with ``distance_from_score(value)``.

    The dict is modified in place and the same object is returned.
    """
    for key in scores_dict:
        # Only values change, so mutating while iterating the keys is safe.
        scores_dict[key] = distance_from_score(scores_dict[key])
    return scores_dict
|
|
|
|
|
|
def get_doctype_coverage(cluster_size, matches_len):
    """Return *matches_len* as a percentage of *cluster_size* and print it.

    Raises:
        ZeroDivisionError: If *cluster_size* is zero.
    """
    total, matched = float(cluster_size), float(matches_len)
    percent = float(matched * 100.0 / total)
    print(' {} matched {:.2f}% of doctype'.format(matches_len, percent))
    return percent
|
|
|
|
|
|
def calc_center(cluster, distance_dict):
    """Return the member of *cluster* with the smallest sum of squared distances.

    For each member, the values of ``distance_dict[member]`` are squared and
    summed; the member with the lowest total wins, and the first one wins on
    a tie.

    Args:
        cluster: Iterable of member keys (a dict works; only keys are used).
        distance_dict: Dict of member -> {other document: distance}.

    Returns:
        The winning member, or '' when *cluster* is empty.

    Raises:
        KeyError: If a member of *cluster* is missing from *distance_dict*.
    """
    center = ''
    # None means "no candidate yet". A numeric sentinel such as sys.maxsize
    # would reject every member whose total is at least that large.
    smallest_total = None
    for member in cluster:
        total = sum(dist ** 2 for dist in distance_dict[member].values())
        if smallest_total is None or total < smallest_total:
            smallest_total = total
            center = member
    return center
|
|
|
|
|
|
def get_list_by_doctype(doctype, distance_dict):
    """Return the keys of *distance_dict* whose record has the given doctype.

    Args:
        doctype: Doctype to match. The empty string never matches anything.
        distance_dict: Dict of key -> record, where a record is expected to
            be a mapping with a 'doctype' entry.

    Returns:
        List of matching keys in the dict's iteration order. Records without
        a readable 'doctype' entry are skipped.
    """
    if doctype == '':
        return []

    matches = []
    for key, item in distance_dict.items():
        try:
            item_doctype = item['doctype']
        except (KeyError, TypeError, IndexError):
            # Record is not a mapping or has no 'doctype'; skip it.
            continue
        if item_doctype == doctype:
            matches.append(key)

    return matches
|
|
|
|
|
|
def get_all_doctypes(distances):
    """Return each distinct 'doctype' found in *distances*, in first-seen order.

    Raises:
        KeyError: If a record has no 'doctype' entry.
    """
    unique = []
    for record in distances.values():
        label = record['doctype']
        if label in unique:
            continue
        unique.append(label)
    return unique
|
|
|
|
|
|
def get_doctype_lists(distance_dict):
    """Group the keys of *distance_dict* by doctype.

    Returns:
        Dict of doctype -> list of keys, with one entry for every doctype
        reported by ``get_all_doctypes``; each list comes from
        ``get_list_by_doctype``.
    """
    return {
        label: get_list_by_doctype(label, distance_dict)
        for label in get_all_doctypes(distance_dict)
    }
|
|
|
|
|
|
def get_all_of_doctype(doctype, distance_dict):
    """Return the scores of every record in *distance_dict* with the given doctype.

    Args:
        doctype: Doctype to match. The empty string never matches anything.
        distance_dict: Dict of key -> record, where a record is expected to
            be a mapping with 'doctype' and 'scores' entries.

    Returns:
        Dict of key -> ``record['scores']`` for the matching records. Records
        that are not mappings, or lack either entry, are skipped.
    """
    if doctype == '':
        return {}

    result = {}
    for key, item in distance_dict.items():
        try:
            if item['doctype'] == doctype:
                result[key] = item['scores']
        except (KeyError, TypeError, IndexError):
            # Incomplete or non-mapping record; skip it.
            continue
    return result
|
|
|
|
|
|
def filter_docs_by_coverage(doctype_list, distances, coverage_percent):
    """Keep the members of *doctype_list* whose coverage reaches *coverage_percent*.

    A member's coverage is ``get_doctype_coverage(len(doctype_list),
    len(distances[member]))``. Members that are not keys of *distances* are
    dropped without being measured.

    Returns:
        List of the members that passed, in their original order.
    """
    total = len(doctype_list)
    kept = []
    for member in doctype_list:
        if member in distances:
            coverage = get_doctype_coverage(total, len(distances[member]))
            # Keep the member only when it meets the threshold.
            if coverage >= coverage_percent:
                kept.append(member)
    return kept
|
|
|
|
|
|
def get_centers(doctypes, distances):
    """Compute the center document of every non-empty cluster.

    Args:
        doctypes: Dict of cluster name -> collection of documents.
        distances: Distance data handed unchanged to
            ``DoctypeCenter.calc_center``.

    Returns:
        Dict of cluster name -> center. Empty clusters and clusters whose
        computation fails are reported on stdout and left out.
    """
    result = {}

    for cluster, docs in doctypes.items():
        if not docs:
            print('Skipping empty cluster...')
            continue

        # Report and store under this loop's own cluster name; the previous
        # code used a module-level `doctype` variable that is not set here.
        print('Computing center of %s (%d documents).' % (cluster, len(docs)))
        try:
            # calc_center is expected to return a 2-item result whose first
            # item is the center; the second item is not used.
            center, _dist = DoctypeCenter.calc_center(docs, distances)
        except Exception as err:
            # Best effort: one failing cluster must not stop the others.
            print('Error with cluster: %s (%s)' % (cluster, err))
            continue
        result[cluster] = center

    return result
|
|
|
|
|
|
def make_distance_dict(doc_dict):
    """Build a document -> {other document: score} dict from *doc_dict*.

    For each source document, its 'scores' are copied without any entry for
    the source itself, and the source is then added last with a value of 0.

    Raises:
        KeyError: If a record has no 'scores' entry.
    """
    distances = {}
    for source, record in doc_dict.items():
        row = {
            other: score
            for other, score in record['scores'].items()
            if other != source
        }
        # A document is at distance 0 from itself.
        row[source] = 0
        distances[source] = row
    return distances
|
|
|
|
|
|
def make_doctype_dict(doc_id_dict):
    """Group the scores in *doc_id_dict* by doctype.

    Args:
        doc_id_dict: Dict of key -> record, where a record is expected to be
            a mapping with a 'doctype' entry.

    Returns:
        Dict of doctype -> ``get_all_of_doctype(doctype, doc_id_dict)`` for
        every non-empty doctype found. Records without a readable 'doctype'
        entry are ignored.
    """
    result = {}
    for value in doc_id_dict.values():
        try:
            doctype = value['doctype']
        except (KeyError, TypeError, IndexError):
            # Record is not a mapping or has no 'doctype'; skip it.
            continue

        # Skip the empty doctype and doctypes that were already collected.
        if doctype == '' or doctype in result:
            continue

        result[doctype] = get_all_of_doctype(doctype, doc_id_dict)
    return result
|
|
|
|
|
|
# This is the main function of the program.
|
|
def main(arg):
    """Print the required argument and the module-level optional default."""
    required_line = 'Required argument = %s' % arg
    print(required_line)
    optional_line = 'Optional argument = %s' % global_opt
    print(optional_line)
|
|
|
|
|
|
# This is where we call the main method from.
|
|
if __name__ == '__main__':
    # Command line: an output file and a directory holding the *.dist files
    # together with the *Maps.txt map file.
    parser.add_argument('-o', '--output', required=True, help='The output file path.')
    parser.add_argument('-i', '--input_dir', required=True, help='The path to the input distance files.')

    args = parser.parse_args()

    out_file = args.output
    test_dir = args.input_dir

    mapping_file, distance_files = get_files_from_dir(test_dir)
    if not mapping_file:
        # Without this check the run died later with an opaque open('') error.
        parser.error('No map file (*Maps.txt) found in %s' % test_dir)
    maps, id_map = load_map_file(mapping_file)

    print('Map file = %s' % mapping_file)

    dist_dict = load_unmapped_docs(distance_files, id_map, maps)

    final_distance_dict = make_distance_dict(dist_dict)

    doctype_dict = get_doctype_lists(dist_dict)

    # Keep only documents that have a score for 100% of their doctype.
    filtered_docs = {}
    for doctype, docs in doctype_dict.items():
        filtered_docs[doctype] = filter_docs_by_coverage(docs, final_distance_dict, 100)

    print('Converting to a useful dict...')
    # NOTE(review): final_dict is not read anywhere below.
    final_dict = make_doctype_dict(dist_dict)

    print('Computing centers...')
    centers = get_centers(filtered_docs, final_distance_dict)

    print('Writing centers...')
    # One "<doctype>: <center>" line per cluster.
    with open(out_file, 'w') as writer:
        writer.writelines(
            '{}: {}\n'.format(center_doctype, center_id)
            for center_doctype, center_id in centers.items()
        )

    print('Done!')
|
|
|