ScoreWalker/scorewalker-utils/KMeans/DocumentDistance.py
2025-03-13 00:13:53 -06:00

377 lines
14 KiB
Python

"""
DocumentDistance.py
===================
This is a tool for calculating the distance between any two documents, strings, or a list of documents. This tool
currently only uses the Levenshtein distance for its distance calculations; there are plans to add more in the
future if needed. This is where all the distance calculations will live as this toolset expands and increases in
complexity.
.. moduleauthor:: Chris Diesch <cdiesch@sequencelogic.net>
"""
# We do all our imports at the top of our program.
import argparse
import os
import json
import numpy as np
import concurrent.futures
import threading
import sys
import ConsoleUtils
# Name of the program, used as the argparse prog field and in the console banner.
program_name = 'LevDist'
# One-line description shown by the argument parser.
program_description = 'Gets the Levenshtein distance between the .tkn files in the library.'
# The argument parser for the program; arguments are registered in the __main__ block below.
parser = argparse.ArgumentParser(prog=program_name, description=program_description, add_help=False)
# Relative weights for the individual distance metrics (appear unused in this module so far;
# presumably for the planned weighted combination of lev/img/token-count distances).
lev_weight = 1
img_weight = 1
tkn_cnt_weight = 1
# ANSI-colored console fragments: a red 'Error:' prefix and the magenta program header.
red_error = '\033[91mError:\033[0m'
program_header = format('\033[95m%s\033[0m' % program_name)
# Keys used in the per-file token dicts built by read_tokens_from_file.
tkn_vals_tag = 'tokens'
tkn_cnts_tag = 'counts'
name_tag = 'name'
delimiter = '|'
# Lock intended to guard console output from worker threads (not acquired anywhere visible here).
print_lock = threading.Lock()
num_dashes_console = 60
# Things to hold in memory
# This is going to calculate the distance between the token counts of the documents. This is going to be a metric used
# in finding the distance between two documents for classification.
# TODO: Implement this.
def token_count_dist(source, target):
    """
    Placeholder metric: always returns 0 until the token-count distance calculation is implemented.

    :param source: The source document to compare.
    :type source: str
    :param target: The target document to compare.
    :type target: str
    .. raw:: html
        <br><hr>
    :return: A value representing the distance between the token counts of the two documents.
    :rtype: float
    """
    return 0
# This is going to calculate the 'distance' between two images by first fuzzing them, then doing some analysis.
# This will be a metric used in calculating the distance between two documents for classification.
# TODO: Implement this.
def img_dist(source, target):
    """
    Placeholder metric: always returns 0 until the image distance calculation is implemented.

    :param source: The path to a source image to compare.
    :type source: str
    :param target: The path to a target image to compare.
    :type target: str
    .. raw:: html
        <br><hr>
    :return: A value representing the distance between the two images.
    :rtype: float
    """
    return 0
# Calculate the Levenshtein distance between two strings. Since we don't care about reconstructing the strings from the
# edits, we can get away with an optimization where we only need the last two rows of the matrix. We do this by 'moving'
# with the rows as we compute the edit cost.
# TODO: Implement weighting certain tokens based on a file containing the weighting metrics.
def lev_dist(source, target):
    """
    Calculates the Levenshtein distance between source and target.

    :param source: A list of ints representing the tokens from the source doc.
    :type source: list of int
    :param target: A list of ints representing the tokens from the target doc.
    :type target: list of int
    .. raw:: html
        <br><hr>
    :return: The Levenshtein distance between source and target.
    :rtype: int
    """
    # Swap so source is always the longer sequence; the row buffer is sized off target.
    if len(source) < len(target):
        return lev_dist(target, source)
    # So now we have len(source) >= len(target).
    if len(target) == 0:
        return len(source)
    source = np.array(tuple(source))
    target = np.array(tuple(target))
    # previous_row[j] holds the edit distance between the source prefix consumed so far and target[:j].
    previous_row = np.arange(target.size + 1)
    for tkn_val in source:
        # Insertion (target grows longer than source):
        current_row = previous_row + 1
        # Substitution or matching: (target != tkn_val) is a boolean array adding 1 exactly where the
        # aligned tokens differ, so matching positions carry the diagonal cost forward unchanged.
        current_row[1:] = np.minimum(current_row[1:], np.add(previous_row[:-1], target != tkn_val))
        # Deletion (target grows shorter than source):
        current_row[1:] = np.minimum(current_row[1:], current_row[0:-1] + 1)
        # Slide the window down one row for the next pass.
        previous_row = current_row
    # BUG FIX: this previously returned str(result), contradicting the documented int return type
    # and the int returned by the len(target) == 0 base case above. Return a plain Python int.
    return int(previous_row[-1])
