"""
Non-database related utility functions for TICCLAT.
"""
import copy
import functools
import json
import logging
import os
import re
import tempfile
import time
import warnings

import numpy as np
import pandas as pd
import sh
LOGGER = logging.getLogger(__name__)
def anahash_df(wfreq, alphabet_file):
    """Get anahash values for word frequency data.

    The result can be used to add anahash values to the database
    (ticclat.dbutils.bulk_add_anahashes) and connect wordforms to anahash
    values (ticclat.dbutils.connect_anahases_to_wordforms).

    Inputs:
        wfreq (pandas DataFrame): Dataframe containing word frequency data
            (the result of ticclat.dbutils.get_word_frequency_df)
        alphabet_file (str): path to the ticcl alphabet file to use

    Returns:
        pandas DataFrame containing the word forms as index and anahash values
        as column.

    Raises:
        ValueError: if running TICCL-anahash fails.
    """
    LOGGER.info('Running TICCL-anahash.')
    # Check for None *before* touching .empty; the original order raised
    # AttributeError on a None input instead of issuing the warning.
    if wfreq is None or wfreq.empty:
        msg = 'Input "wfreq" is empty or None. Please input non-empty word ' \
            'frequency data.'
        warnings.warn(msg)

    # save word frequency data to temporary file for TICCL to consume
    (file_descriptor, tmpfile) = tempfile.mkstemp()
    os.close(file_descriptor)
    try:
        wfreq.to_csv(tmpfile, sep='\t', header=False)

        # run ticcl using sh
        try:
            sh.TICCL_anahash(['--list', '--alph', alphabet_file, tmpfile])
        except sh.ErrorReturnCode as exception:
            raise ValueError('Running TICCL-anahash failed: {}'.format(exception.stdout))

        # read anahashes and return dataframe
        anahashes = pd.read_csv('{}.list'.format(tmpfile), sep='\t', header=None,
                                names=['anahash'], index_col=0,
                                # Make sure 'null' is read as string and not NaN
                                keep_default_na=False)
    finally:
        # Remove the temporary input file and TICCL's '<tmpfile>.list' output;
        # the original implementation leaked both on every call.
        for path in (tmpfile, '{}.list'.format(tmpfile)):
            try:
                os.remove(path)
            except OSError:
                pass

    return anahashes
def chunk_df(df, batch_size=1000):
    """Generator that returns about equally sized chunks from a pandas DataFrame.

    Inputs:
        df (DataFrame): the DataFrame to be chunked
        batch_size (int, default 1000): the approximate number of records that
            will be in each chunk (the original docstring incorrectly said
            10000; the actual default has always been 1000)
    """
    if df.shape[0] > batch_size:
        # Integer division means chunks may hold up to ~2*batch_size-1 rows;
        # "about equally sized" is all callers rely on.
        num_sections = df.shape[0] // batch_size
    else:
        num_sections = 1
    for chunk in np.array_split(df, num_sections):
        yield chunk
def write_json_lines(file_handle, generator):
    """Write a sequence of dictionaries to file, one dictionary per line.

    This can be used when doing mass inserts (i.e., inserts not using the ORM)
    into the database. The data that will be inserted is written to file, so
    it can be read (using ``read_json_lines``) without using a lot of memory.

    Inputs:
        file_handle: File handle of the file to save the data to
        generator (generator): Generator that produces objects to write to file

    Returns:
        int: the number of records written.
    """
    total = 0
    # enumerate from 1 leaves `total` holding the running record count;
    # it stays 0 when the generator is empty.
    for total, record in enumerate(generator, start=1):
        file_handle.write(json.dumps(record) + '\n')
    return total
def json_line(obj):
    """Serialize `obj` to a single newline-terminated line of JSON."""
    return json.dumps(obj) + '\n'
def count_lines(file_handle):
    """Return the number of lines in a file, counting from the start.

    Based on https://stackoverflow.com/q/845058/1199693, but fixed to
    return 0 for an empty file (the original returned 1 because it
    unconditionally returned ``i + 1`` with ``i`` initialised to 0).
    """
    file_handle.seek(0)
    return sum(1 for _ in file_handle)
def read_json_lines(file_handle):
    """Generator that reads one dictionary per line from a file.

    This can be used when doing mass inserts (i.e., inserts not using the ORM)
    into the database. The data that will be inserted is written to file (using
    ``write_json_lines``), so it can be read and inserted into the database
    without using a lot of memory.

    Inputs:
        file_handle: File handle of the file containing the data, one dictionary
            (JSON) object per line

    Returns:
        iterator over the lines in the input file
    """
    # NB: this is a generator function, so the seek only happens once
    # iteration actually starts.
    file_handle.seek(0)
    yield from (json.loads(raw_line) for raw_line in file_handle)
