308 lines
7.8 KiB
Python
308 lines
7.8 KiB
Python
from __future__ import annotations
|
|
|
|
import pytest
|
|
|
|
from daglib.dag import DAG
|
|
|
|
|
|
class TestDAGInit:
|
|
"""Test DAG initialization."""
|
|
|
|
def test_init_empty(self) -> None:
|
|
g = DAG()
|
|
assert len(g) == 0
|
|
|
|
def test_default_callbacks_none(self) -> None:
|
|
g = DAG()
|
|
assert g.on_add is None
|
|
assert g.on_remove is None
|
|
|
|
|
|
class TestDAGAddEdge:
|
|
"""Test adding edges."""
|
|
|
|
def test_add_single_edge(self) -> None:
|
|
g = DAG[str]()
|
|
g.add_edge("a", "b")
|
|
assert "b" in g["a"]
|
|
assert "a" in g.reverse["b"]
|
|
|
|
def test_add_multiple_edges(self) -> None:
|
|
g = DAG[str]()
|
|
g.add_edge("a", "b")
|
|
g.add_edge("a", "c")
|
|
g.add_edge("b", "c")
|
|
assert "b" in g["a"]
|
|
assert "c" in g["a"]
|
|
assert "c" in g["b"]
|
|
|
|
def test_add_edge_creates_nodes(self) -> None:
|
|
g = DAG[str]()
|
|
g.add_edge("a", "b")
|
|
assert "a" in g._succ
|
|
assert "b" in g._pred
|
|
|
|
def test_self_loop_raises_error(self) -> None:
|
|
g = DAG[str]()
|
|
with pytest.raises(ValueError, match="Self-loops are not allowed"):
|
|
g.add_edge("a", "a")
|
|
|
|
def test_add_duplicate_edge_idempotent(self) -> None:
|
|
g = DAG[str]()
|
|
g.add_edge("a", "b")
|
|
g.add_edge("a", "b")
|
|
assert len(g["a"]) == 1
|
|
|
|
|
|
class TestDAGRemoveEdge:
|
|
"""Test removing edges."""
|
|
|
|
def test_remove_existing_edge(self) -> None:
|
|
g = DAG[str]()
|
|
g.add_edge("a", "b")
|
|
g.remove_edge("a", "b")
|
|
assert "b" not in g["a"]
|
|
assert "a" not in g.reverse["b"]
|
|
|
|
def test_remove_nonexistent_edge_missing_ok(self) -> None:
|
|
g = DAG[str]()
|
|
g.remove_edge("a", "b", missing_ok=True) # should not raise
|
|
|
|
def test_remove_nonexistent_edge_error(self) -> None:
|
|
g = DAG[str]()
|
|
with pytest.raises(KeyError):
|
|
g.remove_edge("a", "b", missing_ok=False)
|
|
|
|
|
|
class TestDAGDiscardNode:
|
|
"""Test node removal."""
|
|
|
|
def test_discard_node_removes_outgoing_edges(self) -> None:
|
|
g = DAG[str]()
|
|
g.add_edge("a", "b")
|
|
g.add_edge("a", "c")
|
|
g.discard_node("a")
|
|
assert "a" not in g._succ
|
|
assert "a" not in g.reverse["b"]
|
|
assert "a" not in g.reverse["c"]
|
|
|
|
def test_discard_node_removes_incoming_edges(self) -> None:
|
|
g = DAG[str]()
|
|
g.add_edge("a", "c")
|
|
g.add_edge("b", "c")
|
|
g.discard_node("c")
|
|
assert "c" not in g._pred
|
|
assert "c" not in g["a"]
|
|
assert "c" not in g["b"]
|
|
|
|
def test_discard_nonexistent_node(self) -> None:
|
|
g = DAG[str]()
|
|
g.discard_node("z") # should not raise
|
|
|
|
|
|
class TestDAGGetItem:
|
|
"""Test dictionary-style access."""
|
|
|
|
def test_getitem_returns_dagset(self) -> None:
|
|
g = DAG[str]()
|
|
g.add_edge("a", "b")
|
|
dagset = g["a"]
|
|
assert "b" in dagset
|
|
|
|
def test_getitem_empty_node(self) -> None:
|
|
g = DAG[str]()
|
|
dagset = g["nonexistent"]
|
|
assert len(dagset) == 0
|
|
|
|
def test_getitem_mutation_updates_graph(self) -> None:
|
|
g = DAG[str]()
|
|
g["a"].add("b")
|
|
assert "b" in g["a"]
|
|
assert "a" in g.reverse["b"]
|
|
|
|
def test_getitem_mutation_triggers_callbacks(self) -> None:
|
|
added: list[tuple[str, str]] = []
|
|
g = DAG[str]()
|
|
g.on_add = lambda u, v: added.append((u, v))
|
|
g["a"].add("b")
|
|
assert ("a", "b") in added
|
|
|
|
|
|
class TestDAGSetItem:
|
|
"""Test dictionary-style assignment."""
|
|
|
|
def test_setitem_with_set(self) -> None:
|
|
g = DAG[str]()
|
|
g["a"] = {"b", "c"}
|
|
assert "b" in g["a"]
|
|
assert "c" in g["a"]
|
|
|
|
def test_setitem_with_list(self) -> None:
|
|
g = DAG[str]()
|
|
g["a"] = ["b", "c"]
|
|
assert "b" in g["a"]
|
|
assert "c" in g["a"]
|
|
|
|
def test_setitem_with_string(self) -> None:
|
|
g = DAG[str]()
|
|
g["a"] = "b"
|
|
assert "b" in g["a"]
|
|
|
|
def test_setitem_with_single_item(self) -> None:
|
|
g = DAG[int]()
|
|
g[1] = 2
|
|
assert 2 in g[1]
|
|
|
|
def test_setitem_updates_reverse(self) -> None:
|
|
g = DAG[str]()
|
|
g["a"] = {"b", "c"}
|
|
assert "a" in g.reverse["b"]
|
|
assert "a" in g.reverse["c"]
|
|
|
|
|
|
class TestDAGDelItem:
|
|
"""Test del operation."""
|
|
|
|
def test_delitem_removes_node(self) -> None:
|
|
g = DAG[str]()
|
|
g.add_edge("a", "b")
|
|
del g["a"]
|
|
assert "a" not in g._succ
|
|
|
|
|
|
class TestDAGIter:
|
|
"""Test iteration."""
|
|
|
|
def test_iter_empty(self) -> None:
|
|
g = DAG[str]()
|
|
assert list(g) == []
|
|
|
|
def test_iter_returns_nodes(self) -> None:
|
|
g = DAG[str]()
|
|
g.add_edge("a", "b")
|
|
g.add_edge("b", "c")
|
|
nodes = set(g)
|
|
assert "a" in nodes
|
|
assert "b" in nodes
|
|
|
|
|
|
class TestDAGLen:
|
|
"""Test length (edge count)."""
|
|
|
|
def test_len_empty(self) -> None:
|
|
g = DAG[str]()
|
|
assert len(g) == 0
|
|
|
|
def test_len_counts_edges(self) -> None:
|
|
g = DAG[str]()
|
|
g.add_edge("a", "b")
|
|
g.add_edge("a", "c")
|
|
g.add_edge("b", "c")
|
|
assert len(g) == 3
|
|
|
|
def test_len_after_removal(self) -> None:
|
|
g = DAG[str]()
|
|
g.add_edge("a", "b")
|
|
g.add_edge("a", "c")
|
|
g.remove_edge("a", "b")
|
|
assert len(g) == 1
|
|
|
|
|
|
class TestDAGReverse:
|
|
"""Test reverse (predecessor) access."""
|
|
|
|
def test_reverse_property(self) -> None:
|
|
g = DAG[str]()
|
|
g.add_edge("a", "b")
|
|
assert "a" in g.reverse["b"]
|
|
assert len(g.reverse["a"]) == 0
|
|
|
|
def test_reverse_multiple_predecessors(self) -> None:
|
|
g = DAG[str]()
|
|
g.add_edge("a", "c")
|
|
g.add_edge("b", "c")
|
|
preds = g.reverse["c"]
|
|
assert "a" in preds
|
|
assert "b" in preds
|
|
|
|
|
|
class TestDAGCallbacks:
|
|
"""Test callback mechanisms."""
|
|
|
|
def test_on_add_callback_via_setitem(self) -> None:
|
|
added: list[tuple[str, str]] = []
|
|
g = DAG[str]()
|
|
g.on_add = lambda u, v: added.append((u, v))
|
|
g["a"] = {"b", "c"}
|
|
# Note: setitem doesn't trigger callbacks in current implementation
|
|
|
|
def test_on_add_callback_via_getitem_mutation(self) -> None:
|
|
added: list[tuple[str, str]] = []
|
|
g = DAG[str]()
|
|
g.on_add = lambda u, v: added.append((u, v))
|
|
g["a"].add("b")
|
|
assert ("a", "b") in added
|
|
|
|
def test_on_remove_callback(self) -> None:
|
|
removed: list[tuple[str, str]] = []
|
|
g = DAG[str]()
|
|
g.on_remove = lambda u, v: removed.append((u, v))
|
|
g["a"].add("b")
|
|
g["a"].discard("b")
|
|
assert ("a", "b") in removed
|
|
|
|
|
|
class TestDAGRepr:
|
|
"""Test string representation."""
|
|
|
|
def test_repr_empty(self) -> None:
|
|
g = DAG[str]()
|
|
r = repr(g)
|
|
assert "DAG" in r
|
|
assert "{}" in r
|
|
|
|
def test_repr_with_edges(self) -> None:
|
|
g = DAG[str]()
|
|
g.add_edge("a", "b")
|
|
r = repr(g)
|
|
assert "DAG" in r
|
|
assert "a" in r
|
|
assert "b" in r
|
|
|
|
|
|
class TestDAGComplexScenarios:
|
|
"""Test complex usage patterns."""
|
|
|
|
def test_chain_graph(self) -> None:
|
|
g = DAG[str]()
|
|
g["a"] = {"b"}
|
|
g["b"] = {"c"}
|
|
g["c"] = {"d"}
|
|
assert "b" in g["a"]
|
|
assert "c" in g["b"]
|
|
assert "d" in g["c"]
|
|
assert len(g) == 3
|
|
|
|
def test_diamond_graph(self) -> None:
|
|
g = DAG[str]()
|
|
g["a"] = {"b", "c"}
|
|
g["b"] = {"d"}
|
|
g["c"] = {"d"}
|
|
assert len(g.reverse["d"]) == 2
|
|
|
|
def test_batch_operations(self) -> None:
|
|
g = DAG[str]()
|
|
g["a"] += {"b", "c", "d"}
|
|
assert len(g["a"]) == 3
|
|
g["a"] -= {"b"}
|
|
assert len(g["a"]) == 2
|
|
assert "b" not in g["a"]
|
|
|
|
def test_type_hints_with_ints(self) -> None:
|
|
g = DAG[int]()
|
|
g[1] = {2, 3}
|
|
g[2] = {4}
|
|
assert 2 in g[1]
|
|
assert 4 in g[2]
|
|
assert len(g) == 3
|