Source code for spamosaic.MNN

import numpy as np
import pandas as pd

from sklearn.neighbors import NearestNeighbors
from annoy import AnnoyIndex
import itertools
import networkx as nx
import hnswlib
from sklearn.preprocessing import normalize

def nn_approx(ds1, ds2, names1, names2, knn=50):
    """
    Approximate nearest neighbor search using HNSW (hnswlib).

    An L2 index is built over `ds2` and queried with every row of `ds1`.

    Parameters
    ----------
    ds1 : np.ndarray
        Query dataset (N1, D).
    ds2 : np.ndarray
        Reference dataset (N2, D).
    names1 : list of str
        Identifiers for rows in `ds1`.
    names2 : list of str
        Identifiers for rows in `ds2`.
    knn : int, default=50
        Number of nearest neighbors to find. Values larger than the number
        of rows in `ds2` are clamped to that number.

    Returns
    -------
    Set[Tuple[str, str]]
        Set of matched (query_name, reference_name) pairs. Empty when either
        dataset has no rows.
    """
    dim = ds2.shape[1]
    num_elements = ds2.shape[0]

    # Without queries or without references there is nothing to pair up.
    if ds1.shape[0] == 0 or num_elements == 0:
        return set()

    # The index cannot return more neighbors than it holds; asking for more
    # makes knn_query raise instead of returning what is available.
    k = min(knn, num_elements)

    index = hnswlib.Index(space='l2', dim=dim)
    index.init_index(max_elements=num_elements, ef_construction=100, M=16)
    # hnswlib's documentation advises keeping ef at least as large as k.
    index.set_ef(max(10, k))
    index.add_items(ds2)

    neighbor_ids, _ = index.knn_query(ds1, k=k)

    return {
        (names1[query_pos], names2[ref_pos])
        for query_pos, ref_row in enumerate(neighbor_ids)
        for ref_pos in ref_row
    }
def nn(ds1, ds2, names1, names2, knn=50, metric_p=2):
    """
    Exact nearest neighbor search using scikit-learn.

    A `NearestNeighbors` model is fitted on `ds2` and queried with every
    row of `ds1`.

    Parameters
    ----------
    ds1 : np.ndarray
        Query dataset.
    ds2 : np.ndarray
        Reference dataset.
    names1 : list of str
        Identifiers for `ds1`.
    names2 : list of str
        Identifiers for `ds2`.
    knn : int
        Number of nearest neighbors. Values larger than the number of rows
        in `ds2` are clamped to that number.
    metric_p : int
        Minkowski distance parameter (e.g., 2 for Euclidean).

    Returns
    -------
    Set[Tuple[str, str]]
        Set of matched (query_name, reference_name) pairs.
    """
    # kneighbors() rejects requests for more neighbors than fitted samples,
    # so never ask for more than the reference set contains.
    n_neighbors = min(knn, ds2.shape[0])

    # n_neighbors has to be passed by keyword: the estimator's constructor
    # arguments are keyword-only in current scikit-learn releases.
    searcher = NearestNeighbors(n_neighbors=n_neighbors, p=metric_p)
    searcher.fit(ds2)
    neighbor_ids = searcher.kneighbors(ds1, return_distance=False)

    return {
        (names1[query_pos], names2[ref_pos])
        for query_pos, ref_row in enumerate(neighbor_ids)
        for ref_pos in ref_row
    }
