fixed session scope
This commit is contained in:
@@ -1,12 +1,64 @@
|
|||||||
|
import asyncio
|
||||||
|
from pathlib import Path
|
||||||
from typing import List
|
from typing import List
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from appdaemon.appdaemon import AppDaemon
|
||||||
|
from appdaemon.logging import Logging
|
||||||
|
from appdaemon.models.config import AppDaemonConfig
|
||||||
|
from git import Repo
|
||||||
from pytest import Function
|
from pytest import Function
|
||||||
|
|
||||||
|
from .utils import delayed_stop
|
||||||
|
|
||||||
|
|
||||||
def pytest_collection_modifyitems(session, config, items: List[Function]):
|
def pytest_collection_modifyitems(session, config, items: List[Function]):
|
||||||
for i, item in enumerate(items):
|
for i, item in enumerate(items):
|
||||||
if isinstance(item, Function) and "startup" in item.name:
|
if isinstance(item, Function) and 'startup' in item.name:
|
||||||
items.insert(0, items.pop(i))
|
items.insert(0, items.pop(i))
|
||||||
break
|
break
|
||||||
|
|
||||||
return items
|
return items
|
||||||
|
|
||||||
|
|
||||||
|
CONFIG_DIR = Path(__file__).parents[2] / 'conf'
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def config_repo() -> Repo:
|
||||||
|
repo = Repo('/home/john/ad-test')
|
||||||
|
return repo
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope='session')
|
||||||
|
def base_config() -> AppDaemonConfig:
|
||||||
|
return AppDaemonConfig(
|
||||||
|
latitude=0.0,
|
||||||
|
longitude=0.0,
|
||||||
|
elevation=0,
|
||||||
|
time_zone='America/Chicago',
|
||||||
|
config_dir=CONFIG_DIR,
|
||||||
|
config_file='appdaemon.yaml',
|
||||||
|
# stop_function=lambda: print('Stopping'),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope='session')
|
||||||
|
def ad(base_config: AppDaemonConfig):
|
||||||
|
# logging.getLogger('AppDaemon._app_management').setLevel('DEBUG')
|
||||||
|
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
|
||||||
|
ad = AppDaemon(Logging(), loop, base_config)
|
||||||
|
yield ad
|
||||||
|
ad.stop()
|
||||||
|
|
||||||
|
tasks = asyncio.all_tasks(loop)
|
||||||
|
for t in tasks:
|
||||||
|
t.cancel()
|
||||||
|
|
||||||
|
with pytest.raises(asyncio.exceptions.CancelledError):
|
||||||
|
loop.run_until_complete(asyncio.gather(*tasks))
|
||||||
|
|
||||||
|
loop.close()
|
||||||
|
print('Cleaned up running AD')
|
||||||
|
|||||||
@@ -1,50 +0,0 @@
|
|||||||
import asyncio
|
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
from appdaemon.appdaemon import AppDaemon
|
|
||||||
from appdaemon.logging import Logging
|
|
||||||
from appdaemon.models.config import AppDaemonConfig
|
|
||||||
from git import Repo
|
|
||||||
|
|
||||||
from .utils import delayed_stop
|
|
||||||
|
|
||||||
CONFIG_DIR = Path(__file__).parents[2] / "conf"
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def config_repo() -> Repo:
|
|
||||||
repo = Repo("/home/john/ad-test")
|
|
||||||
return repo
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope="session")
|
|
||||||
def base_config() -> AppDaemonConfig:
|
|
||||||
return AppDaemonConfig(
|
|
||||||
latitude=0.0,
|
|
||||||
longitude=0.0,
|
|
||||||
elevation=0,
|
|
||||||
time_zone="America/Chicago",
|
|
||||||
config_dir=CONFIG_DIR,
|
|
||||||
config_file="appdaemon.yaml",
|
|
||||||
# stop_function=lambda: print('Stopping'),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope="session")
|
|
||||||
def ad(base_config: AppDaemonConfig):
|
|
||||||
loop = asyncio.new_event_loop()
|
|
||||||
|
|
||||||
ad = AppDaemon(Logging(), loop, base_config)
|
|
||||||
yield ad
|
|
||||||
ad.stop()
|
|
||||||
|
|
||||||
tasks = asyncio.all_tasks(loop)
|
|
||||||
for t in tasks:
|
|
||||||
t.cancel()
|
|
||||||
|
|
||||||
with pytest.raises(asyncio.exceptions.CancelledError):
|
|
||||||
loop.run_until_complete(asyncio.gather(*tasks))
|
|
||||||
|
|
||||||
loop.close()
|
|
||||||
print("Cleaned up running AD")
|
|
||||||
@@ -8,26 +8,25 @@ import pytest
|
|||||||
from appdaemon.appdaemon import AppDaemon
|
from appdaemon.appdaemon import AppDaemon
|
||||||
from git import Repo
|
from git import Repo
|
||||||
|
|
||||||
from .fixtures import ad, base_config
|
|
||||||
from .utils import get_load_order
|
from .utils import get_load_order
|
||||||
|
|
||||||
|
|
||||||
def reset_file(changed: Path):
|
def reset_file(changed: Path):
|
||||||
for p in Path(__file__).parents:
|
for p in Path(__file__).parents:
|
||||||
if p.with_name(".git").exists():
|
if p.with_name('.git').exists():
|
||||||
root = p.parent
|
root = p.parent
|
||||||
break
|
break
|
||||||
|
|
||||||
repo = Repo(root)
|
repo = Repo(root)
|
||||||
if not changed.is_absolute():
|
if not changed.is_absolute():
|
||||||
changed = root / changed
|
changed = root / changed
|
||||||
repo.git.checkout("HEAD", "--", changed)
|
repo.git.checkout('HEAD', '--', changed)
|
||||||
|
|
||||||
|
|
||||||
def modify_file(path: Path):
|
def modify_file(path: Path):
|
||||||
file_content = path.read_text()
|
file_content = path.read_text()
|
||||||
modified_content = re.sub(r"Ham", r"Spam", file_content, flags=re.MULTILINE)
|
modified_content = re.sub(r'Ham', r'Spam', file_content, flags=re.MULTILINE)
|
||||||
modified_content = re.sub(r"ham", r"spam", modified_content, flags=re.MULTILINE)
|
modified_content = re.sub(r'ham', r'spam', modified_content, flags=re.MULTILINE)
|
||||||
path.write_text(modified_content)
|
path.write_text(modified_content)
|
||||||
|
|
||||||
|
|
||||||
@@ -40,12 +39,12 @@ def test_file(ad: AppDaemon, caplog: pytest.LogCaptureFixture):
|
|||||||
reset_file(module_file)
|
reset_file(module_file)
|
||||||
modify_file(module_file)
|
modify_file(module_file)
|
||||||
|
|
||||||
logging.getLogger("AppDaemon").propagate = True
|
logging.getLogger('AppDaemon').propagate = True
|
||||||
with caplog.at_level(logging.DEBUG, logger="AppDaemon._app_management"):
|
with caplog.at_level(logging.DEBUG, logger='AppDaemon._app_management'):
|
||||||
# running_ad.loop.run_until_complete(delayed_stop(running_ad, 2.0))
|
# running_ad.loop.run_until_complete(delayed_stop(running_ad, 2.0))
|
||||||
print("waiting in test")
|
print('waiting in test')
|
||||||
ad.loop.run_until_complete(asyncio.sleep(2.0))
|
ad.loop.run_until_complete(asyncio.sleep(2.0))
|
||||||
|
|
||||||
module_load_order = get_load_order(caplog)
|
module_load_order = get_load_order(caplog)
|
||||||
assert module_load_order == ["food.meal", "restaurant"]
|
assert module_load_order == ['food.meal', 'restaurant']
|
||||||
print("done")
|
print('done')
|
||||||
|
|||||||
@@ -5,26 +5,25 @@ from typing import List
|
|||||||
import pytest
|
import pytest
|
||||||
from appdaemon.appdaemon import AppDaemon
|
from appdaemon.appdaemon import AppDaemon
|
||||||
|
|
||||||
from .fixtures import ad, base_config
|
|
||||||
from .utils import get_load_order
|
from .utils import get_load_order
|
||||||
|
|
||||||
|
|
||||||
def validate_app_dependencies(ad: AppDaemon):
|
def validate_app_dependencies(ad: AppDaemon):
|
||||||
graph = ad.app_management.app_config.depedency_graph()
|
graph = ad.app_management.app_config.depedency_graph()
|
||||||
assert "hello_world" in graph["food_app"]
|
assert 'hello_world' in graph['food_app']
|
||||||
|
|
||||||
|
|
||||||
def validate_module_dependencies(ad: AppDaemon):
|
def validate_module_dependencies(ad: AppDaemon):
|
||||||
graph = ad.app_management.module_dependencies
|
graph = ad.app_management.module_dependencies
|
||||||
assert "food" in graph["restaurant"]
|
assert 'food' in graph['restaurant']
|
||||||
assert "food.meal" in graph["restaurant"]
|
assert 'food.meal' in graph['restaurant']
|
||||||
|
|
||||||
|
|
||||||
def validate_module_load_order(ad: AppDaemon, module_load_order: List[str]):
|
def validate_module_load_order(ad: AppDaemon, module_load_order: List[str]):
|
||||||
dependency_graph = ad.app_management.module_dependencies
|
dependency_graph = ad.app_management.module_dependencies
|
||||||
for node, deps in dependency_graph.items():
|
for node, deps in dependency_graph.items():
|
||||||
# skip parts that
|
# skip parts that
|
||||||
if not node.startswith("appdaemon"):
|
if not node.startswith('appdaemon'):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
node_idx = module_load_order.index(node)
|
node_idx = module_load_order.index(node)
|
||||||
@@ -37,15 +36,15 @@ def test_startup(ad: AppDaemon, caplog: pytest.LogCaptureFixture):
|
|||||||
assert isinstance(ad, AppDaemon)
|
assert isinstance(ad, AppDaemon)
|
||||||
|
|
||||||
# logger = logging.getLogger('AppDaemon._app_management')
|
# logger = logging.getLogger('AppDaemon._app_management')
|
||||||
ad_logger = logging.getLogger("AppDaemon")
|
ad_logger = logging.getLogger('AppDaemon')
|
||||||
ad_logger.propagate = True
|
ad_logger.propagate = True
|
||||||
|
|
||||||
with caplog.at_level(logging.DEBUG, logger="AppDaemon._app_management"):
|
with caplog.at_level(logging.DEBUG, logger='AppDaemon._app_management'):
|
||||||
# ad_system.loop.run_until_complete(stop_coro)
|
# ad_system.loop.run_until_complete(stop_coro)
|
||||||
ad.loop.run_until_complete(asyncio.sleep(2.0))
|
ad.loop.run_until_complete(asyncio.sleep(2.0))
|
||||||
|
|
||||||
assert "Started 'hello_world'" in caplog.text
|
assert "Started 'hello_world'" in caplog.text
|
||||||
assert "App initialization complete" in caplog.text
|
assert 'App initialization complete' in caplog.text
|
||||||
|
|
||||||
validate_app_dependencies(ad)
|
validate_app_dependencies(ad)
|
||||||
validate_module_dependencies(ad)
|
validate_module_dependencies(ad)
|
||||||
|
|||||||
@@ -12,6 +12,6 @@ async def delayed_stop(ad: AppDaemon, delay: float):
|
|||||||
|
|
||||||
def get_load_order(caplog: pytest.LogCaptureFixture) -> List[str]:
|
def get_load_order(caplog: pytest.LogCaptureFixture) -> List[str]:
|
||||||
for record in caplog.records:
|
for record in caplog.records:
|
||||||
if record.name == "AppDaemon._app_management":
|
if record.name == 'AppDaemon._app_management':
|
||||||
if "Determined module load order" in record.msg:
|
if 'Determined module load order' in record.msg:
|
||||||
return record.args[0]
|
return record.args[0]
|
||||||
|
|||||||
Reference in New Issue
Block a user