|
| 1 | +#!/usr/bin/env python |
| 2 | +# -*- mode: python; coding: utf-8 -*- |
from __future__ import print_function

import io
import logging
import os
import re
import subprocess
import sys
import tempfile
import time
from collections import defaultdict
from datetime import datetime
| 8 | + |
class G2PModelTester(object):
    """Phonetisaurus G2P model application wrapper.

    Wraps the ``phonetisaurus-g2pfst`` command: it builds the command
    line, streams the generated pronunciations back from the subprocess,
    and optionally merges them with a reference lexicon before printing
    tab-separated entries to stdout.

    The code is written to run unchanged on both Python 2 and Python 3.
    """

    def __init__(self, model, **kwargs):
        """Store the model path and the optional settings.

        Args:
            model (str): Path to the G2P fst model.
            **kwargs: Optional settings; recognised keys are ``lexicon``,
                ``nbest``, ``thresh``, ``beam``, ``greedy`` and
                ``verbose``.  Unknown keys are ignored.
        """
        self.model = model
        self.lexicon_file = kwargs.get("lexicon", None)
        self.nbest = kwargs.get("nbest", 1)
        self.thresh = kwargs.get("thresh", 99)
        self.beam = kwargs.get("beam", 10000)
        self.greedy = kwargs.get("greedy", False)
        self.verbose = kwargs.get("verbose", False)
        self.logger = self.setupLogger()

    def setupLogger(self):
        """Configure logging and return the logger for this tool.

        The level is DEBUG when ``self.verbose`` is set, INFO otherwise.

        Returns:
            Logger: The "phonetisaurus-apply" logger.
        """
        level = logging.DEBUG if self.verbose else logging.INFO
        logging.basicConfig(
            level=level,
            format="\033[94m%(levelname)s:%(name)s:"
                   "%(asctime)s\033[0m: %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S"
        )

        return logging.getLogger("phonetisaurus-apply")

    def _loadLexicon(self):
        """Load the reference lexicon from ``self.lexicon_file``.

        Each useful line has the form ``word<TAB>pronunciation``.  Blank
        lines are skipped silently; lines that do not split into exactly
        two tab-separated fields are skipped with a warning.

        Returns:
            defaultdict: Maps each word to the list of its pronunciations.
                Empty when no lexicon file was configured.
        """
        _lexicon = defaultdict(list)
        if not self.lexicon_file:
            return _lexicon

        self.logger.debug("Loading lexicon from file...")
        # io.open decodes to unicode text on both Python 2 and Python 3.
        with io.open(self.lexicon_file, "r", encoding="utf8") as ifp:
            for line in ifp:
                line = line.strip()
                if not line:
                    continue
                parts = line.split(u"\t")
                if len(parts) != 2:
                    self.logger.warning(
                        u"Skipping malformed lexicon line: '{0}'".format(line)
                    )
                    continue
                word, pron = parts
                _lexicon[word].append(pron)

        return _lexicon

    def checkPhonetisaurusConfig(self):
        """Run basic checks before applying the model.

        Verifies that the required binary is on the $PATH and that the
        lexicon file (if one was given) exists, then loads the lexicon
        into ``self.lexicon``.  Exits the process with status 1 when the
        lexicon file is missing.

        Raises:
            EnvironmentError: raised if binaries are not found.
        """
        self.logger.debug("Checking command configuration...")
        for program in ["phonetisaurus-g2pfst"]:
            if not self.which(program):
                raise EnvironmentError(
                    "Phonetisaurus command, '{0}', "
                    "not found in path.".format(program)
                )

        if self.lexicon_file and not os.path.exists(self.lexicon_file):
            self.logger.error("Could not find provided lexicon file.")
            sys.exit(1)

        for key, val in sorted(vars(self).items()):
            self.logger.debug(u"{0}: {1}".format(key, val))

        self.lexicon = self._loadLexicon()

        return

    def which(self, program):
        """Basic 'which' implementation for python.

        Basic 'which' implementation for python from stackoverflow:
          * https://stackoverflow.com/a/377028/6739158

        Args:
            program (str): The program name to search the $PATH for.

        Returns:
            path/None: The path to the executable, or None.
        """

        def is_exe(fpath):
            return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

        fpath, _ = os.path.split(program)
        if fpath:
            # An explicit path was given: test it directly.
            if is_exe(program):
                return program
        else:
            # Fall back to os.defpath rather than failing on an unset PATH.
            search_path = os.environ.get("PATH", os.defpath)
            for path in search_path.split(os.pathsep):
                exe_file = os.path.join(path.strip('"'), program)
                if is_exe(exe_file):
                    return exe_file

        return None

    def makeG2PCommand(self, word_list):
        """Build the G2P command from the stored settings.

        Args:
            word_list (str): Path of the word list handed to the G2P.

        Returns:
            list: The command in subprocess list format.
        """
        command = [
            u"phonetisaurus-g2pfst",
            u"--model={0}".format(self.model),
            u"--nbest={0}".format(self.nbest),
            u"--beam={0}".format(self.beam),
            u"--thresh={0}".format(self.thresh),
            u"--wordlist={0}".format(word_list)
        ]

        self.logger.debug(u" ".join(command))

        return command

    def runG2PCommand(self, word_list_file):
        """Generate and run the actual G2P command.

        Each synthesized entry is yielded back on-the-fly as it is read
        from the subprocess stdout.  Output lines that do not consist of
        exactly three tab-separated fields are reported with a warning
        and skipped.

        Args:
            word_list_file (str): The input word list.

        Yields:
            list: ``[word, score, pronunciation]`` as unicode strings.
        """
        g2p_command = self.makeG2PCommand(word_list_file)

        self.logger.debug("Applying G2P model...")

        with open(os.devnull, "w") as devnull:
            proc = subprocess.Popen(
                g2p_command,
                stdout=subprocess.PIPE,
                stderr=devnull if not self.verbose else None
            )
            try:
                # The pipe yields bytes, so EOF is b"" (equal to "" on
                # Python 2, but distinct from "" on Python 3).
                for line in iter(proc.stdout.readline, b""):
                    parts = line.decode("utf8").strip().split(u"\t")
                    if len(parts) != 3:
                        self.logger.warning(
                            u"No pronunciation for word: '{0}'".format(
                                parts[0]
                            )
                        )
                        continue

                    yield parts
            finally:
                # Always release the pipe and reap the child, even when
                # the consumer stops iterating early.
                proc.stdout.close()
                returncode = proc.wait()

        if returncode != 0:
            self.logger.warning(
                u"G2P command exited with status {0}".format(returncode)
            )

        return

    def _formatEntry(self, word, score, pron):
        """Format one output line; the score is included in verbose mode."""
        if self.verbose:
            return u"{0}\t{1:.2f}\t{2}".format(word, float(score), pron)
        return u"{0}\t{1}".format(word, pron)

    def _printLine(self, line):
        """Print one unicode line to stdout on Python 2 and Python 3."""
        # Python 2 needs explicit UTF-8 bytes; on Python 3 printing the
        # encoded bytes would emit a b'...' repr instead of the text.
        if sys.version_info[0] < 3:
            line = line.encode("utf8")
        print(line)

    def applyG2POnly(self, word_list_file):
        """Apply the G2P model to a word list.

        No filtering or application of a reference lexicon is used here;
        every entry produced by the G2P is printed as it arrives.

        Args:
            word_list_file (str): The input word list.
        """
        for word, score, pron in self.runG2PCommand(word_list_file):
            self._printLine(self._formatEntry(word, score, pron))

        return

    def applyG2PWithLexicon(self, word_list_file):
        """Apply the G2P model to a word list, combined with lexicon.

        Words for which a reference entry exists in ``self.lexicon`` are
        not sent to the G2P, unless ``self.greedy`` is set.  Reference
        pronunciations get a score of 0.0, and G2P pronunciations that
        duplicate an existing entry for the same word are dropped.  The
        result is printed sorted by word.

        Args:
            word_list_file (str): The input word list.
        """
        target_lexicon = defaultdict(list)
        pending = 0
        tmpwordlist = tempfile.NamedTemporaryFile(delete=False)

        try:
            # First, find any words in the target list for which we
            # already have a canonical pronunciation in the reference
            # lexicon; everything else goes to the temporary word list.
            with io.open(word_list_file, "r", encoding="utf8") as ifp:
                for word in ifp:
                    word = word.strip()
                    if not word:
                        continue
                    if word in self.lexicon:
                        target_lexicon[word] = [
                            (0.0, pron) for pron in self.lexicon[word]
                        ]
                        # In greedy mode we still send words to the G2P,
                        # even if we have canonical reference entries.
                        if not self.greedy:
                            continue
                    # The temporary file is opened in binary mode.
                    tmpwordlist.write(word.encode("utf8") + b"\n")
                    pending += 1
            tmpwordlist.close()

            # Second, iterate through the G2P output, and filter against
            # any duplicates previously found in the reference lexicon.
            # Nothing to do when every word was covered by the lexicon.
            if pending:
                for word, score, pron in self.runG2PCommand(tmpwordlist.name):
                    known = set(p for _, p in target_lexicon[word])
                    if pron in known:
                        continue
                    target_lexicon[word].append((score, pron))

            # Finally, print what is left, ordered by word.
            for word in sorted(target_lexicon):
                for score, pron in target_lexicon[word]:
                    self._printLine(self._formatEntry(word, score, pron))
        finally:
            # Remove the temporary word list even if something failed.
            tmpwordlist.close()
            os.unlink(tmpwordlist.name)

        return

    def ApplyG2PModel(self, word_list_file):
        """Apply the G2P model to a word list.

        Runs the configuration checks, then dispatches to the plain or
        the lexicon-aware application depending on whether a reference
        lexicon was loaded.

        Args:
            word_list_file (str): The input word list.

        Raises:
            EnvironmentError: raised if binaries are not found.
            IOError: raised if the word list is missing or not a file.
        """
        self.checkPhonetisaurusConfig()

        if not word_list_file or not os.path.isfile(word_list_file):
            raise IOError("Word list file not found.")

        if not self.lexicon:
            self.applyG2POnly(word_list_file)
        else:
            self.applyG2PWithLexicon(word_list_file)

        return
