"""Compute a center document for every doctype cluster.

The script reads a ``*Maps.txt`` file (document path -> numeric ID) and one or
more ``*.dist`` score files from an input directory, converts the similarity
scores to distances, groups documents by doctype, and writes the center
document of each doctype to the output file.
"""

# We do all our imports at the top of our program.
import argparse
import os
import sys

import DoctypeCenter

# Give the program a name.
program_name = 'Program name'

# Describe what the program does briefly.
program_description = 'Brief description.'

# The argument parser for the program.
parser = argparse.ArgumentParser(prog=program_name, description=program_description, add_help=False)

# This is where optional arguments are put (they are optional, so they have a default value; assign it here).
# Doing it this way keeps the code more readable since all default values are in one place, and easy to change
# without needing to search through our script for them.
global_opt = 'default value'

# Error and Warning console values:
red_error = '\033[91mError:\033[0m'
yellow_warning = '\033[93mWARNING:\033[0m'
blue_okay = '\033[94mOK\033[0m'
program_header = format('\033[95m%s\033[0m\n'
                        '-----------------------' % program_name)

# Default decision message.
decision_message = ' Is this okay? (Y/N): '

AVG_PERCENT_MATCH = 0.0
TOTAL_NUM_DOCS = 0


def clean_doc_file_name(full_file_name):
    """Return ``full_file_name`` with the original transfer root replaced by the local test root."""
    return full_file_name.replace(r'e:\tmp\daveg-xfer', r'C:\Users\chris\Documents\Code\Tests\KMeans')


def load_map_file(map_file):
    """Load the ``<path>:<id>`` map file.

    Each non-blank line is split at its LAST colon, because the path part
    itself may contain colons (drive letters).

    :param map_file: Path to the map file.
    :return: Tuple ``(name_to_id, id_to_name)`` where names are cleaned paths.
    :raises ValueError: If the ID part of a non-blank line is not an integer.
    """
    result = {}
    ids_to_files = {}
    with open(map_file) as reader:
        for raw_line in reader:
            line = raw_line.replace('\n', '')
            # A blank line carries no mapping; skip it instead of failing in int('').
            if not line.strip():
                continue
            split_idx = line.rfind(':')
            doc_name = clean_doc_file_name(line[:split_idx])
            doc_id = int(line[split_idx + 1:])
            result[doc_name] = doc_id
            ids_to_files[doc_id] = doc_name
    return result, ids_to_files


def _store_document(result, file_name, scores, doctype):
    """Record one finished document in ``result``, converting its scores to distances."""
    # Before the first '#' header there is no document yet, so there is nothing to store.
    if not file_name:
        return
    result[file_name] = {'scores': convert_scores(scores), 'doctype': doctype}


def load_docs(file_name, result, id_mapping, name_mapping):
    """Parse one distance file and add its documents to ``result``.

    Lines starting with ``#`` begin a new document (the rest of the line is
    its path); lines starting with ``*`` hold ``<doc id>:<score>`` for the
    current document. Only scores against documents of the same doctype are
    kept. Every document, including the last one in the file, is stored under
    its cleaned file name with its scores converted to distances.

    :param file_name: Path of the distance file to read.
    :param result: Dict to update in place (also returned).
    :param id_mapping: Dict mapping document ID -> cleaned file name.
    :param name_mapping: Dict mapping cleaned file name -> document ID. Kept
        for interface compatibility; documents are keyed by name, so it is
        not consulted.
    :return: ``result``.
    """
    current_doctype = ''
    current_file_name = ''
    current_file_scores = {}
    num_skipped = 0
    num_scores = 0
    with open(file_name) as reader:
        for raw_line in reader:
            # Strip only the newline so a final line without one keeps its last character.
            line = raw_line.rstrip('\n')
            if not line or line.isspace():
                continue
            if line.startswith('#'):
                # A new header closes the previous document.
                _store_document(result, current_file_name, current_file_scores, current_doctype)
                current_file_name = clean_doc_file_name(line[1:])
                current_file_scores = {}
                current_doctype = get_doctype_from_name(current_file_name)
            elif line.startswith('*'):
                line_data = line[1:].split(':')
                if len(line_data) != 2:
                    continue
                try:
                    doc_id = int(line_data[0])
                    tmp_doctype = get_doctype_from_id(doc_id, id_mapping)
                    tmp_name = id_mapping[doc_id]
                    if tmp_doctype == current_doctype:
                        current_file_scores[tmp_name] = float(line_data[1])
                        num_scores += 1
                    else:
                        num_skipped += 1
                except (ValueError, KeyError):
                    # Malformed number or unknown document ID: ignore this score line.
                    continue
    # The last document has no following header to trigger its flush.
    _store_document(result, current_file_name, current_file_scores, current_doctype)
    print('Skipped %d scores and saved %d scores' % (num_skipped, num_scores))
    return result