def chunk_json_lines(file_handle, batch_size=1000):
    """Read a JSON-lines file from the start and yield lists of parsed
    objects, at most `batch_size` objects per list."""
    file_handle.seek(0)
    batch = []
    for line in file_handle:
        batch.append(json.loads(line))
        if len(batch) == batch_size:
            yield batch
            batch = []
    # flush the final partial batch, if any
    if batch:
        yield batch
def get_temp_file():
    """Create an anonymous temporary file opened in text read/write mode.

    Returns:
        File handle of the temporary file.
    """
    return tempfile.TemporaryFile(mode='w+')
def iterate_wf(lst):
    """Generator that yields `{'wordform': value}` for all values in `lst`."""
    yield from ({'wordform': value} for value in lst)
def split_component_code(code, wordform):
    """
    Split a morphological paradigm code into its components.

    Morphological paradigm codes in Reynaert's encoding scheme consist of 8
    subcomponents. These are returned as separate entries of a dictionary.
    Returns None when `code` does not contain a well-formed paradigm code.
    """
    pattern = re.compile(
        r'Z(?P<Z>\d{4})Y(?P<Y>\d{4})X(?P<X>\d{4})W(?P<W>\d{8})'
        r'V(?P<V>\d{4})_(?P<wt_code>\w{3})(?P<wt_num>\d{3})?'
    )
    match = pattern.search(code)
    if match is None:
        return None
    # the five numeric components are parsed as plain ints
    components = {field: int(match.group(field)) for field in ('Z', 'Y', 'X', 'W', 'V')}
    components['word_type_code'] = match.group('wt_code')
    # the trailing word-type number is optional
    raw_num = match.group('wt_num')
    components['word_type_number'] = int(raw_num) if raw_num else None
    components['wordform'] = wordform
    return components
def morph_iterator(morph_paradigms_per_wordform, mapping):
    """
    Generator yielding dicts of morphological paradigm code components plus
    the wordform_id from the database.

    Inputs:
        morph_paradigms_per_wordform: dictionary with wordforms (keys) and
            lists (values) of dictionaries of code components (return values
            of `split_component_code`).
        mapping: iterable of named tuples / dictionaries that contain the
            result of a query on the wordforms table, i.e. fields
            'wordform' and 'wordform_id'.
    """
    for row in mapping:
        wordform_id = row['wordform_id']
        for components in morph_paradigms_per_wordform[row['wordform']]:
            if components is None:
                continue  # ignore incomplete codes for now
            record = copy.copy(components)
            record['wordform_id'] = wordform_id
            # the wordform itself is not needed in the output
            del record['wordform']
            yield record
def set_logger(level='INFO'):
    """Configure root-logger formatting and set its level."""
    log_format = ("%(asctime)s [%(process)d] %(levelname)-8s "
                  "%(name)s,%(lineno)s\t%(message)s")
    logging.basicConfig(format=log_format)
    logging.getLogger().setLevel(level)
def timeit(method):
    """Decorator for timing methods.

    Can be used for benchmarking queries.
    Source: https://medium.com/pythonhive/fa04cb6bb36d

    If the wrapped callable is invoked with a ``log_time`` keyword argument
    (a dict), the elapsed time in whole milliseconds is stored in that dict
    under key ``log_name`` (default: the upper-cased method name); otherwise
    the elapsed time is logged via the module logger. NOTE: ``log_time`` and
    ``log_name`` are also passed through to the wrapped callable, so it must
    accept them (e.g. via **kwargs) — this matches the original behaviour.
    """
    @functools.wraps(method)  # preserve __name__/__doc__ of the wrapped callable
    def timed(*args, **kw):
        time_start = time.time()
        result = method(*args, **kw)
        time_end = time.time()
        elapsed_ms = (time_end - time_start) * 1000
        if 'log_time' in kw:
            name = kw.get('log_name', method.__name__.upper())
            kw['log_time'][name] = int(elapsed_ms)
        else:
            LOGGER.info('{} took {:.2f} ms'.format(method.__name__,
                                                   elapsed_ms))
        return result
    return timed
def read_ticcl_variants_file(fname):
    """Return dataframe containing data in a TICCL variants file.

    The file is '#'-separated with no header row; columns are renamed to the
    fields of Reynaert's variants format ('?1' and '?2' are unidentified).
    """
    # engine='python' because the default C parser and '#' separators
    # don't always mix well.
    data = pd.read_csv(fname, sep='#', header=None, engine='python')
    data.columns = ['ocr_variant', 'corpus_frequency', 'correction_candidate',
                    '?1', 'ld', '?2', 'anahash']
    return data