Source code for itemset_mining.two_phase_huim

from collections import defaultdict, namedtuple
from itertools import chain, combinations, tee
from types import GeneratorType
from typing import Any, Dict, Generator, List, Optional, Set, Tuple, Union

TupleOfStrAndNum = Tuple[str, Union[int, float]]

HUIRecord = namedtuple(  # pylint: disable=C0103
    "HUIRecord", ("items", "itemset_utility")
)


CandidateHUIRecord = namedtuple("HUIRecord", ("items", "twu"))  # pylint: disable=C0103


[docs]class TwoPhase: """ Example: >>> from itemset_mining.two_phase_huim import TwoPhase >>> from operator import attrgetter >>> transactions = [ ... [("Coke 12oz", 6), ("Chips", 2), ("Dip", 1)], ... [("Coke 12oz", 1)], ... [("Coke 12oz", 2), ("Chips", 1)], ... [("Chips", 1)], ... [("Chips", 2)], ... [("Coke 12oz", 6), ("Chips", 1)] ... ] >>> # ARP for each item >>> external_utilities = { ... "Coke 12oz": 1.29, ... "Chips": 2.99, ... "Dip": 3.49 ... } >>> # Minimum dollar value generated by an itemset we care about across all transactions >>> minutil = 20.00 >>> hui = TwoPhase(transactions, external_utilities, minutil) >>> result = hui.get_hui() >>> sorted(result, key=attrgetter('itemset_utility'), reverse=True) ... # doctest: +NORMALIZE_WHITESPACE [HUIRecord(items=('Chips', 'Coke 12oz'), itemset_utility=30.02), HUIRecord(items=('Chips',), itemset_utility=20.93)] """ def __init__( self, transactions: Union[ Generator[TupleOfStrAndNum, None, None], List[TupleOfStrAndNum] ], external_utilities: Dict, minutil: int, ): # todo: sort transaction items lexographically? self.transactions_type = type(transactions).__name__ # "generator" or "list" self.transactions = self.ensure_transaction_items_unique(transactions) # For items in transactions, but not in external_utilities, # self.external_utilities[item] = 0 # can still loop through those items for initial candidate set, # because if the external utilities are 0, not going to be important anyway self.external_utilities = defaultdict(int, external_utilities) self.minutil = minutil # self.num_transaction = 0 self._items = list(external_utilities.keys()) # todo: see if keeping track of item indices offers speedup # self._item_transaction_ids = {} @staticmethod def _ensure_transaction_items_unique_generator(transactions): for idx, transaction in enumerate(transactions): t_list = list(transaction) item_names = [item[0] for item in t_list] if len(item_names) != len(set(item_names)): raise ValueError( f"There are duplicate items in transaction index {idx}. " f"Transactions must have a unique list of items, " f"with their internal utilities aggregated togther." ) yield transaction @staticmethod def _ensure_transaction_items_unique_memory(transactions): for idx, transaction in enumerate(transactions): item_names = [item[0] for item in transaction] if len(item_names) != len(set(item_names)): raise ValueError( f"There are duplicate items in transaction index {idx}. " f"Transactions must have a unique list of items, " f"with their internal utilities aggregated togther." ) return transactions
[docs] @classmethod def ensure_transaction_items_unique(cls, transactions): """Ensure there are no duplicate items within a transaction, while preserving generators. """ if isinstance(transactions, GeneratorType): transactions = cls._ensure_transaction_items_unique_generator(transactions) else: transactions = cls._ensure_transaction_items_unique_memory(transactions) return transactions
@property def items(self): """Returns the list of items that appear in transactions.""" return sorted(self._items)
[docs] def calc_transaction_utility(self, transaction: Tuple[Any, Union[int, float]]): return sum( [ self.external_utilities[item] * internal_utility for item, internal_utility in transaction ] )
[docs] def calc_twu(self, itemset: List[str]): """Calculated the transaction-weighted utilization for an itemset""" twu = 0 if self.transactions_type == "generator": self.transactions, transactions = tee(self.transactions) else: transactions = self.transactions for transaction in transactions: transaction_itemset = [t[0] for t in transaction] if all(item in transaction_itemset for item in itemset): twu += self.calc_transaction_utility(transaction) return twu
[docs] def calc_itemset_utility(self, itemset: Tuple): itemset_utility = 0 if self.transactions_type == "generator": self.transactions, transactions = tee(self.transactions) else: transactions = self.transactions for transaction in transactions: transaction_itemset = [i[0] for i in transaction] if all(item in transaction_itemset for item in itemset): transaction_part = [ tup for item in itemset for tup in transaction if tup[0] == item ] itemset_utility += self.calc_transaction_utility(transaction_part) return itemset_utility
[docs] def initial_candidates(self): """Returns the initial candidates.""" return [tuple([item]) for item in self.items]
@staticmethod def _create_next_candidates(prev_candidates: Set, length: int): """Returns the apriori candidates as a list. Args: prev_candidates: Previous candidates as a list. length: The lengths of the next candidates. """ items = sorted(set(chain.from_iterable(prev_candidates))) # Create the temporary candidates. These will be filtered below. tmp_next_candidates = (itemset for itemset in combinations(items, length)) # Return all the candidates if the length of the next candidates is 2 # because their subsets are the same as items. if length < 3: return tmp_next_candidates # Filter to candidates where all of their subsets are in the previous candidates. next_candidates = [ candidate for candidate in tmp_next_candidates if all( itemset in prev_candidates for itemset in combinations(candidate, length - 1) ) ] return next_candidates
[docs] def get_high_twu_itemsets( self, max_length: Optional[int] = None ) -> Generator[CandidateHUIRecord, None, None]: """Returns a generator of support records with given transactions. This is "Phase 1." """ candidate_itemsets = self.initial_candidates() length = 1 while candidate_itemsets: high_util_itemsets = set() for candidate_itemset in candidate_itemsets: twu = self.calc_twu(candidate_itemset) if twu < self.minutil: continue high_util_itemsets.add(candidate_itemset) yield CandidateHUIRecord(candidate_itemset, twu) length += 1 if max_length and length > max_length: break candidate_itemsets = self._create_next_candidates( high_util_itemsets, length )
[docs] def get_hui( self, max_length: Optional[int] = None ) -> Generator[HUIRecord, None, None]: # Phase I high_twu_itemsets = self.get_high_twu_itemsets(max_length=max_length) candidate_itemsets = (itemset.items for itemset in high_twu_itemsets) # Phase II for itemset in candidate_itemsets: itemset_utility = self.calc_itemset_utility(itemset) if itemset_utility < self.minutil: continue """ Note: Making a [frozen]set of items isn't necessary, since input is required to have unique itemsets per transaction. This also allows preserving itemset order. """ yield HUIRecord(itemset, itemset_utility)