import numpy as np
from itertools import starmap
from typing import List
def _proximity(h: float, alpha: float, distances: np.ndarray) -> np.ndarray:
"""Calculates the proximity factor P for an array of distances.
Implements equation 1 in the paper
Arguments:
h {float} -- max hopping distance
alpha {float} -- propagation factor
distances {np.array} -- array of hopping distances
Raises:
ValueError -- if hopping distance is less than zero
ValueError -- if propagation factor is not between zero and one
Returns:
np.array -- an array of proximiy values
"""
if h < 0:
raise ValueError('hopping distance h cannot be negative')
if not 0 < alpha <= 1:
raise ValueError('propagation factor alpha must be between 0 and 1')
return np.multiply(
np.less_equal(distances, h),
np.power(alpha, distances)
)
def _delta_plus(x: np.ndarray, y: np.ndarray) -> np.ndarray:
"""Comparator function. Equation 3 in the paper.
Arguments:
x {np.array} -- an array of floats
y {np.array} -- an array of floats
Returns
np.array -- an array of floats
"""
return np.multiply(
np.greater(x, y),
np.subtract(x, y)
)
def group_by(columns, arr):
    """Split an array into n slices where 'columns'
    are all equal within each slice.

    Note: assumes ``arr`` is already sorted by ``columns`` so that the
    group counts returned by ``np.unique`` line up with consecutive slices.

    Arguments:
        columns {List[str]} -- a list of column names
        arr {np.array} -- a numpy structured array

    Raises:
        ValueError -- if columns is empty

    Returns:
        keys: np.array -- the column values uniquely identifying each group
        groups: List[np.array] -- a list of numpy arrays
    """
    if not len(columns):
        raise ValueError("group_by requires a non empty list of column names")
    _, counts = np.unique(arr[columns], return_counts=True)
    indices = np.insert(np.cumsum(counts), 0, 0)
    split = np.split(arr, indices)
    # np.split leaves empty leading/trailing slices - drop them
    filtered = list(filter(lambda x: len(x), split))
    keys = map(lambda x: x[columns][0].tolist(), filtered)
    # BUG FIX: np.vstack requires a sequence of arrays; passing a bare
    # generator raises a TypeError on modern numpy versions.
    stacked = np.vstack(list(keys))
    return stacked, filtered
def group_by_first(columns, arr):
    """Split an array into n slices where 'columns'
    all compare equal within each slice,
    take the first row of each slice, and
    combine those rows into a single array through concatenation.

    Arguments:
        columns {[str]} -- a list of column names
        arr {[type]} -- a numpy structured array

    Returns:
        np.array -- new concatenated array
    """
    # Turn the per-group counts into the offset of each group's first row,
    # dropping the final offset (one past the end of the array).
    _, counts = np.unique(arr[columns], return_counts=True)
    starts = np.cumsum(np.insert(counts, 0, 0))[:-1]
    return arr[starts]
class Base(np.recarray):
    """A base class for subclassing numpy record arrays.

    Subclasses declare parallel ``columns`` and ``types`` lists; the
    constructor converts an iterable of rows into a record array viewed
    as the subclass.

    Returns:
        np.recarray -- A subclass of np.recarray
    """

    # Overridden by subclasses: field names and their numpy dtypes.
    columns = []
    types = []

    def __new__(cls, *args, **kwargs):
        fields = list(zip(cls.columns, cls.types))
        rows = [tuple(row) for row in args[0]]
        return np.array(rows, dtype=fields).view(cls)
class QueryResult(Base):
    """Represents a query from the database as a numpy rec array"""

    columns = 'v u vv uu dist_v dist_u weight'.split()
    types = [np.int64, np.int64, np.int64, np.int64,
             np.float32, np.float32, np.float32]

    # NOTE: the property getters below are documentation hooks. ``super()``
    # exposes no such attribute, so each getter raises AttributeError and
    # np.recarray falls back to looking the name up as a record field.

    @property
    def v(self):
        """Column v.

        Returns:
            np.ndarray -- array of query node ids as integers
        """
        return getattr(super(), 'v')

    @property
    def u(self):
        """Column u.

        Returns:
            np.ndarray -- array of target node ids as integers
        """
        return getattr(super(), 'u')

    @property
    def vv(self):
        """Column vv - written v prime (v') in the paper,
        a query node within hopping distance h of query node v.

        Returns:
            np.ndarray -- array of query node ids as integers
        """
        return getattr(super(), 'vv')

    @property
    def uu(self):
        """Column uu - written u prime (u') in the paper,
        a target node within hopping distance h of target node u.
        Values less than zero indicate that uu (u') has no
        corresponding matches to any node v'.

        Returns:
            np.ndarray -- array of target node ids as integers
        """
        return getattr(super(), 'uu')

    @property
    def dist_v(self):
        """Hopping distance between query node v and query node vv (v').

        Returns:
            np.ndarray -- array of hopping distances as integers
        """
        return getattr(super(), 'dist_v')

    @property
    def dist_u(self):
        """Hopping distance between target node u and target node uu (u').

        Returns:
            np.ndarray -- array of hopping distances as integers
        """
        return getattr(super(), 'dist_u')

    @property
    def weight(self):
        """String matching score between uu (u') and vv (v').

        Returns:
            np.ndarray -- array of floating point weights
        """
        return getattr(super(), 'weight')

    def __repr__(self):
        return 'QueryResult(records={}, dtypes={})'.format(
            list(self), self.types)
