Files
daglib/tests/test_dag.py
John Lancaster 88a4064a71 subgraph test
2026-02-21 15:09:38 -06:00

331 lines
9.6 KiB
Python

from __future__ import annotations
import pytest
from daglib.dag import DAG
class TestDAGInit:
"""Test DAG initialization."""
def test_init_empty(self) -> None:
g = DAG()
assert len(g) == 0
def test_reverse(self) -> None:
g = DAG()
assert len(g.reverse) == 0
def test_default_callbacks_none(self) -> None:
g = DAG()
assert g.on_add is None
assert g.on_remove is None
class TestDAGOps:
class TestDAGAddEdge:
"""Test adding edges."""
def test_add_single_edge(self) -> None:
g = DAG[str]()
g.add_edge("foo", "bar")
assert "foo" in g._succ
assert "bar" in g._succ["foo"]
assert "bar" in g._pred
assert "foo" in g._pred["bar"]
assert set(g["foo"]) == {"bar"}
def test_add_multiple_edges(self) -> None:
g = DAG[str]()
g.add_edge("foo", "bar")
g.add_edge("foo", "baz")
g.add_edge("bar", "baz")
assert "foo" in g._succ
assert "bar" in g._succ["foo"]
assert "baz" in g._succ["foo"]
assert set(g["foo"]) == {"bar", "baz"}
assert "bar" in g._succ
assert "baz" in g._succ["bar"]
assert set(g["bar"]) == {"baz"}
assert "bar" in g["foo"]
assert "baz" in g["foo"]
assert "baz" in g["bar"]
def test_dagsetview(self) -> None:
g = DAG[str]()
added = []
g.on_add = lambda u, v: added.append(v)
g["foo"] += {"bar", "baz"}
assert set(g["foo"]) == {"bar", "baz"}
assert "bar" in added
assert "baz" in added
def test_self_loop_raises_error(self) -> None:
g = DAG[str]()
with pytest.raises(ValueError, match="Self-loops are not allowed"):
g.add_edge("foo", "foo")
def test_add_duplicate_edge_idempotent(self) -> None:
g = DAG[str]()
g.add_edge("foo", "bar")
g.add_edge("foo", "bar")
assert len(g["foo"]) == 1
class TestDAGRemoveEdge:
"""Test removing edges."""
def test_remove_existing_edge(self) -> None:
g = DAG[str]()
g.add_edge("foo", "bar")
g.remove_edge("foo", "bar")
assert "bar" not in g["foo"]
assert "foo" not in g.reverse["bar"]
def test_remove_nonexistent_edge_missing_ok(self) -> None:
g = DAG[str]()
g.remove_edge("foo", "bar", missing_ok=True) # should not raise
def test_remove_nonexistent_edge_error(self) -> None:
g = DAG[str]()
with pytest.raises(KeyError):
g.remove_edge("foo", "bar", missing_ok=False)
def test_dagsetview(self) -> None:
g = DAG[str]()
g["foo"] += {"bar", "baz"}
removed = []
g.on_remove = lambda u, v: removed.append(v)
g["foo"] -= {"baz"}
assert set(g["foo"]) == {"bar"}
assert "baz" in removed
class TestDAGDiscardNode:
"""Test node removal."""
def test_discard_node_removes_outgoing_edges(self) -> None:
g = DAG[str]()
g.add_edge("foo", "bar")
g.add_edge("foo", "baz")
g.discard_node("foo")
assert "foo" not in g._succ
assert "foo" not in g.reverse["bar"]
assert "foo" not in g.reverse["baz"]
def test_discard_node_removes_incoming_edges(self) -> None:
g = DAG[str]()
g.add_edge("foo", "baz")
g.add_edge("bar", "baz")
g.discard_node("baz")
assert "baz" not in g._pred
assert "baz" not in g["foo"]
assert "baz" not in g["bar"]
def test_discard_nonexistent_node(self) -> None:
g = DAG[str]()
g.discard_node("z") # should not raise
class TestDAGBasicOps:
class TestSetItem:
"""Test dictionary-style assignment."""
def test_with_set(self) -> None:
g = DAG[str]()
g["foo"] = "qux"
g["foo"] = {"bar", "baz"}
assert set(g["foo"]) == {"bar", "baz", "qux"}
assert "bar" in g["foo"]
assert "baz" in g["foo"]
def test_with_list(self) -> None:
g = DAG[str]()
g["foo"] = ["bar", "baz"]
assert "bar" in g["foo"]
assert "baz" in g["foo"]
def test_with_string(self) -> None:
g = DAG[str]()
g["foo"] = "bar"
assert "bar" in g["foo"]
def test_with_single_item(self) -> None:
g = DAG[int]()
g[1] = 2
assert 2 in g[1]
def test_updates_reverse(self) -> None:
g = DAG[str]()
g["foo"] = {"bar", "baz"}
assert "foo" in g.reverse["bar"]
assert "foo" in g.reverse["baz"]
class TestGetItem:
"""Test dictionary-style access."""
def test_getitem_returns_dagset(self) -> None:
g = DAG[str]()
g.add_edge("foo", "bar")
dagset = g["foo"]
assert "bar" in dagset
def test_getitem_empty_node(self) -> None:
g = DAG[str]()
dagset = g["nonexistent"]
assert len(dagset) == 0
def test_getitem_mutation_updates_graph(self) -> None:
g = DAG[str]()
g["foo"].add("bar")
assert "bar" in g["foo"]
assert "foo" in g.reverse["bar"]
def test_getitem_mutation_triggers_callbacks(self) -> None:
added: list[tuple[str, str]] = []
g = DAG[str]()
g.on_add = lambda u, v: added.append((u, v))
g["foo"].add("bar")
assert ("foo", "bar") in added
class TestDAGDelItem:
"""Test del operation."""
def test_removes_node(self) -> None:
g = DAG[str]()
g.add_edge("foo", "bar")
del g["foo"]
assert "foo" not in g._succ
class TestDAGIterOps:
class TestIter:
"""Test iteration."""
def test_empty(self) -> None:
g = DAG[str]()
assert list(g) == []
def test_returns_nodes(self) -> None:
g = DAG[str]()
g.add_edge("foo", "bar")
g.add_edge("bar", "baz")
assert {"foo", "bar"} == set(g)
class TestLen:
"""Test length (edge count)."""
def test_len_empty(self) -> None:
g = DAG[str]()
assert len(g) == 0
def test_len_counts_edges(self) -> None:
g = DAG[str]()
g.add_edge("foo", "bar")
g.add_edge("foo", "baz")
g.add_edge("bar", "baz")
assert len(g) == 3
def test_len_after_removal(self) -> None:
g = DAG[str]()
g.add_edge("foo", "bar")
g.add_edge("foo", "baz")
g.remove_edge("foo", "bar")
assert len(g) == 1
class TestDAGReverse:
"""Test reverse (predecessor) access."""
def test_reverse_property(self) -> None:
g = DAG[str]()
g.add_edge("foo", "bar")
assert "foo" in g.reverse["bar"]
assert len(g.reverse["foo"]) == 0
def test_reverse_multiple_predecessors(self) -> None:
g = DAG[str]()
g.add_edge("foo", "baz")
g.add_edge("bar", "baz")
preds = g.reverse["baz"]
assert "foo" in preds
assert "bar" in preds
class TestDAGCallbacks:
"""Test callback mechanisms."""
def test_on_add_callback_via_setitem(self) -> None:
added: list[tuple[str, str]] = []
g = DAG[str]()
g.on_add = lambda u, v: added.append((u, v))
g["foo"] = {"bar", "baz"}
# Note: setitem doesn't trigger callbacks in current implementation
def test_on_add_callback_via_getitem_mutation(self) -> None:
added: list[tuple[str, str]] = []
g = DAG[str]()
g.on_add = lambda u, v: added.append((u, v))
g["foo"].add("bar")
assert ("foo", "bar") in added
def test_on_remove_callback(self) -> None:
removed: list[tuple[str, str]] = []
g = DAG[str]()
g.on_remove = lambda u, v: removed.append((u, v))
g["foo"].add("bar")
g["foo"].discard("bar")
assert ("foo", "bar") in removed
class TestDAGComplexScenarios:
"""Test complex usage patterns."""
def test_chain_graph(self) -> None:
g = DAG[str]()
g["foo"] = {"bar"}
g["bar"] = {"baz"}
g["baz"] = {"d"}
assert "bar" in g["foo"]
assert "baz" in g["bar"]
assert "d" in g["baz"]
assert len(g) == 3
def test_diamond_graph(self) -> None:
g = DAG[str]()
g["foo"] = {"bar", "baz"}
g["bar"] = {"d"}
g["baz"] = {"d"}
assert len(g.reverse["d"]) == 2
def test_batch_operations(self) -> None:
g = DAG[str]()
g["foo"] += {"bar", "baz", "d"}
assert len(g["foo"]) == 3
g["foo"] -= {"bar"}
assert len(g["foo"]) == 2
assert "bar" not in g["foo"]
assert set(g["foo"]) == {"baz", "d"}
def test_type_hints_with_ints(self) -> None:
g = DAG[int]()
g[1] = {2, 3}
g[2] = {4}
assert 2 in g[1]
assert 4 in g[2]
assert len(g) == 3
def test_subgraphs(self) -> None:
g = DAG[str]()
g["A"] += "B"
g["B"] += "C"
g["B"] += "D"
g["D"] += "E"
g["E"] += "F"
assert len(g) == 5
sub = g.subgraph("D")
assert len(sub) == 2
assert set(sub["D"]) == {"E"}
assert set(sub["E"]) == {"F"}