import json
import argparse
import os
import string
import numpy as np
import sys


def load_ocr_data(file_name):
    ocr_text = ''
    ocr_conf = ''
    with open(file_name) as reader:
        json_data = json.load(reader)
        pages = json_data['pages']
        for page in pages:
            lines = page['lines']
            num_lines = len(lines)
            current_line_num = 0
            for line in lines:
                current_line_num += 1
                line_chars = str.lower(line['chars'].translate(str.maketrans('', '', string.punctuation)))
                line_confs = line['confs']
                ocr_text += line_chars
                ocr_conf += line_confs
    return ocr_text, ocr_conf
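

# For reference, load_ocr_data above expects OCR JSON shaped roughly like the sketch
# below (the field names come from the code; the concrete values are invented for
# illustration only):
#
#   {
#     "pages": [
#       {"lines": [{"chars": "the cat ", "confs": "98897777"}]}
#     ]
#   }
#
# i.e. 'chars' and 'confs' are parallel strings with one confidence digit (0-9) per
# character. Note that punctuation is stripped from 'chars' but not from 'confs', so
# the two strings stay aligned only when the line contains no punctuation.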


def is_word_confident(line_confs, min_confidence):
    for conf_char in line_confs:
        if int(conf_char) < min_confidence:
            return False
    return True


def find_unconfident_words(ocr_words, min_conf):
    unconfident_words = []
    for vals in ocr_words:
        conf = vals['conf']
        if not is_word_confident(conf, min_conf):
            unconfident_words.append(vals)
    return unconfident_words


def load_dict(dict_file):
    result = {}
    with open(dict_file) as reader:
        lines = reader.readlines()
        for i in range(len(lines)):
            line = str.lower(lines[i].translate(str.maketrans('', '', string.whitespace)))
            result[line] = i
    return result


def split_into_words(ocr_text, ocr_conf):
    words = []
    previous_split_idx = 0
    for i in range(len(ocr_text)):
        if ocr_text[i] == ' ':
            word = ocr_text[previous_split_idx:i]
            conf = ocr_conf[previous_split_idx:i]
            # We set the previous index to i + 1 because we don't want the leading space...
            previous_split_idx = i + 1
            words.append({'word': word, 'conf': conf})
    # Pick up a trailing word that isn't followed by a space.
    if previous_split_idx < len(ocr_text):
        words.append({'word': ocr_text[previous_split_idx:], 'conf': ocr_conf[previous_split_idx:]})
    return words
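

# Illustrative use of split_into_words (the strings below are made-up examples in the
# one-confidence-digit-per-character format produced by load_ocr_data):
#
#   split_into_words('the cat ', '98897777')
#   -> [{'word': 'the', 'conf': '988'}, {'word': 'cat', 'conf': '777'}]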


def is_in_dict(word, dict_words):
    return word in dict_words


def correct_confs(line_confs, max_conf):
    result_confs = ''
    for _ in range(len(line_confs)):
        result_confs += str(max_conf)
    return result_confs


def get_remaining_bad_words(corrected, unconfident):
    remaining = []
    for i in range(len(unconfident)):
        found = False
        for j in range(len(corrected)):
            if unconfident[i]['word'] == corrected[j]['word']:
                found = True
                break
        if not found:
            remaining.append(unconfident[i])
    return remaining


def lev_dist(source, target):
    # We assume len(source) >= len(target); if not, swap the arguments.
    if len(source) < len(target):
        return lev_dist(target, source)

    # So now we have len(source) >= len(target).
    if len(target) == 0:
        return len(source)

    source = np.array(tuple(source))
    target = np.array(tuple(target))

    previous_row = np.arange(target.size + 1)
    for tkn_val in source:
        # Insertion (target grows longer than source):
        current_row = previous_row + 1
        # Substitution or matching: target and source items are aligned, and either are different (cost of 1) or
        # are the same (cost of 0). target != tkn_val produces an array of boolean values corresponding to whether
        # or not target[index] == tkn_val. This is used for incrementing the rows as we go.
        current_row[1:] = np.minimum(current_row[1:], np.add(previous_row[:-1], target != tkn_val))
        # Deletion (target grows shorter than source):
        current_row[1:] = np.minimum(current_row[1:], current_row[0:-1] + 1)
        # Reset rows for the next pass.
        previous_row = current_row

    result = previous_row[-1]
    return result
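

# A quick sanity check for lev_dist (the strings are assumptions chosen only to
# illustrate the edit-distance behaviour):
#
#   lev_dist('kitten', 'sitting')  -> 3   (two substitutions and one insertion)
#   lev_dist('cat', 'cat')         -> 0
#   lev_dist('', 'abc')            -> 3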


# Finds the dictionary word with the smallest Levenshtein distance to the given word and
# returns it together with that distance. Thresholding against a maximum edit distance is
# left to the caller (see find_words_in_dict).
def find_closest_word(word, dictionary):
    closest_word = None
    min_distance = sys.maxsize
    for dict_word in dictionary:
        dist = lev_dist(word, dict_word)
        # For right now, ties go to the first word found; this may change though...
        if dist < min_distance:
            closest_word = dict_word
            min_distance = dist
    return closest_word, min_distance


def find_words_in_dict(words, dictionary, max_distance):
    result_words = []
    for word_data in words:
        word = word_data['word']
        conf = word_data['conf']
        # If every character would have to be changed, we don't want to even try...
        if max_distance < len(word):
            replace_word, distance = find_closest_word(word, dictionary)
            # If the words were close enough...
            if distance <= max_distance:
                replace_conf = correct_confs(conf, 9)
                result_words.append({'newWord': replace_word, 'newConf': replace_conf,
                                     'oldWord': word, 'oldConf': conf})
        else:
            print('Cannot attempt replacement. Length of %s (%d) does not exceed the max distance allowed (%d).' %
                  (word, len(word), max_distance))
    return result_words
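

# Illustrative call of find_words_in_dict (the word list and confidences below are
# invented; a real dictionary would come from load_dict):
#
#   find_words_in_dict([{'word': 'teh', 'conf': '878'}], {'the': 0, 'cat': 1}, 2)
#   -> [{'newWord': 'the', 'newConf': '999', 'oldWord': 'teh', 'oldConf': '878'}]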


def split_word(word):
    # Get all the possible 'splits' of a word.
    w = word['word']
    c = word['conf']
    result = []
    # Add a space at every point...
    for i in range(1, len(w)):
        result.append({'w1': w[:i], 'w2': w[i:], 'c1': c[:i], 'c2': c[i:]})
    return result


def get_possible_splits(word, dictionary):
    correct_splits = []
    possible_splits = split_word(word)
    for possible in possible_splits:
        word1 = possible['w1']
        word2 = possible['w2']
        if is_in_dict(word1, dictionary) and is_in_dict(word2, dictionary):
            correct_splits.append(possible)
    return correct_splits
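

# Illustrative example of split recovery for a run-together word (the word, confidence
# string, and tiny dictionary are made up for demonstration):
#
#   get_possible_splits({'word': 'thecat', 'conf': '988777'}, {'the': 0, 'cat': 1})
#   -> [{'w1': 'the', 'w2': 'cat', 'c1': '988', 'c2': '777'}]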


def test():
    ocr_chars, ocr_confs = load_ocr_data(r'E:\Code\GIT\scorewalker-utils\KMeans\test_data\ocr_files\1.frt')
    english_dict = load_dict(r'E:\Code\GIT\scorewalker-utils\KMeans\Data\WordLists\EnglishWords.txt')
    max_dist = 3

    words = split_into_words(ocr_chars, ocr_confs)
    possible_bad_words = find_unconfident_words(words, 9)

    num_corrected = 0
    for i in range(len(words)):
        replaced = False
        word_data = words[i]
        word = word_data['word']
        conf = word_data['conf']
        if is_word_confident(conf, 9):
            continue
        elif is_in_dict(word, english_dict):
            # The word is already in the dictionary, so just raise its confidence in place.
            conf = correct_confs(conf, 9)
            words[i] = {'word': word, 'conf': conf}
        else:
            splits = get_possible_splits(word_data, english_dict)
            if len(splits) == 1:
                w1 = splits[0]['w1']
                c1 = correct_confs(splits[0]['c1'], 9)
                w2 = splits[0]['w2']
                c2 = correct_confs(splits[0]['c2'], 9)
                # Replace the run-together word with its two halves. This lengthens the list
                # while we iterate over the original range(len(words)), so later indices shift.
                words.remove(word_data)
                words.insert(i, {'word': w2, 'conf': c2})
                words.insert(i, {'word': w1, 'conf': c1})


if __name__ == '__main__':
    test()