class NeighbourHoodMatchingCosts(Base):
    """Represents a table of all valid neighbourhood matching costs"""

    columns = 'v u vv uu cost'.split()
    types = [np.int64, np.int64, np.int64, np.int64, np.float32]

    # NOTE: the property getters below are documentation hooks. ``super()``
    # exposes no such attribute, so each getter raises AttributeError and
    # np.recarray falls back to looking the name up as a record field.

    def __getitem__(self, indx):
        """Return the neighbourhood matching cost row(s) at index "indx".

        Arguments:
            indx {int, slice} -- index into the table

        Returns:
            NeighbourHoodMatchingCosts -- NeighbourHoodMatchingCosts
            sliced by indx
        """
        return super().__getitem__(indx)

    @property
    def v(self):
        """Column v.

        Returns:
            np.ndarray -- array of query node ids as integers
        """
        return getattr(super(), 'v')

    @property
    def u(self):
        """Column u.

        Returns:
            np.ndarray -- array of target node ids as integers
        """
        return getattr(super(), 'u')

    @property
    def vv(self):
        """Column vv - written v prime (v') in the paper,
        a query node within hopping distance h of query node v.

        Returns:
            np.ndarray -- array of query node ids as integers
        """
        return getattr(super(), 'vv')

    @property
    def uu(self):
        """Column uu - written u prime (u') in the paper,
        a target node within hopping distance h of target node u.

        Returns:
            np.ndarray -- array of target node ids as integers
        """
        return getattr(super(), 'uu')

    @property
    def cost(self):
        """Column cost - all valid neighbourhood matching costs.
        Eq 2 in the paper - multiplied by 1 - lambda.

        Returns:
            np.ndarray -- array of costs as floats
        """
        return getattr(super(), 'cost')

    def __repr__(self):
        return 'NeighbourHoodMatchingCosts(records={}, dtypes={})'.format(
            list(self), self.types)
class PartialMatchingCosts(Base):
    """ A table representing all valid partial matching costs """

    columns = 'v u vv cost'.split()
    types = [np.int64, np.int64, np.int64, np.float32]

    @property
    def v(self):
        """Get column v

        Returns:
            np.ndarray -- array of query node ids as integers
        """
        return getattr(super(), 'v')

    @property
    def u(self):
        """Get column u

        Returns:
            np.ndarray -- array of target node ids as integers
        """
        return getattr(super(), 'u')

    @property
    def vv(self):
        """Get column vv - written v prime (v') in the paper
        where v' is a query node within
        hopping distance h of query node v

        Returns:
            np.ndarray -- array of query node ids as integers
        """
        # BUG FIX: previously fetched 'uu', which is not a column of this
        # table (columns are v, u, vv, cost); fetch 'vv' as documented.
        return getattr(super(), 'vv')

    @property
    def cost(self):
        """Get column cost - all valid partial matching costs.
        Eq 13 in the paper (W) - but with beta multiplied by
        a factor of 1 - lambda

        Returns:
            np.ndarray -- array of costs as floats
        """
        return getattr(super(), 'cost')

    def __repr__(self):
        return 'PartialMatchingCosts(records={}, dtypes={})'.format(
            [record for record in self], self.types
        )