def load_unmapped_docs(dist_files, map_ids, map_names):
    """Load every distance file in ``dist_files`` into a single dict and return it."""
    result = {}
    for dist_file in dist_files:
        print('Loading file %s' % dist_file)
        result = load_docs(dist_file, result, map_ids, map_names)
    return result


def get_files_from_dir(directory):
    """Find the input files in ``directory``.

    :return: Tuple ``(map_file, dist_files)``: the path of the last entry
        ending in ``Maps.txt`` ('' when none exists) and the paths of all
        entries ending in ``.dist``.
    """
    dist_files = []
    map_file = ''
    for entry in os.listdir(directory):
        if entry.endswith('.dist'):
            dist_files.append(os.path.join(directory, entry))
        elif entry.endswith('Maps.txt'):
            map_file = os.path.join(directory, entry)
    return map_file, dist_files


def get_doctype_from_name(file):
    """Return the name of the directory that directly contains ``file`` (backslash-separated path)."""
    split_idx = file.rfind('\\')
    parent = file[:split_idx]
    end_idx = parent.rfind('\\')
    return parent[end_idx + 1:]


def get_doctype_from_id(docId, id_map):
    """Return the doctype of the document with ID ``docId``.

    :raises KeyError: If ``docId`` is not in ``id_map``.
    """
    return get_doctype_from_name(id_map[docId])


def distance_from_score(score):
    """Convert a similarity score to a distance: ``(10 - score) + 1``."""
    return float((10 - score) + float(1))


def convert_scores(scores_dict):
    """Replace every score in ``scores_dict`` with its distance, in place, and return the dict."""
    # Only existing keys are reassigned, so mutating while iterating is safe.
    for doc_id in scores_dict:
        scores_dict[doc_id] = distance_from_score(scores_dict[doc_id])
    return scores_dict


def get_doctype_coverage(cluster_size, matches_len):
    """Print and return ``matches_len`` as a percentage of ``cluster_size`` (0.0 for an empty cluster)."""
    total_docs = float(cluster_size)
    num_matches = float(matches_len)
    # Guard against an empty cluster rather than raising ZeroDivisionError.
    result = float(num_matches * 100.0 / total_docs) if total_docs else 0.0
    print(' {} matched {:.2f}% of doctype'.format(matches_len, result))
    return result


def calc_center(cluster, distance_dict):
    """Return the member of ``cluster`` with the smallest sum of squared distances.

    :param cluster: Mapping whose keys are the cluster members.
    :param distance_dict: Dict mapping member -> {other document: distance}.
    :return: The center member, or '' when ``cluster`` is empty. On ties the
        first member encountered wins.
    """
    center = ''
    min_sum_squares = sys.maxsize
    for member in cluster:
        sum_squares = sum(dist ** 2 for dist in distance_dict[member].values())
        if sum_squares < min_sum_squares:
            min_sum_squares = sum_squares
            center = member
    return center


def get_list_by_doctype(doctype, distance_dict):
    """Return the keys of all entries whose 'doctype' equals ``doctype``; the empty doctype matches nothing."""
    result = []
    for key, item in distance_dict.items():
        try:
            if item['doctype'] == doctype and doctype != '':
                result.append(key)
        except (KeyError, TypeError):
            # Entry without a usable 'doctype' field: ignore it.
            continue
    return result


def get_all_doctypes(distances):
    """Return the distinct doctypes found in ``distances``, in first-seen order."""
    return list(dict.fromkeys(value['doctype'] for value in distances.values()))


def get_doctype_lists(distance_dict):
    """Return a dict mapping each doctype to the list of document keys that have it."""
    return {
        doctype: get_list_by_doctype(doctype, distance_dict)
        for doctype in get_all_doctypes(distance_dict)
    }


def get_all_of_doctype(doctype, distance_dict):
    """Return ``{key: scores}`` for every entry whose 'doctype' equals ``doctype`` (never matches '')."""
    result = {}
    for key, item in distance_dict.items():
        try:
            if item['doctype'] == doctype and doctype != '':
                result[key] = item['scores']
        except (KeyError, TypeError):
            # Entry without usable 'doctype'/'scores' fields: ignore it.
            continue
    return result