# For right now this only calculates the Levenshtein distance. Ultimately it will use many more metrics and will read
# from a weighting file which contains all the metadata for how to weight all the individual components of the distance
# calculation.
def get_document_distance(source_doc, target_doc):
    """
    Computes the distance between the source doc and the target doc (currently delegates directly to lev_dist).

    :param source_doc: A list of ints representing the tokens from the source doc.
    :type source_doc: list of int
    :param target_doc: A list of ints representing the tokens from the target doc.
    :type target_doc: list of int
    .. raw:: html
        <br><hr>
    :return: The distance between source_doc and target_doc.
    :rtype: int
    """
    return lev_dist(source_doc, target_doc)
def lev_dist_by_name(source_name, target_name, files):
    """
    Computes the Levenshtein distance between two documents where source_name and target_name are keys in the files
    dict.

    :param source_name: The key for the source document in files.
    :type source_name: str
    :param target_name: The key for the target document in files.
    :type target_name: str
    :param files: The dict with keys representing the names of the files and values representing the tokens of the
        files mapped to integers.
    .. raw:: html
        <br><hr>
    :return: The Levenshtein distance between the two documents, paired with the id of the thread that computed it.
    :rtype: tuple
    """
    source_tokens = files[source_name][tkn_vals_tag]
    target_tokens = files[target_name][tkn_vals_tag]
    return lev_dist(source_tokens, target_tokens), threading.get_ident()
def get_distance_threaded(source_file, thread_count, file_num, ext, files):
    """
    Computes the distance between source_file and every file in files using thread_count threads, then writes the
    resulting scores out as json.

    :param source_file: The name of the file to compute the Levenshtein distance against all files in files.
    :type source_file: str
    :param thread_count: The number of threads to execute on.
    :type thread_count: int
    :param file_num: The number representing the current file being run (not used by the current implementation).
    :type file_num: int
    :param ext: The extension to put at the end of the output file.
    :type ext: str
    :param files: The dict of files with keys representing the names of the files and values representing the tokens
        mapped to integers for the files.
    .. raw:: html
        <br>
    :return: None.
    """
    # source_file doubles as the key under which the source document's tokens live in files.
    source_name = source_file
    scores = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=thread_count) as pool:
        # Fan out one comparison task per target document, remembering which future maps to which name.
        pending = {}
        for target_name in files.keys():
            pending[pool.submit(lev_dist_by_name, source_name, target_name, files)] = target_name
        num_to_run = len(pending)
        finished = 0
        times_seen_self = 0
        self_score = None
        for done in concurrent.futures.as_completed(pending):
            finished += 1
            score, worker_id = done.result()
            matched_name = pending[done]
            # The source is compared against itself too; its score is a sanity check (must be 0).
            if matched_name == source_name:
                times_seen_self += 1
                self_score = score
            scores[matched_name] = score
            sys.stdout.write('\r' + ConsoleUtils.get_prg_line(finished, num_to_run, 40, 70))
    print('{:^70}'.format('Self Levenshtein Distance = %s (should be 0)' % self_score))
    print('{:^70}'.format('Writing to json...'))
    write_to_file(scores, source_name, ext)
    print('{:^70}'.format('Done!'))
def read_tokens_from_file(lev_file):
    """
    Reads the tokens from the given '.lev' file.

    :param lev_file: The '.lev' file to read the tokens from. Line 1 holds the space-separated token values and
        line 2 the space-separated token counts.
    :type lev_file: str
    .. raw:: html
        <br>
    :return: A dict of the integer-mapped token values and counts (as lists of str), paired with the file's name.
    :rtype: tuple(dict, str)
    """
    with open(lev_file, 'r') as reader:
        lines = reader.readlines()
    parsed = {
        tkn_vals_tag: lines[0].replace('\n', '').split(' '),
        tkn_cnts_tag: lines[1].replace('\n', '').split(' '),
    }
    return parsed, lev_file
def load_files(doc_list, thread_count):
    """
    Loads the files in doc_list on thread_count threads using read_tokens_from_file.

    :param doc_list: The path to a file containing the list of documents to load in (one path per line).
    :type doc_list: str
    :param thread_count: The number of threads to execute on.
    :type thread_count: int
    .. raw:: html
        <br>
    :return: A dict with keys representing the names of the files and values representing the mapped tokens of the file.
    :rtype: dict[str,str]
    """
    result = {}
    num_loaded = 0
    print('{:^70}'.format('Loading files for distance calculations (%d threads).' % thread_count))
    # Open read-only; the previous 'r+' mode demanded write permission on a file we never modify.
    with open(doc_list, 'r') as doc_reader:
        file_locs = doc_reader.readlines()
    num_total = len(file_locs)
    for i in range(num_total):
        # NOTE(review): this replace is a no-op (source and target are identical) — it looks like a
        # mount-point rewrite that lost its real target path; confirm the intended substitution.
        file_locs[i] = file_locs[i].replace('mount_eng', 'mount_eng').replace('\n', '')
        # Normalize token-file paths to the '.lev' form read_tokens_from_file expects.
        if not file_locs[i].endswith('.lev'):
            file_locs[i] = file_locs[i].replace('.tkn', '.lev')
    with concurrent.futures.ThreadPoolExecutor(max_workers=thread_count) as pool:
        future_to_open = {pool.submit(read_tokens_from_file, file_loc): file_loc for file_loc in file_locs}
        for future in concurrent.futures.as_completed(future_to_open):
            num_loaded += 1
            res, name = future.result()
            result[name] = res
            ConsoleUtils.print_progress_bar(num_loaded, num_total, 50, 70)
    print('{:^70}'.format('{:s} ({:d}{:s})'.format('Done Loading Files!', num_total, ' files loaded.')))
    return result