| 275 | + |
if __name__ == "__main__":
    # Command line entry point: parse the options, build the tester and
    # apply the model to the given word list.
    import argparse
    import sys

    example = "{0} --model train/model.fst --word test".format(sys.argv[0])

    parser = argparse.ArgumentParser(description=example)
    parser.add_argument("--model", "-m", help="Phonetisaurus G2P fst model.",
                        required=True)
    parser.add_argument("--lexicon", "-l", help="Optional reference lexicon.",
                        required=False)
    parser.add_argument("--nbest", "-n", help="Nbest highest order.",
                        default=1, type=int)
    parser.add_argument("--beam", "-b", help="Search 'beam'.",
                        default=10000, type=int)
    parser.add_argument("--thresh", "-t", help="Pruning threshold for n-best.",
                        default=99.0, type=float)
    parser.add_argument("--greedy", "-g", help="Use the G2P even if a "
                        "reference lexicon has been provided.", default=False,
                        action="store_true")
    # Required: without it None would be handed to ApplyG2PModel, which
    # fails on the path check instead of printing a usage error.
    parser.add_argument("--word_list", "-wl", help="Input word or word list to apply "
                        "G2P model to.", type=str, required=True)

    parser.add_argument("--verbose", "-v", help="Verbose mode.",
                        default=False, action="store_true")
    args = parser.parse_args()

    # Everything except the model and the word list is forwarded to the
    # tester as keyword options.
    tester = G2PModelTester(
        args.model,
        **{key: val for key, val in vars(args).items()
           if key not in ("model", "word_list")}
    )

    tester.ApplyG2PModel(args.word_list)
0 commit comments