Spaces:
Running
Running
| from collections import defaultdict | |
| from dataclasses import dataclass | |
| from typing import Any, Dict, List | |
| from graphgen.models.storage.networkx_storage import NetworkXStorage | |
@dataclass
class CommunityDetector:
    """Detect communities in the graph supplied by a graph storage object.

    The fields are dataclass fields, so the detector can be configured with
    keyword arguments or by assigning attributes after construction.
    """

    # Object whose ``get_graph()`` coroutine supplies the graph to analyse.
    # The annotation is quoted so it is not evaluated at class-creation time.
    graph_storage: "NetworkXStorage" = None
    # Name of the detection algorithm; only "leiden" is implemented.
    method: str = "leiden"
    # Keyword arguments forwarded to the selected algorithm (None means none).
    method_params: Dict[str, Any] = None

    async def detect_communities(self) -> Dict[str, int]:
        """Run the configured algorithm and return a node -> community id map.

        Raises:
            ValueError: if ``self.method`` is not a supported algorithm name.
        """
        if self.method == "leiden":
            return await self._leiden_communities(**(self.method_params or {}))
        raise ValueError(f"Unknown community detection method: {self.method}")

    async def get_graph(self):
        """Return the graph obtained from ``self.graph_storage.get_graph()``."""
        return await self.graph_storage.get_graph()

    async def _leiden_communities(
        self, max_size: int = None, **kwargs
    ) -> Dict[str, int]:
        """Detect communities using the Leiden algorithm.

        Args:
            max_size: if given and positive, any community larger than
                ``max_size`` is split into sub-communities of at most
                ``max_size`` nodes. ``None`` or a non-positive value disables
                splitting.
            **kwargs: optional settings read by key:
                ``random_seed`` (default 42) is passed as ``seed`` to
                ``find_partition``; ``use_lcc`` (default False) restricts
                detection to the largest connected component.

        Returns:
            Mapping from node name to integer community id. Nodes that take
            part in no edge are not included.
        """
        # Imported lazily so the heavy optional dependencies are only needed
        # when this algorithm is actually used.
        import igraph as ig
        import networkx as nx
        from leidenalg import ModularityVertexPartition, find_partition

        graph = await self.get_graph()
        # NOTE(review): this removes isolated nodes from the returned graph in
        # place. If get_graph() hands back the storage's live graph, they are
        # removed there as well -- confirm that this side effect is intended.
        graph.remove_nodes_from(list(nx.isolates(graph)))

        # Without edges there is nothing to partition, and no giant component
        # to select in the use_lcc case.
        if graph.number_of_edges() == 0:
            return {}

        # Vertices are created from the edge endpoints, so each igraph vertex
        # carries the original node in its "name" attribute.
        ig_graph = ig.Graph.TupleList(graph.edges(), directed=False)

        random_seed = kwargs.get("random_seed", 42)
        use_lcc = kwargs.get("use_lcc", False)

        components = ig_graph.components()
        if use_lcc:
            subgraphs = [components.giant()]
        else:
            subgraphs = (
                ig_graph.induced_subgraph(component) for component in components
            )

        communities: Dict[str, int] = {}
        # Partition ids restart at 0 for every subgraph; the running offset
        # keeps ids unique across connected components.
        offset = 0
        for subgraph in subgraphs:
            partition = find_partition(
                subgraph, ModularityVertexPartition, seed=random_seed
            )
            for part, cluster in enumerate(partition):
                for v in cluster:
                    communities[subgraph.vs[v]["name"]] = part + offset
            offset += len(partition)

        # Split large communities only if a positive max_size is specified.
        if max_size is None or max_size <= 0:
            return communities
        return await self._split_communities(communities, max_size)

    @staticmethod
    async def _split_communities(
        communities: Dict[str, int], max_size: int
    ) -> Dict[str, int]:
        """Split communities larger than ``max_size`` into smaller ones.

        Oversized communities are cut into consecutive chunks of at most
        ``max_size`` nodes, following the order in which the nodes appear in
        ``communities``. All community ids are renumbered consecutively from 0.

        Args:
            communities: mapping from node name to community id.
            max_size: maximum number of nodes per community; must be positive.

        Returns:
            A new node -> community id mapping; the input is not modified.

        Raises:
            ValueError: if ``max_size`` is not positive.
        """
        if max_size <= 0:
            raise ValueError(f"max_size must be a positive integer, got {max_size}")

        cid2nodes: Dict[int, List[str]] = defaultdict(list)
        for node, cid in communities.items():
            cid2nodes[cid].append(node)

        new_communities: Dict[str, int] = {}
        new_cid = 0
        for nodes in cid2nodes.values():
            # A community that already fits yields exactly one chunk, so both
            # the "fits" and "too large" cases share this loop.
            for start in range(0, len(nodes), max_size):
                for n in nodes[start : start + max_size]:
                    new_communities[n] = new_cid
                new_cid += 1
        return new_communities