class InferenceCost(Base):
    """ A table representing all valid inference costs between query node
    u and target node v"""

    columns = 'v u cost'.split()
    types = [np.int64, np.int64, np.float32]

    # NOTE: the property getters below are documentation hooks. ``super()``
    # exposes no such attribute, so each getter raises AttributeError and
    # np.recarray falls back to looking the name up as a record field.

    @property
    def v(self):
        """Column v.

        Returns:
            np.ndarray -- array of query node ids as integers
        """
        return getattr(super(), 'v')

    @property
    def u(self):
        """Column u.

        Returns:
            np.ndarray -- array of target node ids as integers
        """
        return getattr(super(), 'u')

    @property
    def cost(self):
        """Column cost - all valid inference costs for
        query node v and target node u.
        Eq 14 in the paper (U).

        Returns:
            np.ndarray -- array of costs as floats
        """
        return getattr(super(), 'cost')

    def __repr__(self):
        return 'InferenceCost(records={}, dtypes={})'.format(
            list(self), self.types)
class OptimalMatch(Base):
    """Table representing the cost of the optimal
    match for query node v going to u"""

    columns = 'v u cost'.split()
    types = [np.int64, np.int64, np.float32]

    # NOTE: the property getters below are documentation hooks. ``super()``
    # exposes no such attribute, so each getter raises AttributeError and
    # np.recarray falls back to looking the name up as a record field.

    @property
    def v(self):
        """Column v.

        Returns:
            np.ndarray -- array of query node ids as integers
        """
        return getattr(super(), 'v')

    @property
    def u(self):
        """Column u.

        Returns:
            np.ndarray -- array of target node ids as integers
        """
        return getattr(super(), 'u')

    @property
    def cost(self):
        """Column cost - the optimal matching cost for u going to v.
        Eq 10 in the paper (O).

        Returns:
            np.ndarray -- array of costs as floats
        """
        return getattr(super(), 'cost')
# TODO: This seems a bit ugly as a class and is not consistent with the rest
# of the code
class Refiner:
    """Take each of the matches and recursively find
    all of their neighbours via a greedy algorithm"""

    def __init__(
            self, neighbourhood_matching_costs: NeighbourHoodMatchingCosts):
        """Initialise a refiner from a table of neighbourhood matching costs.

        For each pair (v, u) find the lowest cost adjacent pair (vv, uu).

        Arguments:
            neighbourhood_matching_costs {NeighbourHoodMatchingCosts} --
                costs sorted so that the cheapest row of each
                (v, u, vv) group comes first
        """
        # group all of the pairs (v, u)
        keys, groups = group_by(['v', 'u'], neighbourhood_matching_costs)
        matches = {tuple(key): rows for key, rows in zip(keys, groups)}
        # for every (v, u) keep the cheapest neighbouring pair (vv, uu)
        # of each (v, u, vv) group
        candidates = {
            pair: group_by_first(['v', 'u', 'vv'], rows)[['vv', 'uu']].tolist()
            for pair, rows in matches.items()
        }
        # discard hops that are not valid transitions
        self.neighbours = {
            pair: [hop for hop in hops if self.valid_neighbours(pair, hop)]
            for pair, hops in candidates.items()
        }

    def __call__(self, seed, result):
        """Given a pair (v, u), greedily collect its lowest cost neighbours,
        recursively covering the whole graph without cyclic paths.

        The matches found are accumulated in ``result`` (mutated in place
        across recursive calls).

        Arguments:
            seed {(int, int)} -- (query_node, target_node) pair to start from
            result {list} -- accumulates (query_node, target_node) pairs
        """
        # Don't add a new result if we've already visited this query node
        visited = set(query_node for query_node, target_node in result)
        if seed[0] in visited:
            return
        result += [seed]
        # Now do the same for all the neighbours of the new result
        for neighbour in self.neighbours[seed]:
            self(neighbour, result)
        return

    @staticmethod
    def valid_neighbours(first: tuple, second: tuple):
        """Predicate that governs a valid hop between nodes.

        Arguments:
            first {int, int} -- source query_node, target_node id pair
            second {int, int} -- target query_node, target_node id pair

        Returns:
            Bool -- True if a valid transition
        """
        # transitions to self are not valid
        if first == second:
            return False
        # transitions to missing target nodes (negative ids) are not valid
        return not any(node_id < 0 for node_id in second)
