Abstract Base Class For In-Memory MDM/Lookup System in Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 |
import abc, logging, pathlib, collections, re, pprint import cx_Oracle, datetime, sys __version__ = '0.205b-MVP' __author__ = 'Noelle Milton Vega' class Lookup(object, metaclass=abc.ABCMeta): """ An abstract/base-class for lookup data and methods. FileLookup, DatabaseLookup classes, etc. implement this. DatabaseLookup implementation: https://www.prismalytics.io/2018/10/07/db-lookup-py """ LOGGER_NAME = 'LOOKUP' # Base-Class logger name. DFLT_LOOKUP_TBL_NAME_PRFX = 'lookup_' DFLT_SYNONYM_TBL_NAME = 'data_synonym' METADATA_TBL_NAME_SUFX = 'metadata' # E.g. The table name would then be, for example, 'lookup_metadata' DATETIME_FORMAT = '%Y-%m-%d_%Hh%Mm%Ss' ALERT_MSG_CLMNS = ('severity','dateTime','msg','tableName','col0','col1','col2') # For namedTuple type creation. # =============================================================================================== # Default METADATA that we initialize for all tables; and is overriden if specified in Online-DB. # =============================================================================================== METADATA_DEFN = (('key_punct',''), ('val_punct',''), ('col_punct','_'), ('tbl_punct','_')) # =============================================================================================== MetadataType = collections.namedtuple('Metadata', [x[0] for x in METADATA_DEFN], rename=False) DFLT_METADATA = MetadataType._make([x[1] for x in METADATA_DEFN]) CompiledRegex = collections.namedtuple('CompiledRegex', ['key','val','col','tbl'], rename=False) # =============================================================================================== def __init__(self, connection, lookup_tbl_name_prfx, synonym_tbl_name): self._connection = connection # A connection object: db connection; pathlib.Path; etc. self.logger = logging.getLogger(self.__class__.__name__) # -------------------------------------------------------------------------------------------------------- # IMPORTANT: BELOW, each call to load_lookup_tables() MUST reset/reinitialize the identifiers, as shown. # -------------------------------------------------------------------------------------------------------- self._tbl_regexs = {} # dict() of table-name to one 4-field namedTuple: CompiledRegex(key=, val=, col=, tbl=) self._data_dictionary = {} # dict() of table-name to list()-of-associated-column-names. self._lookup_data = {} # Top-level container for in-memory lookup-data tables. FUTURE: Create composite-class for this. self._tbl_shapes = {} # dict() of table-name to 2-tuple of (rows, columns). self._tbl_metadata = {} # dict() of Lookup-table names to single namedTuple containing that tables's meta-data. self._events = {'ERROR':[], 'WARN':[], 'INFO':[], 'DEBUG':[]} self._lookups_hashes = {} # A dict() of dict() for lookup speedup. # -------------------------------------------------------------------------------------------------------- # IMPORTANT: ABOVE, each call to load_lookup_tables() MUST reset/reinitialize the identifiers, as shown. # -------------------------------------------------------------------------------------------------------- # -------------------------------------------------------------------------------------------------------- self._Event = collections.namedtuple('Event', Lookup.ALERT_MSG_CLMNS, rename=False) self._lookup_tbl_name_prfx = lookup_tbl_name_prfx if lookup_tbl_name_prfx else Lookup.DFLT_LOOKUP_TBL_NAME_PRFX self._lookup_tbl_name_prfx = normalizer(self._lookup_tbl_name_prfx, sre=base_tc_SRE) self._synonym_tbl_name = synonym_tbl_name if synonym_tbl_name else Lookup.DFLT_SYNONYM_TBL_NAME self._synonym_tbl_name = normalizer(self._synonym_tbl_name, sre=base_tc_SRE) self._metadata_tbl_name = self._lookup_tbl_name_prfx + Lookup.METADATA_TBL_NAME_SUFX # -------------------------------------------------------------------------------------------------------- @abc.abstractclassmethod def load_lookup_tables(self, *args, **kwargs): """ Method to load all LOOKUP TABLES from a SPECIFIC DATA-STORE TYPE (most likey a directory/folder or a database). Returns a reference to data structure containing an in-memory representation of the lookup tables. """ pass @abc.abstractclassmethod def lookup_key(self, *args, **kwargs): """ Method to retrieve the full ROW for a supplied KEY. The ROW is returned as a collections.namedtuple object. """ pass @abc.abstractclassmethod def lookup_attribute(self, *args, **kwargs): """ Method to retrieve the full ROW for a supplied KEY. The ROW is returned as a collections.namedtuple object. """ pass @abc.abstractclassmethod def _get_data_dictionary(self, *args, **kwargs): """ Private method to retrieve the table-names and collection of associated column-names for the Lookup and Synonym tables. Results are stored in _data_dictionary.""" pass @abc.abstractclassmethod def __enter__(self,): """ Method called when an Object for this Class is instantiated using Python's with/as construct. """ return self @abc.abstractclassmethod def __exit__(self, exc_type, exc_value, traceback): """ Method called when the Python with/as block that instantiated this Object is exited. close() shutdown() and, in general, cleanup code goes here. """ pass def _write_event(self, event): """ event is an instance of an Event() namedTuple; and event[0] = 'DEBUG'|'INFO'|'WARN'|'ERROR' """ getattr(self.logger, event[0].lower())(str(event)) self._events[event[0].upper()].append(event) @property def events(self): """ 'events' is a Read/Only Class property to expose the 'self._events' events dictionary. It's Read/Only because we DO NOT define a corresponding setter (like this): ------------------------------- @events.setter def events(self, x) self._events = x ------------------------------- """ return self._events def __iter__(self): """ Return iterator to iterate over 2-tuples of (tableName,list-of-namedTuples)/(Rows) that constitute 'lookup_data'. """ return iter(self.lookup_data.items()) def __getitem__(self, tbl_name): """ Return a list-of-namedTuples/(Rows) for the tableName (index) specified. """ return self.lookup_data[tbl_name] @property def regexs(self): """ 'regexs' is a Read/Only Class property to expose the 'self._tbl_regexs' data structure. """ return self._tbl_regexs @property def metadata(self): """ 'metadata' is a Read/Only Class property to expose the 'self._tbl_metadata' data structure. """ return self._tbl_metadata @property def dde(self): """ 'dde' is a Read/Only Class property to expose the 'self._data_dictionary' data structure. """ return self._data_dictionary @property def tbl_shapes(self): """ 'tbl_shapes' is a Read/Only Class property to expose the 'self._tbl_shapes' data structure. """ return self._tbl_shapes @property def lookup_data(self): """ 'lookup_data' is a Read/Only Class property to expose the 'self._lookup_data' data structure. """ return self._lookup_data @staticmethod def _synonym_tbl_cleaner(L, rm_orphans=True): """ L is an instance of DatabaseLookup(). This function (1) Evicts ALL BUT ONE DUPLICATE SYNONYM Row()s matched (i.e. where all pair-wise, normalized attributes of the namedTuples match); and (2) Evicts ALL AMBIGUOUS SYNONYM Row(s) matched (i.e. where, pairwise, normalized attribute-0 and attribute, match). We MUST remove DUPS cases before AMBIGUOUS cases; and (3) We remove ORPHANED SYNONYMS. We use two Counter() dicts to count these "macthes": one for DUPS and one for AMBIG. The rm_orphans arguments allows the caller to choose whether to remove SYNONYM ORPHANS (default is True). Use cases for rm_orphans: Case1: Need cleansed lookup-data for the DataFactory transformer process: rm_orphans=True dedup=False Case2: Need non-cleansed lookup-data for the LOOKUP/SYNONYM populator (populator.py): rm_orphans=False dedup=True """ synonym_tbl_name = normalizer(L._synonym_tbl_name, sre=base_tc_SRE) # Must defensively .strip().lower() b/c multiple consumers of this. synonym_tbl_rows = L.lookup_data[synonym_tbl_name] # This is POINTER to a list() of Rows()/namedTuples. # ----------------------------------------------------------------------------------------- # Using Counter() dicts, this section: (1) records which Rows()s/namedTuples match end-to-end, # along with their respective count (this is the DEPUP case); and (2) records which # Row()s/namedTuple match along position 0 & 1, along with their respective count # (this is the AMBIGUOUS case). In both cases we don't actually store any Row()s/namedTuples, # (which would be Memory and CPU inefficient), but rather a string "representation" of them # by concatinating their in-common attributes. # SYNONYM_KEY LOOKUP_TABLE LOOKUP_KEY # foo LPK_CNTRY bar1 <---- Ambiguous case # foo LPK_CNTRY bar2 pair. (col0, col1) # blah LPK_GEND R wxyz <---- Duplicate case # blah LPK_GENDR wxyz pair. (col0, col2, col2) # ----------------------------------------------------------------------------------------- dupRow_cntrDict = collections.Counter() ambigRow_cntrDict = collections.Counter() for synonym_tbl_row in synonym_tbl_rows: synonym = normalizer(synonym_tbl_row[0], sre=L.regexs[synonym_tbl_name].key) lookback_tbl = normalizer(synonym_tbl_row[1],sre=base_tc_SRE) lookback_key = synonym_tbl_row[2] # CAN'T DO THIS YET: normalizer(synonym_tbl_row[2], sre=L.regexs[lookback_tbl].key) lookback_key = lookback_key if lookback_key != None else '' # This is what normalizer() does for the None case. ambigKey = ''.join([synonym, lookback_tbl]) dupKey = ambigKey + lookback_key # DUP cases are also AMBIG cases. # ----------------------------------------------------------------------------------------- # NOTE: ABOVE, we can't/don't normalize 'lookback_key' for two reasons: # ----------------------------------------------------------------------------------------- # (1) Though we have the lookback_tbl name, we don't YET know if it actually exists (i.e. an ORPHAN) # (e.g. LOOKUP_tGENDERr); so there's possibly no regex entry for it (and we'll get KeyError exception) # (2) I forgot the second reason at the moment. =:) To be revisited. # So we must store the 'lookup_key' component of the dict hashKey AS IS, and later (in the # rm_orphans section) we can table-appropriate normalize() and compare. The only thing we # can and do do, is manually defend against getting a Python 'None' back (which would normally # be taken care of in normalizer()). # ----------------------------------------------------------------------------------------- if not ambigRow_cntrDict[ambigKey]: # This also means it's 'not in dupRow_cntrDict[dupKey]' (b/c subset). dupRow_cntrDict[dupKey] = 1 # Initialize for 'synonym(value) + lookup_table(value) + lookup_key(value)' ambigRow_cntrDict[ambigKey] = 1 # Initialize for 'synonym(value) + lookup_table(value)' elif dupRow_cntrDict[dupKey]: dupRow_cntrDict[dupKey] += 1 # SuperSet else: ambigRow_cntrDict[ambigKey] += 1 # SubSet # ----------------------------------------------------------------------------------------- # POST-MVP NOTE: Because we're iterating through synonym_tbl_rows twice (here and below), # figure out a way to slip this logic into the loop below (i.e. discover & remove on the fly). # This is tricky b/c when 2nd. hit is identified, you have to (1) remember where the 1st. hit # was located, (2) then adjust for position change b/c pop()ing of items will have shifted it! # USE Technique I used in 'populator.py'. Works! # ----------------------------------------------------------------------------------------- # ----------------------------------------------------------------------------------------- # This loop to pop()s Rows()/namedTuples from the SYNONYM-table. NOTE: Because each pop() # mutates the 'synonym_tbl_row' List, we can't use it as our for/loop iterator exhaustion counter. # ----------------------------------------------------------------------------------------- indx = 0 # A counter that is updated ONLY when we DON'T pop() (remove) a List item. for _ in range(len(synonym_tbl_rows)): synonym_tbl_row = synonym_tbl_rows[indx] synonym = normalizer(synonym_tbl_row[0], sre=L.regexs[synonym_tbl_name].key) lookback_tbl = normalizer(synonym_tbl_row[1],sre=base_tc_SRE) lookback_key = synonym_tbl_row[2] # CAN'T DO THIS YET: normalizer(synonym_tbl_row[2], sre=L.regexs[lookback_tbl].key) lookback_key = lookback_key if lookback_key != None else '' # This is what normalizer() does for the None case. ambigKey = ''.join([synonym,lookback_tbl]) dupKey = ambigKey + lookback_key # DUP cases are also AMBIG cases. # ------------------------------------------------------------------------------------- # EVICT DUPLICATE/IDENTICAL-MATCHES SYNONYMS (evict ALL of them). # That is, where ESSENTIALLY namedTuple1[0:] == namedTuple2[0:]) is verified as True # by pair-wise comparison of their normalized (normalizer()) attributes. # We CANNOT simply DEUP all but one instance b/c in the Online-DB copy, # someone may edit the LOOKUP_KEY for one of DUPs (and not the rest); # which would make all corresponding records fall into the AMBIGUOUS CASE) causing # them to all be deleted on next reload (which is what we do for the AMBIGUOUS CASE). # Then we have no related records, and we may have used them before. # ------------------------------------------------------------------------------------- if dupRow_cntrDict[dupKey] > 1: synonym_tbl_rows.pop(indx) event = L._Event('WARN',now(),'EVICTED-SYN:(IDENTICAL-Matches: %d)'%dupRow_cntrDict[dupKey], synonym_tbl_name,*synonym_tbl_row[0:3]) L._write_event(event) # Slice out only SYNONYM_KEY,LOOKUP_TABLE,LOOKUP_LOOKUP_KEY. That is, [0:3]. Synonym entries have no attributes! =:) continue # ------------------------------------------------------------------------------------- # ------------------------------------------------------------------------------------- # EVICT AMBIGUOUS-MATCHES SYNONYMS (evict ALL of them). # That is, where ESSENTIALLY namedTuple1[0:2] == namedTuple2[0:2]) is verified as True # by pair-wise comparison of their normalized (normalizer()) attributes. # ------------------------------------------------------------------------------------- if ambigRow_cntrDict[ambigKey] > 1: synonym_tbl_rows.pop(indx) event = L._Event('WARN',now(),'EVICTED-SYN:(AMBIGUOUS-Matches: %d)' % ambigRow_cntrDict[ambigKey], synonym_tbl_name,*synonym_tbl_row[0:3]) L._write_event(event) # Slice out only SYNONYM_KEY,LOOKUP_TABLE,LOOKUP_LOOKUP_KEY. That is, [0:3]. Synonym entries have no attributes! =:) continue # ------------------------------------------------------------------------------------- # ------------------------------------------------------------------------------------- # EVICT SYNONYMS where SYNONYM_KEY (column0) == LOOKUP_KEY (column2); because that's # not a true SYNONYM (meaning, you'd expect that the value for column2 would be a # LOOKUP_KEY in the Lookup-table itself). # ------------------------------------------------------------------------------------- if synonym_tbl_row[0] == synonym_tbl_row[2]: synonym_tbl_rows.pop(indx) event = L._Event('WARN',now(),'EVICTED-SYN:(NOT-A-SYNONYM: K=V)',synonym_tbl_name,*synonym_tbl_row[0:3]) L._write_event(event) # Slice out only SYNONYM_KEY,LOOKUP_TABLE,LOOKUP_LOOKUP_KEY. That is, [0:3]. Synonym entries have no attributes! =:) continue # ------------------------------------------------------------------------------------- # ----------------------------------------------------------------------------------------- # Use cases for rm_orphans: # Case1: Need cleansed lookup-data for the DataFactory transformer process: rm_orphans=True # Case2: Need non-cleansed lookup-data for the LOOKUP/SYNONYM populator: rm_orphans=False # ----------------------------------------------------------------------------------------- if rm_orphans: # ----------------------------------------------------------------------------------------- # EVICT ORPHAN SYNONYMS from our in-Memory SYNONYM-table representation. This means # removing a SYNONYM-table Row when either (1) the LOOKUP_TABLE that it refers to does not # exist; or (2) it does exist, but then the LOOKUP_KEY that it refers to does not exist in # that LOOKUP_TABLE. # NOTE: We use L.pop() to remove these items. This means we need two counters: (1) A # for/loop Counter, '_', to ensure we process every Row in the SYNONYM-table (we do not # use this counter in the body of the for/loop); and (2) an index Counter, 'indx', to keep # track of our position in the List, since pop()ing items off will shift index numbering. # ----------------------------------------------------------------------------------------- # STEP1: Does the LOOKUP-Table referred to in this SYNONYM-Row exist? If no, pop() and continue. # Ex Synonym Row : Row(SYNONYM_KEY='USofA', LOOKUP_TABLE='LOOKUP_CURRENCY', LOOKUP_KEY='usa') # ----------------------------------------------------------------------------------------- lookback_tbl = normalizer(synonym_tbl_row[1],sre=base_tc_SRE) if lookback_tbl not in L.lookup_data: synonym_tbl_rows.pop(indx) # No, so remove it! event = L._Event('WARN',now(),'EVICTED-SYN:(ORPHANED-Table)',synonym_tbl_name,*synonym_tbl_row[0:3]) L._write_event(event) # Slice out only SYNONYM_KEY,LOOKUP_TABLE,LOOKUP_LOOKUP_KEY. That is, [0:3]. Synonym entries have no attributes! =:) continue # ------------------------------------------------------------------------------------- # STEP2: Yes that "lookback_tbl" exists. So, does the LOOKUP-Key referred to in this # SYNONYM-Row also exist? # ------------------------------------------------------------------------------------- lookback_key = normalizer(synonym_tbl_row[2],sre=L.regexs[lookback_tbl].key) for lookup_table_row in L.lookup_data[lookback_tbl]: lookup_tbl_key = normalizer(lookup_table_row[1],sre=L.regexs[lookback_tbl].key) if (lookup_tbl_key == lookback_key): break else: synonym_tbl_rows.pop(indx) # No, so remove it! event = L._Event('WARN',now(),'EVICTED-SYN:(ORPHANED-Key)',synonym_tbl_name,*synonym_tbl_row[0:3]) L._write_event(event) # Slice out only SYNONYM_KEY,LOOKUP_TABLE,LOOKUP_LOOKUP_KEY. That is, [0:3]. Synonym entries have no attributes! =:) continue # ------------------------------------------------------------------------------------- # ------------------------------------------------------------------------------------- indx += 1 # Whenever we don't pop() an item we (end up here and) increment this. # ------------------------------------------------------------------------------------- #pprint.pprint(dupRow_cntrDict.keys()) #pprint.pprint(ambigRow_cntrDict.keys()) #sys.exit(0) @staticmethod def _lookup_tbl_cleaner(L): """ L is an instance of DatabaseLookup(). - Case1: Same PK. E.g. CURRENCY TABLE: 71,usa,USA,[ATTRIBS,...]; 71,gbr,GBR,[ATTRIBS,...] - Case2: Same KEY. E.g. CURRENCY TABLE: 71,usa,USA,[ATTRIBS,...]; 81,usa,GBR,[ATTRIBS,...] We EVICT ALL hits in both cases for LOOKUP-tables b/c we don't have enough info to determine a prevailing winner; and we don't want to potentially use duplicate lookup data in TARGET-tables. The validator2 utility will identify this Out-Of-Band possibility. """ for lookup_tbl_name in L.lookup_data: # One iteration for each Lookup-table. lookup_tbl_name = normalizer(lookup_tbl_name,sre=base_tc_SRE) # Must defensively .strip().lower() b/c multiple consumers of synonym_tbl_name = normalizer(L._synonym_tbl_name,sre=base_tc_SRE) # this staticMethodnd (e.g. populator.py); and may not do that. if lookup_tbl_name == synonym_tbl_name: continue # Skip SYNONYM-table. lookup_tbl_rows = L.lookup_data[lookup_tbl_name] # ----------------------------------------------------------------------------------------- # Using a defaultdict(lambda: [None,None,False,False]), this section records: # (Case-1) True if this PK occurs multiple times across Row()s/namedTuples. (False otherwise). # (Case-2) True if this KEY occurs multiple times across Row()s/namedTuples (False otherwise). # (Case-3) True if this VALUE occurs multiple times across Row()s/namedTuples (False otherwise). # (Case-4) True if this KEY is also a VALUE in different Row()s/namedTuples. # (Case-4) True if this VALUE is also a KEY in different Row()s/namedTuples. # NOTE: # None = Quantity has never seen. # False = Quantity has been seen exactly once. # True = Quantity has been seen two or more times. # ----------------------------------------------------------------------------------------- flagDict = collections.defaultdict(lambda: [None,None,False,False]) #DupKEY,DupVAL,xrDupKeyVal for lookup_tbl_row in lookup_tbl_rows: pk = 'pk_' + normalizer(lookup_tbl_row[0],sre=base_kv_SRE) # Prepend w/ 'pk_' to avoid possible hash-collision w/ numeric vals for col1,2. pk = 'pk_0' if pk == 'pk_' else pk # For a PKs, whitespace/aVoid/0 (the DB default for col0) are equivalent. Invalid, but still DUPS. key = normalizer(lookup_tbl_row[1],sre=L.regexs[lookup_tbl_name].key) val = normalizer(lookup_tbl_row[2],sre=L.regexs[lookup_tbl_name].val) if flagDict.get(key): # Don't inadvertantly create defaultDict entry in next statement (which simply wants to inspect). if flagDict[key][1] != None: flagDict[key][2] = True # Record if this Row()'s KEY is same as another Row()'s VALUE. if flagDict.get(val): # Don't inadvertantly create defaultDict entry in next statement (which simply wants to inspect). if flagDict[val][0] != None: flagDict[val][3] = True # Record if this Row()'s VALUE is same as another Row()'s KEY. flagDict[pk] = False if (flagDict[pk] == [None,None,False,False]) else True # Record if this PK duplicated. flagDict[key][0] = False if (flagDict[key][0] == None) else True # Record if this KEY duplicated. flagDict[val][1] = False if (flagDict[val][1] == None) else True # Record if this VALUE duplicated. # ----------------------------------------------------------------------------------------- # POST-MVP NOTE: Because we're iterating through lookup_tbl_rows twice (here and below), # figure out a way to slip this logic into the loop below (i.e. discover & remove on the fly). # This is tricky b/c when 2nd. hit is identified, you have to (1) remember where the 1st. hit # was located, (2) then adjust for position change b/c pop()ing of items will have shifted it! # ----------------------------------------------------------------------------------------- # ----------------------------------------------------------------------------------------- # This loop to pop()s Rows()/namedTuples from the LOOKUP-table. NOTE: Because each pop() # mutates the 'lookup_tbl_row' List, we can't use it as our for/loop iterator exhaustion counter. # ----------------------------------------------------------------------------------------- indx = 0 # A counter that is updated ONLY when we DON'T pop() (remove) a List item. for _ in range(len(lookup_tbl_rows)): lookup_tbl_row = lookup_tbl_rows[indx] # ------------------------------------------------------------------------------------- # If there are multi-occurrances of any() kind, we EVICT the Row/Record, and indicate # -- in the message string -- each of the offending categories (i.e. where dups occurred). # NOTE: For PKs, our output status is either 'noPK = True' (indicating PK = whiteSpace; 0; None) # -OR- 'dupPK = True/False' (indicating PK w/ a valid numeral, and whether or not it hss DUPs). # ------------------------------------------------------------------------------------- pk = 'pk_' + normalizer(lookup_tbl_row[0],sre=base_kv_SRE) # Prepend w/ 'pk_' to avoid possible hash-collision w/ numeric vals for col1,2. pk = 'pk_0' if pk == 'pk_' else pk # For a PKs, whitespace/aVoid/0 (the DB default for col0) are equivalent. Invalid, but still DUPS. key = normalizer(lookup_tbl_row[1],sre=L.regexs[lookup_tbl_name].key) val = normalizer(lookup_tbl_row[2],sre=L.regexs[lookup_tbl_name].val) if any((pk == '_0',flagDict[pk],flagDict[key][0],flagDict[val][1],flagDict[key][2],flagDict[val][3])): lookup_tbl_rows.pop(indx) msg = 'EVICTED-LKP:(' (statPKstr,statPKval) = ('noPK',True) if (pk == 'pk_0') else ('dupPK',flagDict[pk]) z = zip((statPKstr,'dupKEY','dupVAL','xrDupK/V','xrDupV/K'), (statPKval,flagDict[key][0],flagDict[val][1],flagDict[key][2],flagDict[val][3])) msg += '|'.join([failureType for (failureType,flag) in z if flag]) + ')' event = L._Event('WARN',now(),msg,lookup_tbl_name,*lookup_tbl_row[0:3]) L._write_event(event) # Slice out only PK,LOOKUP_KEY,LOOKUP_VALUE. That is, [0:3]. Take no attributes! continue # ------------------------------------------------------------------------------------- # ------------------------------------------------------------------------------------- indx += 1 # Whenever we don't pop() an item we (end up here and) increment this. # ------------------------------------------------------------------------------------- @staticmethod def _tbl_deduper(lkp_and_syn_dataStruct, synonym_tbl_name=None): """ Utility to DEDUP a candidate Row()s/records in a table-TO-list-of-Row()s data structure. This utility is used by 'POPULATOR.PY' (and !NOT! by the Lookup API) to dedup new/candidate Lookup-table and Synonym-table Row() entries, (derived from governed/annotated CSVs). For Lookup-table candidates in particular, 'populator.py' assigns unique PK numbers to each Row() in the list. This is so we don't have to treat Lookup-tables and Synonym-tables differently in the following code (b/c PKs will be unique between candidate Row()s in a Lookup-table and Synonym-tables don't even have them at all). The call signature includes the Synonym-table name because we want to know when, during iterations below, when we are NOT working on a Syhonym-table; b/c here we CAN (and should) DEDUP Lookup-table Row()s based only on Row() LOOKUP_KEY and LOOKUP_VALUE (but not Row() PK). IMPORTANT: This method MUTATES the data structure passed to in, in-place. It doesn't return a different object.""" # =============================================================================================== # DEDUP duplicate rows in each table. Note that our lookup API doesn't not perform DEDUPLICATION # b/c DEUPING our In-Memory copy (and not the Online-DB as well) is the wrong thing to do! This # is used by populator.py. # =============================================================================================== # Using Counter() dicts, this section records when it has seen a Row()/namedTuple. The # next time(s) it sees that same Row(), it will remove it/them (i.e. DEDUPs all but one # copy). NOTE: We don't actually store any Row()s/namedTuples, (which would be Memory # and CPU inefficient), but rather a string "representation" of them by concatinating # their in-common attributes. # =============================================================================================== Event = collections.namedtuple('Event', Lookup.ALERT_MSG_CLMNS, rename=False) cntrDict = collections.Counter() deduped_lookup_data = collections.defaultdict(set) # To log DEDUPED Lookup/Synonym Events(). # If a particular DUP is repeated 1m times, we only need to see one entry for it (hence a set()). for tbl_name in lkp_and_syn_dataStruct: # One loop/iteration for each table. indx = 0 # A counter that is updated ONLY when we DON'T pop() (remove) a List item. tbl_name = normalizer(tbl_name,sre=base_tc_SRE) # Must defensively .strip().lower() b/c multiple consumers of synonym_tbl_name = normalizer(synonym_tbl_name,sre=base_tc_SRE) # this staticMethodnd (e.g. populator.py); and may not do that. tbl_rows = lkp_and_syn_dataStruct[tbl_name] sliceObj = slice(0,None,1) if (tbl_name == synonym_tbl_name) else slice(1,None,1) # If working w/ Synonym-table, use all Row() attributes to create a unique dict() hash. # If working w/ Lookup-table, use all Row() attributes to create a unique dict() hash, EXCEPT the PK attribute. for _ in range(len(tbl_rows)): # DEDUP Row()s in the current table/iteration. tbl_row = tbl_rows[indx] hashKey = ''.join([r if (r != None) else '' for r in tbl_row[sliceObj]]) # HashKey = table-type (LKP vs SYN) concat of normalized Row() slice. # -------------------------------------------------------------------------------------- # Notice ABOVE that, when concatinating row-values to create a HashKey, we DON'T normalize() # each component (i.e. we don't do normalizer(r) in the list-comp). That's b/c we don't # know if this table PERMITS special characters for keys and/or values. This is a # standalone @staticmethod used by POPULATOR.PY, and doesn't have a Lookup Object to know # that information. It's a table DEDUPER and uses the literal strings it finds. # We do, however, (in the list-comp) manually normalize Python None's to '' (just as # normalizer() would for that case). # -------------------------------------------------------------------------------------- if not cntrDict[hashKey]: # First time we've seen this Row(). Keep this, but pop() every next occurrance. cntrDict[hashKey] = 1 else: # We've seen this Row() before, so pop() every next occurrance. msg = 'DEDUPED populator Row' event = Event('WARN',now(),msg,tbl_name,*tbl_row[0:3]) # Slice out only Col0,Col1,Col2 of Row() and nothing else. That is, [0:3]. deduped_lookup_data[tbl_name].add(event) # Add event to the set(). tbl_rows.pop(indx) continue indx += 1 # Whenever we don't pop() an item we (end up here and) increment this. return deduped_lookup_data # This data-struct is merged w/ rejected_lookup_data in populator.py (the caller). # =============================================================================================== @staticmethod def _get_next_PKs(L, data_tbl_api_object_from_MGREENE=None): """ L is an instance of DatabaseLookup(). Used by POPULATOR.PY. Used here for MVP. Post-MVP, Malcolm will provide an API interface to the Data-tables; including which of it's columns are lookup columns. """ PK_SAFETY_PADDING = 10 # Just in case some PKs at the tail-end were manually deleted in the Lookup-tables. synonym_tbl_name = normalizer(L._synonym_tbl_name,sre=base_tc_SRE) next_pk_dict = {} def max_keyFunc(row): """" key function for max() to normalize a Row() whose PK attribute-value wasn't an int (including it being a whitespace|void|None), to PK = 0. """ if isinstance(row[0], int): return row # Return unmodified namedTuple. return row._replace(**{row._fields[0] : 0}) # Return new namedTuple with adjusted PK value. # We can use max() below b/c rowList is an iterable consisting of [Row(int, str, str),...] for tblName,rowList in L.lookup_data.items(): tblName = normalizer(tblName,sre=base_tc_SRE) if tblName == synonym_tbl_name: continue # The Synonym-table doesn't have PKs. next_pk_dict[tblName] = max(rowList, key=max_keyFunc)[0] if len(rowList) > 1 else 2 # Always leave room for the '1,null,Null' entry in Lookup-tables. next_pk_dict[tblName] += PK_SAFETY_PADDING return next_pk_dict @staticmethod def _load_table_metadata(L): """ L is an instance of DatabaseLookup object. Some object attributes will be mutated intentionally here. """ # --------------------------------------------------------------------------- # First, remove/pop() the Metadata-table form L._lookup_data and # L._data_dictionary. We don't want to see it there anymore! From # L._lookup_data we save pop()ed Rows, if any, for subsequent processing. # --------------------------------------------------------------------------- rowList = L._lookup_data.pop(L._metadata_tbl_name,None) # One row per table. L._data_dictionary.pop(L._metadata_tbl_name,None) # One row per table. # --------------------------------------------------------------------------- # --------------------------------------------------------------------------- # Next, we provide each Lookup/Synonym-table with initial, DEFAULT METADATA # information; and overwrite those that have (well-formed) METADATA entries in # the Online-DB with that information. This ensures that every table has # associated METADATA, w/ reasonable defaults if not is defined online. # --------------------------------------------------------------------------- L._tbl_metadata = {tbl_name:Lookup.DFLT_METADATA for tbl_name in L.dde} msg = 'METADATA:DEFAULTS INSTALLED' event = L._Event('DEBUG',now(),msg,*tuple([None]*4)) L._write_event(event) # --------------------------------------------------------------------------- # --------------------------------------------------------------------------- # A METADATA table may not exist or be empty in this particular Lookup/Synonym # schema/DB (i.e. if, below, rowList is None or is an empty-List). If so, we # return, but having already initialized METADATA DEFAULTS (above). # --------------------------------------------------------------------------- if not rowList: return # --------------------------------------------------------------------------- # --------------------------------------------------------------------------- # At this point we know rowList isn't empty. So we take one Row (the first # one), extract it's namedTuple fields (minus the table-name field), and make # sure that it's fieldNames and ordering (i.e. column names and ordering in the # Online-DB) match that of Lookup.MetadataType. If not, raise an exception and # exit(1). NOTE: We'll tolerate 'extra' columns to the RIGHT of the 'required' # columns (which may, for example, be introduced during platform enhancements); # and will slice those out to ignore them. But everything to the left must be # one-to-one perfect match! This is to enhance data integrity. # --------------------------------------------------------------------------- nt_fields = Lookup.MetadataType._fields # 'nt' stands for NamedTuple. =:) nt_fields = [normalizer(f, sre=base_tc_SRE) for f in nt_fields] db_fields = rowList[0]._fields[1:len(nt_fields)+1] # Igore table-name field0; then extract required subset from remaining. db_fields = [normalizer(f, sre=base_tc_SRE) for f in db_fields] if (len(db_fields) < len(nt_fields)) or (db_fields != nt_fields): print('ERROR: Mismatch between metadata field-names and/or field-ordering. REQUIRED: %s. FOUND: %s.' % (str(nt_fields),str(db_fields)) ) raise TypeError # --------------------------------------------------------------------------- # --------------------------------------------------------------------------- # Per-table, replace DEFAULT METADATA with what's in Online-DB METADATA entry for it. # --------------------------------------------------------------------------- for row in rowList: # At this point we know rowList isn't empty. tbl_name = normalizer(row[0],sre=base_tc_SRE) # ----------------------------------------------------------------------- # DO NOT normalizer() the above using metadata-derived REGEX for these # table names; b/c we're actually building that REGEX information right # now! CIRCULAR LOGIC! And we don't have to anyway. =:) The # sre=base_tc_SRE REGEX is perfect! # ----------------------------------------------------------------------- # ----------------------------------------------------------------------- # Skip and alert on table-names in Metadata-table that don't exist. # ----------------------------------------------------------------------- if tbl_name not in L.dde: msg = 'METADATA:ORPHANED TABLE IGNORED' event = L._Event('WARN',now(),msg,tbl_name,*tuple([None]*3)) L._write_event(event) continue # ----------------------------------------------------------------------- # ----------------------------------------------------------------------- # At this point we know that each Row has the required fields (both field # names and their ordering). So for each Lookup/Synonym-table that has # an entry defined in the Online-DB Metadata-table, we REPLACE it's DEFAULT # METADATA (set above) with that Online-DB-specified metadata. So after # slicing-out the table name column (i.e. row[:1]), we take the resulting # Tuple and create an instance of our MetadataType nameTuple, and insert # that into the dictionary entry for that table. But we do one better: # Should a particular metadata field (in the Online-DB metadata) be None # (i.e. Python 'None' / a void in that cell), then we plug in our # default value for THAT specific field (stored in Lookup.DFLT_METADATA). # This ensures consistent and safe default befavior when someone didn't # fill in a value for a particular field (say in Toad). By safe we mean # that the ultimate/macro result will be that keys & values will -- by # default -- permit ONLY characters from the class: [a-z0-9]; while # table-names and column-names will -- by default -- permit ONLY # characters from the class: [a-z0-9_]. This is proper and correct for # SEMANTIC DATA TREATMENT! The list-comp below (aList) does this for us. # # Finally, notice that we DO NOT normalize() field values b/c, by # INTENTION, field values likely have non alphaNumeric characters. One # use-case, for example, is per-table 'key_punct' metadata that specifies # non-alphaNumeric symbols allowed in its keys. # ----------------------------------------------------------------------- # L._tbl_metadata[tbl_name] = Lookup.MetadataType._make(aList) # Yields same as below. # ----------------------------------------------------------------------- aList = [val if (val!=None) else Lookup.DFLT_METADATA[e] for e,val in enumerate(row[1:])] L._tbl_metadata[tbl_name] = Lookup.MetadataType(*aList) msg = 'METADATA:DEFAULT OVERRIDDEN: %s' % str(row) event = L._Event('DEBUG',now(),msg,tbl_name,*tuple([None]*3)) L._write_event(event) # ----------------------------------------------------------------------- @staticmethod def _create_tbl_norm_regexs(L): """ L is an instance of DatabaseLookup object. Some object attributes will be mutated intentionally here. """ # --------------------------------------------------------------------------- # Using the per-table METADATA (in LOOKUP_METADATA) which, among other things, # specifies punctuation chars (in addition to the base alphaNumeric chars) to # ALLOW in Lookup/Synonym table KEYS, table VALUES, the table NAME and table # COLUMN NAMES. NOTE: In reality, we won't ever use additional punctuations # specified for table NAME and table COLUMN NAMES (we just defined them for # completeness =:)). But the use-case for ALLOWING special/punctuation chars # (in addition to alphaNumeric chars), is to support composite KEYS (keys # composed of two or more CSV column names). E.g. country_Value:seller_Value # --------------------------------------------------------------------------- for tbl_name,row in L.metadata.items(): key_SRE = SRE_func(row.key_punct) val_SRE = SRE_func(row.val_punct) col_SRE = SRE_func(row.col_punct) tbl_SRE = SRE_func(row.tbl_punct) L._tbl_regexs[tbl_name] = Lookup.CompiledRegex(key_SRE,val_SRE,col_SRE,tbl_SRE) # --------------------------------------------------------------------------- @staticmethod def _build_lookups_hashes(L): for tblName,rowList in sorted(L.lookup_data.items()): # ----------------------------------------------------------------------------------------- # Build self._lookups_hash(es) lookup hashes (a dict() of dict()s). It looks like this: # ----------------------------------------------------------------------------------------- # { tblName1 : {hashKey1:1, hashKey2:2, hashKey3:3, ..., hashKeyX:X}, # tblName2 : {hashKey1:1, hashKey2:2, hashKey3:3, ..., hashKeyY:Y}, ... # tblNameN : {hashKey1:1, hashKey2:2, hashKey3:3, ..., hashKeyZ:Z} } # ----------------------------------------------------------------------------------------- L._lookups_hashes[tblName] = {} if tblName == L._synonym_tbl_name: for (e,row) in enumerate(rowList): synonym = normalizer(row[0],sre=L.regexs[L._synonym_tbl_name].key) # Normalized synonym lookback_tbl = normalizer(row[1],sre=base_tc_SRE) # Normalized lookback_table hashKey = synonym + ':' + lookback_tbl L._lookups_hashes[tblName].update({hashKey:e}) else: for (e,row) in enumerate(rowList): hashKey = normalizer(row[1],sre=L.regexs[tblName].key) L._lookups_hashes[tblName].update({hashKey:e}) # ----------------------------------------------------------------------------------------- # ----------------------------------------------------------------------------------------- # Clean (i.e. DE-DUP; REMOVE ORPHANED SYNONYMS; REMOVE AMBIGUOUS-MATCH SYNONYM) in # LOOKUP-tables and SYNONYM-table. IMPORTANT: lookup_tbl_cleaner() must be cleaned first, # because properly cleansing of the synonym_tbl_cleaner() depends on it. Because cleansing # alters the Lookup/Synonym-tables, we must also rebuild the lookups_hashes. # ----------------------------------------------------------------------------------------- @staticmethod def _cleanse_and_build_lookups_hashes(L,rm_orphans=True): Lookup._lookup_tbl_cleaner(L) # MUST BE RUN FIRST! Lookup._synonym_tbl_cleaner(L,rm_orphans=rm_orphans) # SECOND! Lookup._build_lookups_hashes(L) # Now rebuild the lookups_hashes. THIRD! # ----------------------------------------------------------------------------------------- # ============================================================================================================= # There are only three (qty. 3) database queries that the DatabaseLookup Class implementation will need to use: # dde: A query to retrieve a dictionary/hash of table-names to associated collection of column-names. # load: Using the dde results, a query to load all tables into an in-memory Python data structure. # store: Using the dde results, a query to store all in-memory modified tables back to the database. # We define the query statements here (CONSTANTS) on a per-Database-type, and save them in for easy reference # by the DatabaseLookup subclass, as a dict() of named-tuples. # ============================================================================================================= _oracle_store = None _oracle_conn_type = cx_Oracle.Connection _oracle_load = 'SELECT :arg1 FROM' _oracle_dde = """SELECT table_name, ltrim(sys_connect_by_path(column_name,','),',') column_names FROM (SELECT table_name, column_name, row_number() OVER (PARTITION BY table_name ORDER BY column_id) rn FROM user_tab_columns WHERE (table_name like :arg1) OR table_name = :arg2) WHERE connect_by_isleaf = 1 START WITH rn=1 CONNECT BY PRIOR rn=rn-1 AND PRIOR table_name = table_name """ _postgres_conn_type = None _postgres_dde = None _postgres_load = None _postgres_store = None _mysql_conn_type = None _mysql_dde = None _mysql_load = None _mysql_store = None _sqlite3_conn_type = None _sqlite3_dde = None _sqlite3_load = None _sqlite3_store = None _DB_CONSTANTS = collections.namedtuple('_DB_CONSTANTS', ['load_query', 'store_query', 'dde_query', 'conn_type'], rename=False) DB_CONSTANTS = {'oracle' : _DB_CONSTANTS(_oracle_load, _oracle_store, _oracle_dde, _oracle_conn_type), 'postgres' : _DB_CONSTANTS(_postgres_load, _postgres_store, _postgres_dde, _postgres_conn_type), 'sqlite3' : _DB_CONSTANTS(_sqlite3_load, _sqlite3_store, _sqlite3_dde, _sqlite3_conn_type), 'mysql' : _DB_CONSTANTS(_mysql_load, _mysql_store, _mysql_dde, _mysql_conn_type)} # ============================================================================================================= # =============================================================================================== # Top-Level function to return current UTC date/time. # =============================================================================================== def now(): """ Static method to return current UTC date/time. """ return datetime.datetime.utcnow().strftime(Lookup.DATETIME_FORMAT) # =============================================================================================== # =============================================================================================== # Top-Level function to consistently normalize strings, DataFactory Lookup-API wide. # =============================================================================================== base_pattern = '^\w' # Matches NON Unicode-word chars (basically, everything except: [a-zA-Z0-9_]). SRE_func = lambda pattern='', base_pattern=base_pattern: re.compile('[%s]+' % (base_pattern + pattern,)) base_kv_SRE = SRE_func() # Base/Default SREgex for KEYS & VALS. Overridden by METADATA. base_tc_SRE = SRE_func('_') # Base/Default SREgex for TBLS & COLS. Overridden by METADATA (though very improbably). def normalizer(s, preserve_case=False, sre=base_kv_SRE) -> str: """ Function to consistently normalize strings in the DataFactory. Passing it a string and leaving all arguments to default values, removes everything from the string EXCEPT alphaNum chars; and then lower-cases the result (case-1 below). This is the base case. You then ALLOW any 'special' characters as necessary by using the SRE_func lambda function (see examples that follow). Examples: 1) normalizer(s) Removes everything EXCEPT alphaNum chars; and lower-cases the result. 2) my_SRE = SRE_func('_'); normalizer(s, my_SRE) Same as previous, but now keeps/allows underscores. 3) my_SRE = SRE_func('_'); normalizer(s, preserve_case=True, my_SRE) Same as previous, but now preserves case. 4) my_SRE = SRE_func('#_'); normalizer(s, preserve_case=True, my_SRE) Same as previous, but now also keeps/allows the octothorpe symbol. """ if s == None: s = '' # Treat Python 'None's as equivalent to a zero-length str. s = str(s) if preserve_case else str(s).lower() s1 = sre.sub('', s, count=0) return s1 if ('_' in sre.pattern) else s1.replace('_', '') # =============================================================================================== |