def nn_annoy(ds1, ds2, names1, names2, norm=True, knn=20,
             metric='euclidean', n_trees=10, save_on_disk=False):
    """
    Approximate nearest neighbor search using an Annoy index.

    The index is built over the rows of `ds2` (item ids are row positions)
    and queried once per row of `ds1`.

    Parameters
    ----------
    ds1 : np.ndarray
        Query dataset.
    ds2 : np.ndarray
        Reference dataset.
    names1 : list of str
        Identifiers for `ds1`.
    names2 : list of str
        Identifiers for `ds2`.
    norm : bool, default=True
        Whether to pass both datasets through `normalize` before indexing.
    knn : int
        Number of nearest neighbors to retrieve per query row.
    metric : str, default='euclidean'
        Distance metric handed to `AnnoyIndex`.
    n_trees : int, default=10
        Number of trees to build in the Annoy index.
    save_on_disk : bool, default=False
        When true, the index is built in the file 'annoy.index'.

    Returns
    -------
    Set[Tuple[str, str]]
        Set of (query_name, reference_name) nearest neighbor pairs.
    """
    if norm:
        ds1 = normalize(ds1)
        ds2 = normalize(ds2)

    n_queries = ds1.shape[0]
    n_refs = ds2.shape[0]

    # Build the index over the reference rows.
    index = AnnoyIndex(ds2.shape[1], metric=metric)
    if save_on_disk:
        index.on_disk_build('annoy.index')
    for ref_pos in range(n_refs):
        index.add_item(ref_pos, ds2[ref_pos, :])
    index.build(n_trees)

    # Query the index once per row of ds1.
    neighbor_ids = np.array([
        index.get_nns_by_vector(ds1[query_pos, :], knn, search_k=-1)
        for query_pos in range(n_queries)
    ])

    # Translate positional hits into (query_name, reference_name) pairs.
    pairs = set()
    for query_pos, ref_row in enumerate(neighbor_ids):
        query_name = names1[query_pos]
        pairs.update((query_name, names2[ref_pos]) for ref_pos in ref_row)
    return pairs
def mnn(ds1, ds2, names1, names2, knn1=20, knn2=20, approx=True,
        metric='euclidean', way='hnsw', norm=False):
    """
    Compute Mutual Nearest Neighbors (MNN) between two datasets.

    Neighbors are searched in both directions (`ds1` -> `ds2` and
    `ds2` -> `ds1`); a pair is kept only if it appears in both results.

    Parameters
    ----------
    ds1 : np.ndarray
        First dataset (queries).
    ds2 : np.ndarray
        Second dataset (references).
    names1 : list of str
        Identifiers for `ds1`.
    names2 : list of str
        Identifiers for `ds2`.
    knn1 : int
        Number of neighbors for `ds1` -> `ds2`.
    knn2 : int
        Number of neighbors for `ds2` -> `ds1`.
    approx : bool, default=True
        Whether to use approximate neighbor search. When false, the exact
        search `nn` is used.
    metric : str, default='euclidean'
        Distance metric. Only forwarded to the Annoy search.
    way : str, default='hnsw'
        Method for approximate search. 'hnsw' selects `nn_approx`; any
        other value selects `nn_annoy`.
    norm : bool, default=False
        Whether to normalize data before the Annoy search. Only forwarded
        to the Annoy search.

    Returns
    -------
    Set[Tuple[str, str]]
        Set of mutual nearest neighbor pairs, ordered as (name1, name2).
    """
    if not approx:
        forward = nn(ds1, ds2, names1, names2, knn=knn1)
        backward = nn(ds2, ds1, names2, names1, knn=knn2)
    elif way == 'hnsw':
        # forward holds (name1, name2) pairs: the neighbors in ds2 found
        # for each row of ds1.
        forward = nn_approx(ds1, ds2, names1, names2, knn=knn1)
        # backward holds (name2, name1) pairs from the opposite direction.
        backward = nn_approx(ds2, ds1, names2, names1, knn=knn2)
    else:
        forward = nn_annoy(ds1, ds2, names1, names2,
                           norm=norm, knn=knn1, metric=metric)
        backward = nn_annoy(ds2, ds1, names2, names1,
                            norm=norm, knn=knn2, metric=metric)

    # Flip the reverse-direction pairs so both sets use (name1, name2)
    # ordering, then keep only the pairs found in both directions.
    backward_flipped = {(name1, name2) for name2, name1 in backward}
    return forward & backward_flipped