from collections import defaultdict, namedtuple
from itertools import chain, combinations, tee
from types import GeneratorType
from typing import Any, Dict, Generator, List, Optional, Set, Tuple, Union
# One entry of a transaction: an item name paired with its internal utility
# (e.g. the quantity bought), such as ("Chips", 2).
TupleOfStrAndNum = Tuple[str, Union[int, float]]

# An itemset together with its utility summed over the transactions.
HUIRecord = namedtuple(  # pylint: disable=C0103
    "HUIRecord", ("items", "itemset_utility")
)

# An itemset together with its transaction-weighted utilization (TWU).
# The typename must match the variable name: with the former "HUIRecord"
# typename the repr impersonated HUIRecord and pickle's lookup by class name
# resolved to the wrong type.
CandidateHUIRecord = namedtuple(  # pylint: disable=C0103
    "CandidateHUIRecord", ("items", "twu")
)
class TwoPhase:
    """Mine high-utility itemsets with a two-phase, level-wise search.

    Phase 1 (:meth:`get_high_twu_itemsets`) keeps every candidate itemset
    whose transaction-weighted utilization (TWU) reaches ``minutil`` and
    builds longer candidates from the survivors. Phase 2 (:meth:`get_hui`)
    computes the utility of each surviving itemset and keeps those that
    still reach ``minutil``.

    Args:
        transactions: Either an in-memory collection of transactions or a
            one-shot iterator/generator of them. Each transaction is a
            sequence of ``(item, internal_utility)`` pairs in which every
            item appears at most once.
        external_utilities: Mapping of item to its per-unit utility. Items
            missing from the mapping are treated as having utility 0.
        minutil: Minimum TWU (Phase 1) and minimum utility (Phase 2) an
            itemset must reach to be reported.

    Raises:
        ValueError: If a transaction lists the same item more than once.
            For in-memory input this is raised by the constructor; for
            streamed input it is raised when the offending transaction is
            first read.

    Example:
        >>> from itemset_mining.two_phase_huim import TwoPhase
        >>> from operator import attrgetter
        >>> transactions = [
        ...     [("Coke 12oz", 6), ("Chips", 2), ("Dip", 1)],
        ...     [("Coke 12oz", 1)],
        ...     [("Coke 12oz", 2), ("Chips", 1)],
        ...     [("Chips", 1)],
        ...     [("Chips", 2)],
        ...     [("Coke 12oz", 6), ("Chips", 1)]
        ... ]
        >>> # ARP for each item
        >>> external_utilities = {
        ...     "Coke 12oz": 1.29,
        ...     "Chips": 2.99,
        ...     "Dip": 3.49
        ... }
        >>> # Minimum dollar value generated by an itemset we care about across all transactions
        >>> minutil = 20.00
        >>> hui = TwoPhase(transactions, external_utilities, minutil)
        >>> result = hui.get_hui()
        >>> sorted(result, key=attrgetter('itemset_utility'), reverse=True)
        ... # doctest: +NORMALIZE_WHITESPACE
        [HUIRecord(items=('Chips', 'Coke 12oz'), itemset_utility=30.02),
         HUIRecord(items=('Chips',), itemset_utility=20.93)]
    """

    def __init__(
        self,
        transactions: Union[
            Generator[List["TupleOfStrAndNum"], None, None],
            List[List["TupleOfStrAndNum"]],
        ],
        external_utilities: Dict,
        minutil: Union[int, float],
    ):
        # TODO: sort transaction items lexicographically?
        # Type name of the object that was passed in (e.g. "generator" or
        # "list"). Kept as a public attribute for backward compatibility;
        # streaming is detected from the object itself, not from this name.
        self.transactions_type = type(transactions).__name__
        self.transactions = self.ensure_transaction_items_unique(transactions)
        # Items that occur in transactions but not in external_utilities get
        # utility 0 through the defaultdict, so they never add to a total.
        self.external_utilities = defaultdict(int, external_utilities)
        self.minutil = minutil
        # Candidate generation starts from the items that have a utility.
        self._items = list(external_utilities)
        # TODO: see if keeping track of item indices offers a speedup.

    @staticmethod
    def _check_unique_items(idx, transaction):
        """Raise ``ValueError`` if ``transaction`` names an item twice.

        Args:
            idx: Position of the transaction, used in the error message.
            transaction: Sequence of ``(item, internal_utility)`` pairs.
        """
        item_names = [item[0] for item in transaction]
        if len(item_names) != len(set(item_names)):
            raise ValueError(
                f"There are duplicate items in transaction index {idx}. "
                f"Transactions must have a unique list of items, "
                f"with their internal utilities aggregated togther."
            )

    @staticmethod
    def _ensure_transaction_items_unique_generator(transactions):
        """Lazily validate streamed transactions, yielding each as a list.

        The materialised list is yielded rather than the original object, so
        a transaction that is itself a one-shot iterator is not handed on
        already exhausted by the validation pass.
        """
        for idx, transaction in enumerate(transactions):
            materialized = list(transaction)
            TwoPhase._check_unique_items(idx, materialized)
            yield materialized

    @staticmethod
    def _ensure_transaction_items_unique_memory(transactions):
        """Eagerly validate in-memory transactions and return them unchanged."""
        for idx, transaction in enumerate(transactions):
            TwoPhase._check_unique_items(idx, transaction)
        return transactions

    @classmethod
    def ensure_transaction_items_unique(cls, transactions):
        """Ensure there are no duplicate items within a transaction,
        while preserving lazy inputs.

        Any one-shot iterator (a generator, ``iter(...)``, ``map(...)``, ...)
        is wrapped in a validating generator; iterating it eagerly here would
        drain it before mining starts. Anything else is validated
        immediately and returned as the same object.

        Raises:
            ValueError: If a transaction lists the same item more than once
                (deferred until iteration for one-shot inputs).
        """
        if hasattr(transactions, "__next__"):
            return cls._ensure_transaction_items_unique_generator(transactions)
        return cls._ensure_transaction_items_unique_memory(transactions)

    @property
    def items(self):
        """Return the sorted items that have an entry in ``external_utilities``."""
        return sorted(self._items)

    def _iter_transactions(self):
        """Return an iterable over all transactions that can be repeated.

        A one-shot iterator is split with ``tee``: one branch is stored back
        on the instance for the next pass and the other is handed out. ``tee``
        buffers whatever the stored branch has not consumed yet, so a streamed
        input is effectively held in memory after the first pass.
        """
        source = self.transactions
        if hasattr(source, "__next__"):
            self.transactions, replay = tee(source)
            return replay
        return source

    def calc_transaction_utility(
        self, transaction: List[Tuple[Any, Union[int, float]]]
    ):
        """Return the utility of one transaction (or a part of one).

        Args:
            transaction: Sequence of ``(item, internal_utility)`` pairs.

        Returns:
            The sum of ``external_utility * internal_utility`` over the pairs;
            ``0`` for an empty sequence.
        """
        external = self.external_utilities
        return sum(
            external[item] * internal_utility
            for item, internal_utility in transaction
        )

    def calc_twu(self, itemset: List[str]):
        """Calculate the transaction-weighted utilization for an itemset.

        Returns:
            The summed utility of every whole transaction that contains all
            items of ``itemset``.
        """
        wanted = set(itemset)
        twu = 0
        for transaction in self._iter_transactions():
            present = {entry[0] for entry in transaction}
            if wanted <= present:
                twu += self.calc_transaction_utility(transaction)
        return twu

    def calc_itemset_utility(self, itemset: Tuple):
        """Calculate the utility of an itemset across all transactions.

        Only transactions containing every item of ``itemset`` contribute,
        and only with the utility of those items (not the whole transaction).

        Returns:
            The summed utility of the itemset's items over the matching
            transactions.
        """
        itemset_utility = 0
        for transaction in self._iter_transactions():
            # Items are unique within a transaction, so a by-name lookup
            # selects the same entries as scanning the transaction per item.
            by_name = {entry[0]: entry for entry in transaction}
            if all(item in by_name for item in itemset):
                transaction_part = [by_name[item] for item in itemset]
                itemset_utility += self.calc_transaction_utility(transaction_part)
        return itemset_utility

    def initial_candidates(self):
        """Return the length-1 candidates, one 1-tuple per item."""
        return [(item,) for item in self.items]

    @staticmethod
    def _create_next_candidates(prev_candidates: Set, length: int):
        """Return the apriori candidates of the given length as a list.

        Args:
            prev_candidates: Itemsets (tuples) of length ``length - 1`` that
                survived the previous round.
            length: The length of the next candidates.

        Returns:
            A list of sorted item tuples. A list is returned for every
            ``length`` so that callers can rely on its truthiness.
        """
        items = sorted(set(chain.from_iterable(prev_candidates)))
        candidates = combinations(items, length)
        # For length 2 the subsets are the single items themselves, so there
        # is nothing to filter.
        if length < 3:
            return list(candidates)
        # Keep only candidates whose every (length - 1)-subset survived.
        return [
            candidate
            for candidate in candidates
            if all(
                subset in prev_candidates
                for subset in combinations(candidate, length - 1)
            )
        ]

    def get_high_twu_itemsets(
        self, max_length: Optional[int] = None
    ) -> Generator["CandidateHUIRecord", None, None]:
        """Yield every candidate itemset whose TWU is at least ``minutil``.

        This is "Phase 1." Candidates are examined level by level; the next
        level is built only from the itemsets kept at the current one.

        Args:
            max_length: If truthy, stop after itemsets of this length.

        Yields:
            ``CandidateHUIRecord(items, twu)`` for each itemset kept.
        """
        candidate_itemsets = self.initial_candidates()
        length = 1
        while candidate_itemsets:
            high_util_itemsets = set()
            for candidate_itemset in candidate_itemsets:
                twu = self.calc_twu(candidate_itemset)
                if twu < self.minutil:
                    continue
                high_util_itemsets.add(candidate_itemset)
                yield CandidateHUIRecord(candidate_itemset, twu)
            length += 1
            if max_length and length > max_length:
                break
            candidate_itemsets = self._create_next_candidates(
                high_util_itemsets, length
            )

    def get_hui(
        self, max_length: Optional[int] = None
    ) -> Generator["HUIRecord", None, None]:
        """Yield the itemsets whose utility is at least ``minutil``.

        Args:
            max_length: Passed through to :meth:`get_high_twu_itemsets`.

        Yields:
            ``HUIRecord(items, itemset_utility)`` for each itemset kept.
        """
        # Phase I
        high_twu_itemsets = self.get_high_twu_itemsets(max_length=max_length)
        # Phase II
        for candidate in high_twu_itemsets:
            itemset = candidate.items
            itemset_utility = self.calc_itemset_utility(itemset)
            if itemset_utility < self.minutil:
                continue
            # Note: making a [frozen]set of items isn't necessary, since the
            # input is required to have unique items per transaction. Keeping
            # the tuple also preserves itemset order.
            yield HUIRecord(itemset, itemset_utility)