def _get_matching_costs(
        records: List[tuple], hopping_distance, lmbda=.3, alpha=.3, ):
    """Create a table of matching costs from a
    table of query results using equation 2.
    Equivalent to the first term of equation 13.

    Arguments:
        records {List[tuple]} -- rows from the database query; None entries
            are replaced by -1 so they can be stored as integers
        hopping_distance {int} -- max hopping distance h (see eq. 1)
        lmbda {float} -- trade-off factor lambda (default: {.3})
        alpha {float} -- propagation factor (default: {.3})

    Returns:
        NeighbourHoodMatchingCosts -- table of all valid matching costs
        QueryResult -- query result as a numpy rec
        array rather than a list of tuples
        beta -- vectorised function mapping a query node id v to the sum
        of proximities over its unique neighbours vv (used to normalise
        partial matching costs)
    """
    # convert NaN records into negative numbers so they can be stored as ints
    # using numpy
    query_result = QueryResult(
        [
            tuple(item if item is not None else -1 for item in tup)
            for tup in records
        ]
    )
    # label costs are weights in the database; flip scores into costs
    query_result.weight = 1. - query_result.weight
    # sorted order is relied on by the group_by calls downstream
    query_result = np.sort(query_result, order=[
        'v', 'u', 'vv', 'uu', 'weight'])
    # negative dist_u marks a missing match (was None); push it just past
    # the hopping horizon so its proximity becomes zero in eq. 1
    # NOTE(review): dist_u appears to be a view into query_result, so this
    # assignment presumably also updates query_result.dist_u -- confirm
    nan_idx = query_result.dist_u < 0
    dist_u = query_result.dist_u
    dist_u[nan_idx] = hopping_distance + 1
    # convert hopping distances into proximities (eq. 1)
    prox_v = _proximity(hopping_distance, alpha, query_result.dist_v)
    prox_u = _proximity(hopping_distance, alpha, dist_u)
    # neighbourhood matching cost (eq. 2) scaled by 1 - lambda
    cost = _delta_plus(prox_v, prox_u)
    cost *= (1. - lmbda)
    # one row per unique (v, vv) pair together with its proximity
    arr = np.unique(
        np.rec.fromarrays(
            (query_result.v, query_result.vv, prox_v),
            dtype=[('v', np.int64), ('vv', np.int64), ('prox_v', float)]
        ),
        axis=0
    )
    vs, groups = group_by('v', arr)
    # beta_: query node id -> total proximity of its neighbourhood
    beta_ = {v[0]: sum(group['prox_v']) for v, group in zip(vs, groups)}
    beta = np.vectorize(lambda x: beta_[x])
    neighbourhood_matching_costs = np.rec.fromarrays([
        query_result.v,
        query_result.u,
        query_result.vv,
        query_result.uu,
        cost
    ],
        dtype=list(zip(
            NeighbourHoodMatchingCosts.columns,
            NeighbourHoodMatchingCosts.types
        ))
    ).view(NeighbourHoodMatchingCosts)
    return neighbourhood_matching_costs, query_result, beta
def _get_partial_inference_costs(
        neighbourhood_matching_costs: NeighbourHoodMatchingCosts,
        beta
) -> PartialMatchingCosts:
    """Get the lowest cost neighbourhood matching cost for each partial match.

    Arguments:
        neighbourhood_matching_costs {NeighbourHoodMatchingCosts} -- sorted
            neighbourhood_matching_costs
        beta -- vectorised function mapping query node id v to its
            normalisation constant

    Returns:
        PartialMatchingCosts
    """
    # rows are sorted by cost within each (v, u, vv) group, so the first
    # row of each group is the cheapest
    cheapest = group_by_first(['v', 'u', 'vv'], neighbourhood_matching_costs)
    fields = [cheapest.v, cheapest.u, cheapest.vv, cheapest.cost]
    dtype = list(zip(
        PartialMatchingCosts.columns,
        PartialMatchingCosts.types
    ))
    partial_matching_costs = np.rec.fromarrays(
        fields, dtype=dtype).view(PartialMatchingCosts)
    # normalise each cost by the total neighbourhood proximity of v
    partial_matching_costs.cost /= beta(partial_matching_costs.v)
    return partial_matching_costs
def _get_inference_costs(
        partial_matching_costs: PartialMatchingCosts) -> InferenceCost:
    """Sum partial matching costs for each query node target node pair (v, u).

    Arguments:
        partial_matching_costs {PartialMatchingCosts} -- sorted
            partial_matching_costs

    Returns:
        InferenceCost
    """
    # TODO: inference_costs include the label weight
    keys, groups = group_by(['v', 'u'], partial_matching_costs)
    averaged = (
        (key[0], key[1], np.sum(group.cost) / len(group))
        for key, group in zip(keys, groups)
    )
    return InferenceCost(averaged)