def filter_docs_by_coverage(doctype_list, distances, coverage_percent):
    """Keep the members of ``doctype_list`` that have enough distance entries.

    A member's coverage is the number of entries in ``distances[member]``
    expressed as a percentage of ``len(doctype_list)``.

    :param doctype_list: Document keys belonging to one doctype.
    :param distances: Dict mapping document key -> {other document: distance}.
    :param coverage_percent: Minimum coverage (in percent) required to keep a member.
    :return: List of the members that meet the threshold, in input order.
    """
    result_cluster = []
    doctype_len = len(doctype_list)
    # Loop through the doctype cluster.
    for member in doctype_list:
        if member not in distances:
            continue
        percent_coverage = get_doctype_coverage(doctype_len, len(distances[member]))
        # If the doctype coverage is over the threshold, add it to the result.
        if percent_coverage >= coverage_percent:
            result_cluster.append(member)
    return result_cluster


def get_centers(doctypes, distances):
    """Compute the center document of every non-empty doctype cluster.

    :param doctypes: Dict mapping doctype name -> list of document keys.
    :param distances: Distance dict passed through to ``DoctypeCenter.calc_center``.
    :return: Dict mapping doctype name -> center document. Clusters that are
        empty or whose computation fails are left out.
    """
    result = {}
    for cluster, docs in doctypes.items():
        if len(docs) == 0:
            print('Skipping empty cluster...')
            continue
        # Report and key by this loop's own cluster name, never by a module-level global.
        print('Computing center of %s (%d documents).' % (cluster, len(docs)))
        try:
            center, _dist = DoctypeCenter.calc_center(docs, distances)
        except Exception:
            # Best effort: one failing cluster must not abort the remaining ones.
            print('Error with cluster: ' + cluster)
            continue
        result[cluster] = center
    return result


def make_distance_dict(doc_dict):
    """Return ``{document: {other document: distance}}`` built from ``doc_dict``.

    Each document's own entry is forced to a distance of 0.
    """
    result = {}
    for src_doc, value in doc_dict.items():
        tmp_res = {doc: score for doc, score in value['scores'].items() if doc != src_doc}
        tmp_res[src_doc] = 0
        result[src_doc] = tmp_res
    return result


def make_doctype_dict(doc_id_dict):
    """Return ``{doctype: {document: scores}}``, skipping entries with a missing or empty doctype."""
    result = {}
    for value in doc_id_dict.values():
        try:
            doctype = value['doctype']
        except (KeyError, TypeError):
            continue
        if doctype == '':
            continue
        if doctype not in result:
            result[doctype] = get_all_of_doctype(doctype, doc_id_dict)
    return result


# This is the main function of the program.
def main(arg):
    """Print the required argument and the module-level optional default."""
    print('Required argument = %s' % arg)
    print('Optional argument = %s' % global_opt)


# This is where we call the main method from.
if __name__ == '__main__':
    parser.add_argument('-o', '--output', required=True, help='The output file path.')
    parser.add_argument('-i', '--input_dir', required=True, help='The path to the input distance files.')
    args = parser.parse_args()

    out_file = args.output
    test_dir = args.input_dir

    mapping_file, distance_files = get_files_from_dir(test_dir)
    if not mapping_file:
        # Fail with a clear message instead of an obscure open('') error.
        parser.error('No map file (*Maps.txt) found in %s' % test_dir)
    maps, id_map = load_map_file(mapping_file)
    print('Map file = %s' % mapping_file)

    dist_dict = load_unmapped_docs(distance_files, id_map, maps)
    final_distance_dict = make_distance_dict(dist_dict)
    doctype_dict = get_doctype_lists(dist_dict)

    filtered_docs = {}
    for doctype, docs in doctype_dict.items():
        filtered_docs[doctype] = filter_docs_by_coverage(docs, final_distance_dict, 100)

    print('Converting to a useful dict...')
    final_dict = make_doctype_dict(dist_dict)

    print('Computing centers...')
    centers = get_centers(filtered_docs, final_distance_dict)

    print('Writing centers...')
    with open(out_file, 'w') as writer:
        writer.write(''.join('{}: {}\n'.format(doctype, centerID) for doctype, centerID in centers.items()))
    print('Done!')