def get_distances(files, skip_files, file_ext, thread_count):
    """
    Gets the distances between all files in files, skipping files whose output already exists when skip_files is true
    (uses get_distance_threaded for each file).

    :param files: The files to compute the distances between.
    :type files: dict[str,str]
    :param skip_files: Whether or not to skip already computed files.
    :type skip_files: bool
    :param file_ext: The extension to end each file with.
    :type file_ext: str
    :param thread_count: The number of threads to execute on.
    :type thread_count: int
    .. raw:: html
        <br>
    :return: None.
    """
    num_files_compared = 0
    num_files = len(files)
    for name, values in files.items():
        print('*' * 70)
        num_files_compared += 1
        print('{:^70}'.format('Getting distances for file %d/%d' % (num_files_compared, num_files)))
        # BUG FIX: the old test `(not exists) and skip_files` meant skip_files=False skipped *every*
        # file; per the docstring, skip_files=False must force recomputation, and skip_files=True
        # skips only when the output file is already on disk.
        if skip_files and os.path.exists(name.replace('.lev', file_ext)):
            print('{:^70}'.format('Skipping file because it already exists.'))
        else:
            get_distance_threaded(name, thread_count, num_files_compared, file_ext, files)
def write_to_file(json_info, source_doc, ext):
    """
    Serializes json_info to the path obtained by swapping source_doc's '.lev' extension for ext.

    :param json_info: The distance results to serialize (target file name -> distance).
    :type json_info: dict
    :param source_doc: The path of the source '.lev' document the results belong to.
    :type source_doc: str
    :param ext: The extension for the output file.
    :type ext: str
    .. raw:: html
        <br>
    :return: None.
    """
    destination = source_doc.replace('.lev', ext)
    with open(destination, 'w+') as writer:
        json.dump(json_info, writer)
def run_with_list(doc_list, thread_count, file_ext, skip_files):
    """
    Runs the distance calculation only on files contained in doc_list, and skips already computed files if skip_files
    is true.

    :param doc_list: The path to a file containing a list of files to compute the distances between.
    :type doc_list: str
    :param thread_count: The number of threads to execute on.
    :type thread_count: int
    :param file_ext: The extension to put on the end of each file.
    :type file_ext: str
    :param skip_files: Whether or not to skip already computed files.
    :type skip_files: bool
    .. raw:: html
        <br>
    :return: None.
    """
    files = load_files(doc_list, thread_count)
    # BUG FIX: get_distances takes four arguments; thread_count was previously omitted here, so
    # every call to run_with_list raised a TypeError.
    get_distances(files, skip_files, file_ext, thread_count)
def main(doc_list, thread_count, file_ext, doctype='all'):
    """
    The main entry point of the program: prints a banner, loads the documents, and computes all pairwise distances
    (skipping files whose output already exists).

    :param doc_list: The path to a file containing the list of documents to compute the distances between.
    :type doc_list: str
    :param thread_count: The number of threads to execute on.
    :type thread_count: int
    :param file_ext: The extension to end each file with.
    :type file_ext: str
    :param doctype: The name of the doctypes being computed ('all' by default).
    :type doctype: str
    .. raw:: html
        <br>
    :return: None.
    """
    # The header line is centered to 77 columns (not 70) to absorb the invisible ANSI escape codes
    # embedded in program_header.
    banner = [
        '*' * 70,
        '*{:^77}*'.format(program_header),
        '*{:^68}*'.format(format('Running on %s with %d threads' % (doctype, thread_count))),
        '*' * 70,
        '*' * 70,
    ]
    print('\n'.join(banner))
    run_files = load_files(doc_list, thread_count)
    get_distances(run_files, True, file_ext, thread_count)
# This is where we call the main method from.
if __name__ == '__main__':
    # Register the command-line options this tool accepts and hand them straight to main.
    parser.add_argument('-f', '--tkn_file')
    parser.add_argument('-e', '--ext')
    parser.add_argument('-t', '--thread_count', type=int)
    parser.add_argument('-d', '--doctype')
    cli_args = parser.parse_args()
    main(cli_args.tkn_file, cli_args.thread_count, cli_args.ext, cli_args.doctype)