def _get_optimal_match(inference_costs: InferenceCost) -> OptimalMatch:
    """Get the lowest cost match for each query node v.

    Assumes ``inference_costs`` is sorted by (v, cost) so the first row of
    each v-group is the cheapest.

    Arguments:
        inference_costs {InferenceCost} -- sorted inference costs

    Returns:
        OptimalMatch
    """
    return group_by_first('v', inference_costs)
def solve(records: List[tuple], max_iters=10, hopping_distance=2):
    """Generate a set of subgraph matches and costs from a query result.

    Iteratively alternates two optimisations until the optimal match is
    at least 90% unchanged between iterations or ``max_iters`` is reached,
    then greedily refines the matches into subgraphs.

    Arguments:
        records {List[tuple]} -- rows returned by the database query

    Keyword Arguments:
        max_iters {int} -- maximum number of optimisation
            iterations (default: {10})
        hopping_distance {int} -- max hopping distance h (default: {2})

    Returns:
        tuple -- (inference_costs_dict, subgraph_matches, iters,
            number of optimum matches, target_edges)
    """
    # initialisation
    finished, iters = False, 0
    prv_optimum_match = None
    neighbourhood_matching_costs, query_result, beta = _get_matching_costs(
        records, hopping_distance)
    # keep a copy for successive iterations
    neighbourhood_matching_costs_cpy = neighbourhood_matching_costs.copy()
    label_costs = {
        (record.v, record.u): record.weight
        for record in query_result
    }
    # NOTE: an unused np.vectorize wrapper around label_costs was removed
    # here; the label costs are applied directly in the loop below.
    while True:
        # first optimisation
        neighbourhood_matching_costs = np.sort(
            neighbourhood_matching_costs,
            order=['v', 'u', 'vv', 'cost'],
            axis=0
        )
        partial_inference_costs = _get_partial_inference_costs(
            neighbourhood_matching_costs, beta)
        inference_costs = _get_inference_costs(partial_inference_costs)
        # add the label (string matching) cost for each (v, u) pair
        additional_costs = np.array(
            [label_costs.get(tuple(x)) for x in inference_costs[['v', 'u']]],
            dtype=np.float32
        )
        inference_costs.cost += additional_costs
        # second optimisation
        inference_costs = np.sort(inference_costs, order=['v', 'cost'])
        optimum_match = _get_optimal_match(inference_costs)
        inference_costs_dict = {
            (record.v, record.u): record.cost for record in inference_costs}
        apply = np.vectorize(
            lambda x: inference_costs_dict.get(tuple(x), iters))
        # propagate this iteration's inference costs back onto the
        # (pristine) neighbourhood matching costs for the next pass
        neighbourhood_matching_costs = neighbourhood_matching_costs_cpy.copy()
        neighbourhood_matching_costs.cost += apply(
            neighbourhood_matching_costs[['vv', 'uu']])
        iters += 1
        # test for convergence
        if prv_optimum_match is not None:
            diff = prv_optimum_match[['v', 'u']] == optimum_match[['v', 'u']]
            finished = (sum(diff) / len(optimum_match)) > .9
        if finished:
            break
        prv_optimum_match = optimum_match
        if iters >= max_iters:
            break
    neighbourhood_matching_costs = np.sort(
        neighbourhood_matching_costs,
        order=['v', 'u', 'vv', 'cost'],
        axis=0
    )
    refine = Refiner(neighbourhood_matching_costs)
    # normalise inference costs by the number of iterations
    inference_costs.cost /= iters
    inference_costs_dict = {
        (record.v, record.u): record.cost for record in inference_costs}
    inference_costs = np.sort(inference_costs, order=['cost'])
    # grow a subgraph match from each seed, cheapest first, deduplicating
    subgraph_matches = []
    for seed in inference_costs[['v', 'u']]:
        subgraph_match = []
        refine(tuple(seed), subgraph_match)
        subgraph_match = sorted(subgraph_match)
        if subgraph_match not in subgraph_matches:
            subgraph_matches.append(subgraph_match)
    target_edges = group_by_first(['u', 'uu', 'dist_u'], query_result)[
        ['u', 'uu', 'dist_u']]
    # keep only direct edges (0 < dist_u <= 1) and each undirected edge
    # once (u < uu); a stray no-op statement `mask > 0` was removed here
    mask = target_edges['dist_u'] > 0
    mask *= target_edges['dist_u'] <= 1
    mask *= target_edges['u'] < target_edges['uu']
    target_edges = np.sort(target_edges[mask])
    return inference_costs_dict, subgraph_matches, iters, len(
        optimum_match), target_edges