diff --git a/tests/conftest.py b/tests/conftest.py index 3b74c36c..f23d4dbe 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -11,6 +11,12 @@ from server import FileActivityService +# Declare files holding fixtures +pytest_plugins = [ + 'test_editors.commons' +] + + @pytest.fixture def monitored_dir(): """ @@ -149,7 +155,7 @@ def test_container(request, docker_client, monitored_dir, ignored_dir): container.remove() -@pytest.fixture +@pytest.fixture(autouse=True) def fact(request, docker_client, fact_config, server, logs_dir, test_file): """ Run the fact docker container for integration tests. diff --git a/tests/containers/editors/Containerfile b/tests/containers/editors/Containerfile new file mode 100644 index 00000000..47c91b43 --- /dev/null +++ b/tests/containers/editors/Containerfile @@ -0,0 +1,5 @@ +FROM quay.io/fedora/fedora:43 + +RUN dnf install -y \ + neovim \ + vim diff --git a/tests/event.py b/tests/event.py index 389c01e0..7fee2cb8 100644 --- a/tests/event.py +++ b/tests/event.py @@ -1,4 +1,5 @@ import os +from re import Pattern import string from enum import Enum from typing import Any, override @@ -104,6 +105,21 @@ def get_id(line: str, wanted_id: str) -> int | None: container_id=container_id, loginuid=loginuid) + @classmethod + def in_container(cls, + exe_path: str, + args: str, + name: str, + container_id: str): + return Process(pid=None, + uid=0, + gid=0, + loginuid=pow(2, 32)-1, + exe_path=exe_path, + args=args, + name=name, + container_id=container_id) + @property def uid(self) -> int: return self._uid @@ -161,6 +177,13 @@ def __str__(self) -> str: f'loginuid={self.loginuid})') +def cmp_path(p1: str | Pattern[str], p2: str) -> bool: + if isinstance(p1, Pattern): + return bool(p1.match(p2)) + else: + return p1 == p2 + + class Event: """ Represents a file activity event, associating a process with an @@ -170,15 +193,15 @@ class Event: def __init__(self, process: Process, event_type: EventType, - file: str, - host_path: str = '', + file: str | Pattern[str], + host_path: str | Pattern[str] = '', mode: int | None = None, owner_uid: int | None = None, owner_gid: int | None = None,): self._type: EventType = event_type self._process: Process = process - self._file: str = file - self._host_path: str = host_path + self._file: str | Pattern[str] = file + self._host_path: str | Pattern[str] = host_path self._mode: int | None = mode self._owner_uid: int | None = owner_uid self._owner_gid: int | None = owner_gid @@ -192,11 +215,11 @@ def process(self) -> Process: return self._process @property - def file(self) -> str: + def file(self) -> str | Pattern[str]: return self._file @property - def host_path(self) -> str: + def host_path(self) -> str | Pattern[str]: return self._host_path @property @@ -218,21 +241,21 @@ def __eq__(self, other: Any) -> bool: return False if self.event_type == EventType.CREATION: - return self.file == other.creation.activity.path and \ - self.host_path == other.creation.activity.host_path + return cmp_path(self.file, other.creation.activity.path) and \ + cmp_path(self.host_path, other.creation.activity.host_path) elif self.event_type == EventType.OPEN: - return self.file == other.open.activity.path and \ - self.host_path == other.open.activity.host_path + return cmp_path(self.file, other.open.activity.path) and \ + cmp_path(self.host_path, other.open.activity.host_path) elif self.event_type == EventType.UNLINK: - return self.file == other.unlink.activity.path and \ - self.host_path == other.unlink.activity.host_path + return cmp_path(self.file, other.unlink.activity.path) and \ + cmp_path(self.host_path, other.unlink.activity.host_path) elif self.event_type == EventType.PERMISSION: - return self.file == other.permission.activity.path and \ - self.host_path == other.permission.activity.host_path and \ + return cmp_path(self.file, other.permission.activity.path) and \ + cmp_path(self.host_path, other.permission.activity.host_path) and \ self.mode == other.permission.mode elif self.event_type == EventType.OWNERSHIP: - return self.file == other.ownership.activity.path and \ - self.host_path == other.ownership.activity.host_path and \ + return cmp_path(self.file, other.ownership.activity.path) and \ + cmp_path(self.host_path, other.ownership.activity.host_path) and \ self.owner_uid == other.ownership.uid and \ self.owner_gid == other.ownership.gid return False diff --git a/tests/server.py b/tests/server.py index 2dd17b69..d3aa8d15 100644 --- a/tests/server.py +++ b/tests/server.py @@ -81,7 +81,7 @@ def is_running(self): """ return self.running.is_set() - def _wait_events(self, events: list[Event], ignored: list[Event]): + def _wait_events(self, events: list[Event], strict: bool): while self.is_running(): msg = self.get_next() if msg is None: @@ -89,15 +89,14 @@ def _wait_events(self, events: list[Event], ignored: list[Event]): continue print(f'Got event: {msg}') - if msg in ignored: - raise ValueError(f'Caught ignored event: {msg}') - if msg in events: events.remove(msg) if len(events) == 0: break + elif strict: + raise ValueError(f'Encountered unexpected event: {msg}') - def wait_events(self, events: list[Event], ignored: list[Event] = []): + def wait_events(self, events: list[Event], strict: bool = True): """ Continuously checks the server for incoming events until the specified events are found. @@ -105,10 +104,11 @@ def wait_events(self, events: list[Event], ignored: list[Event] = []): Args: server: The server instance to retrieve events from. event (list[Event]): The events to search for. - ignored (list[Event]): List of events that should not happen. + strict (bool): Fail if an unexpected event is detected. Raises: TimeoutError: If the required events are not found in 5 seconds. """ - fs = self.executor.submit(self._wait_events, events, ignored) + print('Waiting for events:', *events, sep='\n') + fs = self.executor.submit(self._wait_events, events, strict) fs.result(timeout=5) diff --git a/tests/test_config_hotreload.py b/tests/test_config_hotreload.py index 4b5d337d..9a875228 100644 --- a/tests/test_config_hotreload.py +++ b/tests/test_config_hotreload.py @@ -100,7 +100,6 @@ def test_output_grpc_address_change(fact, fact_config, monitored_dir, server, al process = Process.from_proc() e = Event(process=process, event_type=EventType.CREATION, file=fut, host_path='') - print(f'Waiting for event: {e}') server.wait_events([e]) @@ -114,7 +113,6 @@ def test_output_grpc_address_change(fact, fact_config, monitored_dir, server, al e = Event(process=process, event_type=EventType.OPEN, file=fut, host_path='') - print(f'Waiting for event on alternate server: {e}') alternate_server.wait_events([e]) @@ -127,10 +125,6 @@ def test_paths(fact, fact_config, monitored_dir, ignored_dir, server): with open(ignored_file, 'w') as f: f.write('This is to be ignored') - ignored_event = Event(process=p, event_type=EventType.CREATION, - file=ignored_file, host_path='') - print(f'Ignoring: {ignored_event}') - # File Under Test fut = os.path.join(monitored_dir, 'test2.txt') with open(fut, 'w') as f: @@ -138,9 +132,8 @@ def test_paths(fact, fact_config, monitored_dir, ignored_dir, server): e = Event(process=p, event_type=EventType.CREATION, file=fut, host_path='') - print(f'Waiting for event: {e}') - server.wait_events([e], ignored=[ignored_event]) + server.wait_events([e]) config, config_file = fact_config config['paths'] = [ignored_dir] @@ -153,17 +146,12 @@ def test_paths(fact, fact_config, monitored_dir, ignored_dir, server): e = Event(process=p, event_type=EventType.OPEN, file=ignored_file, host_path='') - print(f'Waiting for event: {e}') # File Under Test with open(fut, 'w') as f: f.write('This is another ignored event') - ignored_event = Event( - process=p, event_type=EventType.OPEN, file=fut, host_path='') - print(f'Ignoring: {ignored_event}') - - server.wait_events([e], ignored=[ignored_event]) + server.wait_events([e]) def test_paths_addition(fact, fact_config, monitored_dir, ignored_dir, server): @@ -174,10 +162,6 @@ def test_paths_addition(fact, fact_config, monitored_dir, ignored_dir, server): with open(ignored_file, 'w') as f: f.write('This is to be ignored') - ignored_event = Event(process=p, event_type=EventType.CREATION, - file=ignored_file, host_path='') - print(f'Ignoring: {ignored_event}') - # File Under Test fut = os.path.join(monitored_dir, 'test2.txt') with open(fut, 'w') as f: @@ -185,9 +169,8 @@ def test_paths_addition(fact, fact_config, monitored_dir, ignored_dir, server): e = Event(process=p, event_type=EventType.CREATION, file=fut, host_path='') - print(f'Waiting for event: {e}') - server.wait_events([e], ignored=[ignored_event]) + server.wait_events([e]) config, config_file = fact_config config['paths'] = [monitored_dir, ignored_dir] @@ -205,6 +188,5 @@ def test_paths_addition(fact, fact_config, monitored_dir, ignored_dir, server): file=ignored_file, host_path=''), Event(process=p, event_type=EventType.OPEN, file=fut, host_path='') ] - print(f'Waiting for events: {events}') server.wait_events(events) diff --git a/tests/test_editors/commons.py b/tests/test_editors/commons.py new file mode 100644 index 00000000..150e662d --- /dev/null +++ b/tests/test_editors/commons.py @@ -0,0 +1,49 @@ +import os + +import pytest + + +def get_vi_test_file(dir): + return os.path.join(dir, '4913') + + +@pytest.fixture(scope='session') +def build_editor_image(docker_client): + image, _ = docker_client.images.build( + path='containers/editors', + tag='editors:latest', + dockerfile='Containerfile', + ) + return image + + +def run_editor_container(image, docker_client, ignored_dir): + container = docker_client.containers.run( + image, + detach=True, + tty=True, + name='editors', + volumes={ + ignored_dir: { + 'bind': '/mounted', + 'mode': 'z', + }, + }, + ) + container.exec_run('mkdir /container-dir') + + yield container + + container.kill() + container.remove() + + +@pytest.fixture +def vi_container(docker_client, ignored_dir): + yield from run_editor_container('quay.io/fedora/fedora:43', docker_client, ignored_dir) + + +@pytest.fixture +def editor_container(build_editor_image, docker_client, ignored_dir): + image = build_editor_image.tags[0] + yield from run_editor_container(image, docker_client, ignored_dir) diff --git a/tests/test_editors/test_nvim.py b/tests/test_editors/test_nvim.py new file mode 100644 index 00000000..32175d42 --- /dev/null +++ b/tests/test_editors/test_nvim.py @@ -0,0 +1,139 @@ +from event import Event, EventType, Process + +from test_editors.commons import get_vi_test_file + + +def test_new_file(editor_container, server): + fut = '/mounted/test.txt' + + editor_container.exec_run( + f"nvim {fut} +':normal iThis is a test' -c x") + + process = Process.in_container( + exe_path='/usr/bin/nvim', + args=f'nvim {fut} +:normal iThis is a test -c x', + name='nvim', + container_id=editor_container.id[:12], + ) + events = [ + Event(process=process, event_type=EventType.CREATION, + file=fut, host_path=''), + ] + + server.wait_events(events, strict=True) + + +def test_open_file(editor_container, server): + fut = '/mounted/test.txt' + container_id = editor_container.id[:12] + + # We ensure the file exists before editing. + editor_container.exec_run(f'touch {fut}') + editor_container.exec_run( + f"nvim {fut} +':normal iThis is a test' -c x") + + touch = Process.in_container( + exe_path='/usr/bin/touch', + args=f'touch {fut}', + name='touch', + container_id=container_id, + ) + nvim = Process.in_container( + exe_path='/usr/bin/nvim', + args=f'nvim {fut} +:normal iThis is a test -c x', + name='nvim', + container_id=container_id, + ) + + vi_test_file = get_vi_test_file('/mounted') + + events = [ + Event(process=touch, event_type=EventType.CREATION, + file=fut, host_path=''), + Event(process=nvim, event_type=EventType.CREATION, + file=vi_test_file, host_path=''), + Event(process=nvim, event_type=EventType.OWNERSHIP, + file=vi_test_file, host_path='', owner_uid=0, owner_gid=0), + Event(process=nvim, event_type=EventType.UNLINK, + file=vi_test_file, host_path=''), + Event(process=nvim, event_type=EventType.CREATION, + file=fut, host_path=''), + Event(process=nvim, event_type=EventType.PERMISSION, + file=fut, host_path='', mode=0o100644), + Event(process=nvim, event_type=EventType.UNLINK, + file=f'{fut}~', host_path=''), + ] + + server.wait_events(events, strict=True) + + +def test_new_file_ovfs(editor_container, server): + fut = '/container-dir/test.txt' + + editor_container.exec_run( + f"nvim {fut} +':normal iThis is a test' -c x") + + process = Process.in_container( + exe_path='/usr/bin/nvim', + args=f'nvim {fut} +:normal iThis is a test -c x', + name='nvim', + container_id=editor_container.id[:12], + ) + events = [ + Event(process=process, event_type=EventType.CREATION, + file=fut, host_path=''), + Event(process=process, event_type=EventType.OPEN, + file=fut, host_path=''), + ] + + server.wait_events(events, strict=True) + + +def test_open_file_ovfs(editor_container, server): + fut = '/container-dir/test.txt' + container_id = editor_container.id[:12] + + # We ensure the file exists before editing. + editor_container.exec_run(f'touch {fut}') + editor_container.exec_run( + f"nvim {fut} +':normal iThis is a test' -c x") + + touch = Process.in_container( + exe_path='/usr/bin/touch', + args=f'touch {fut}', + name='touch', + container_id=container_id, + ) + nvim = Process.in_container( + exe_path='/usr/bin/nvim', + args=f'nvim {fut} +:normal iThis is a test -c x', + name='nvim', + container_id=container_id, + ) + + vi_test_file = get_vi_test_file('/container-dir') + + events = [ + Event(process=touch, event_type=EventType.CREATION, + file=fut, host_path=''), + Event(process=touch, event_type=EventType.OPEN, + file=fut, host_path=''), + Event(process=nvim, event_type=EventType.CREATION, + file=vi_test_file, host_path=''), + Event(process=nvim, event_type=EventType.OPEN, + file=vi_test_file, host_path=''), + Event(process=nvim, event_type=EventType.OWNERSHIP, + file=vi_test_file, host_path='', owner_uid=0, owner_gid=0), + Event(process=nvim, event_type=EventType.UNLINK, + file=vi_test_file, host_path=''), + Event(process=nvim, event_type=EventType.CREATION, + file=fut, host_path=''), + Event(process=nvim, event_type=EventType.OPEN, + file=fut, host_path=''), + Event(process=nvim, event_type=EventType.PERMISSION, + file=fut, host_path='', mode=0o100644), + Event(process=nvim, event_type=EventType.UNLINK, + file=f'{fut}~', host_path=''), + ] + + server.wait_events(events, strict=True) diff --git a/tests/test_editors/test_sed.py b/tests/test_editors/test_sed.py new file mode 100644 index 00000000..99accb88 --- /dev/null +++ b/tests/test_editors/test_sed.py @@ -0,0 +1,76 @@ +import re +from event import Event, EventType, Process + + +def test_sed(vi_container, server): + # File Under Test + fut = '/mounted/test.txt' + container_id = vi_container.id[:12] + + vi_container.exec_run(f"sh -c \"echo 'This is a test' > {fut}\"") + vi_container.exec_run(fr"sed -i -e 's/a test/not \0/' {fut}") + + shell = Process.in_container( + exe_path='/usr/bin/bash', + args=f"sh -c echo 'This is a test' > {fut}", + name='sh', + container_id=container_id, + ) + sed = Process.in_container( + exe_path='/usr/bin/sed', + args=fr'sed -i -e s/a test/not \0/ {fut}', + name='sed', + container_id=container_id, + ) + + sed_tmp_file = re.compile(r'\/mounted\/sed[0-9a-zA-Z]{6}') + + events = [ + Event(process=shell, event_type=EventType.CREATION, + file=fut, host_path=''), + Event(process=sed, event_type=EventType.CREATION, + file=sed_tmp_file, host_path=''), + Event(process=sed, event_type=EventType.OWNERSHIP, + file=sed_tmp_file, host_path='', owner_uid=0, owner_gid=0), + ] + + server.wait_events(events, strict=True) + + +def test_sed_ovfs(vi_container, server): + # File Under Test + fut = '/container-dir/test.txt' + container_id = vi_container.id[:12] + + vi_container.exec_run(f"sh -c \"echo 'This is a test' > {fut}\"") + vi_container.exec_run(fr"sed -i -e 's/a test/not \0/' {fut}") + + shell = Process.in_container( + exe_path='/usr/bin/bash', + args=f"sh -c echo 'This is a test' > {fut}", + name='sh', + container_id=container_id, + ) + sed = Process.in_container( + exe_path='/usr/bin/sed', + args=fr'sed -i -e s/a test/not \0/ {fut}', + name='sed', + container_id=container_id, + ) + + sed_tmp_file = re.compile(r'\/container-dir\/sed[0-9a-zA-Z]{6}') + + events = [ + Event(process=shell, event_type=EventType.CREATION, + file=fut, host_path=''), + Event(process=shell, event_type=EventType.OPEN, + file=fut, host_path=''), + Event(process=sed, event_type=EventType.CREATION, + file=sed_tmp_file, host_path=''), + Event(process=sed, event_type=EventType.OPEN, + file=sed_tmp_file, host_path=''), + Event(process=sed, event_type=EventType.OWNERSHIP, + file=sed_tmp_file, host_path='', owner_uid=0, owner_gid=0), + ] + + server.wait_events(events, strict=True) diff --git a/tests/test_editors/test_vi.py b/tests/test_editors/test_vi.py new file mode 100644 index 00000000..97dbce38 --- /dev/null +++ b/tests/test_editors/test_vi.py @@ -0,0 +1,214 @@ +from event import Event, EventType, Process +from test_editors.commons import get_vi_test_file + + +def test_new_file(vi_container, server): + fut = '/mounted/test.txt' + swap_file = '/mounted/.test.txt.swp' + swx_file = '/mounted/.test.txt.swx' + exe = '/usr/libexec/vi' + + vi_container.exec_run( + f"vi {fut} +':normal iThis is a test' -c x") + + process = Process.in_container( + exe_path=exe, + args=f'{exe} {fut} +:normal iThis is a test -c x', + name='vi', + container_id=vi_container.id[:12], + ) + + events = [ + Event(process=process, event_type=EventType.CREATION, + file=swap_file, host_path=''), + Event(process=process, event_type=EventType.CREATION, + file=swx_file, host_path=''), + Event(process=process, event_type=EventType.UNLINK, + file=swx_file, host_path=''), + Event(process=process, event_type=EventType.UNLINK, + file=swap_file, host_path=''), + Event(process=process, event_type=EventType.CREATION, + file=swap_file, host_path=''), + Event(process=process, event_type=EventType.CREATION, + file=fut, host_path=''), + Event(process=process, event_type=EventType.UNLINK, + file=swap_file, host_path=''), + ] + + server.wait_events(events, strict=True) + + +def test_new_file_ovfs(vi_container, server): + fut = '/container-dir/test.txt' + swap_file = '/container-dir/.test.txt.swp' + swx_file = '/container-dir/.test.txt.swx' + exe = '/usr/libexec/vi' + + vi_container.exec_run( + f"vi {fut} +':normal iThis is a test' -c x") + + process = Process.in_container( + exe_path=exe, + args=f'{exe} {fut} +:normal iThis is a test -c x', + name='vi', + container_id=vi_container.id[:12], + ) + + events = [ + Event(process=process, event_type=EventType.CREATION, + file=swap_file, host_path=''), + Event(process=process, event_type=EventType.OPEN, + file=swap_file, host_path=''), + Event(process=process, event_type=EventType.CREATION, + file=swx_file, host_path=''), + Event(process=process, event_type=EventType.OPEN, + file=swx_file, host_path=''), + Event(process=process, event_type=EventType.UNLINK, + file=swx_file, host_path=''), + Event(process=process, event_type=EventType.UNLINK, + file=swap_file, host_path=''), + Event(process=process, event_type=EventType.CREATION, + file=swap_file, host_path=''), + Event(process=process, event_type=EventType.OPEN, + file=swap_file, host_path=''), + Event(process=process, event_type=EventType.CREATION, + file=fut, host_path=''), + Event(process=process, event_type=EventType.OPEN, + file=fut, host_path=''), + Event(process=process, event_type=EventType.UNLINK, + file=swap_file, host_path=''), + ] + + server.wait_events(events, strict=True) + + +def test_open_file(vi_container, server): + fut = '/mounted/test.txt' + swap_file = '/mounted/.test.txt.swp' + swx_file = '/mounted/.test.txt.swx' + vi_test_file = get_vi_test_file('/mounted') + exe = '/usr/libexec/vi' + container_id = vi_container.id[:12] + + # We ensure the file exists before editing. + vi_container.exec_run(f'touch {fut}') + vi_container.exec_run( + f"vi {fut} +':normal iThis is a test' -c x") + + touch_process = Process.in_container( + exe_path='/usr/bin/touch', + args=f'touch {fut}', + name='touch', + container_id=container_id, + ) + vi_process = Process.in_container( + exe_path=exe, + args=f'{exe} {fut} +:normal iThis is a test -c x', + name='vi', + container_id=container_id, + ) + + events = [ + Event(process=touch_process, event_type=EventType.CREATION, + file=fut, host_path=''), + Event(process=vi_process, event_type=EventType.CREATION, + file=swap_file, host_path=''), + Event(process=vi_process, event_type=EventType.CREATION, + file=swx_file, host_path=''), + Event(process=vi_process, event_type=EventType.UNLINK, + file=swx_file, host_path=''), + Event(process=vi_process, event_type=EventType.UNLINK, + file=swap_file, host_path=''), + Event(process=vi_process, event_type=EventType.CREATION, + file=swap_file, host_path=''), + Event(process=vi_process, event_type=EventType.PERMISSION, + file=swap_file, host_path='', mode=0o644), + Event(process=vi_process, event_type=EventType.CREATION, + file=vi_test_file, host_path=''), + Event(process=vi_process, event_type=EventType.OWNERSHIP, + file=vi_test_file, host_path='', owner_uid=0, owner_gid=0), + Event(process=vi_process, event_type=EventType.UNLINK, + file=vi_test_file, host_path=''), + Event(process=vi_process, event_type=EventType.CREATION, + file=fut, host_path=''), + Event(process=vi_process, event_type=EventType.PERMISSION, + file=fut, host_path='', mode=0o100644), + Event(process=vi_process, event_type=EventType.UNLINK, + file=f'{fut}~', host_path=''), + Event(process=vi_process, event_type=EventType.UNLINK, + file=swap_file, host_path=''), + ] + + server.wait_events(events, strict=True) + + +def test_open_file_ovfs(vi_container, server): + fut = '/container-dir/test.txt' + swap_file = '/container-dir/.test.txt.swp' + swx_file = '/container-dir/.test.txt.swx' + vi_test_file = get_vi_test_file('/container-dir') + exe = '/usr/libexec/vi' + container_id = vi_container.id[:12] + + # We ensure the file exists before editing. + vi_container.exec_run(f'touch {fut}') + vi_container.exec_run( + f"vi {fut} +':normal iThis is a test' -c x") + + touch_process = Process.in_container( + exe_path='/usr/bin/touch', + args=f'touch {fut}', + name='touch', + container_id=container_id, + ) + vi_process = Process.in_container( + exe_path=exe, + args=f'{exe} {fut} +:normal iThis is a test -c x', + name='vi', + container_id=container_id, + ) + + events = [ + Event(process=touch_process, event_type=EventType.CREATION, + file=fut, host_path=''), + Event(process=touch_process, event_type=EventType.OPEN, + file=fut, host_path=''), + Event(process=vi_process, event_type=EventType.CREATION, + file=swap_file, host_path=''), + Event(process=vi_process, event_type=EventType.OPEN, + file=swap_file, host_path=''), + Event(process=vi_process, event_type=EventType.CREATION, + file=swx_file, host_path=''), + Event(process=vi_process, event_type=EventType.OPEN, + file=swx_file, host_path=''), + Event(process=vi_process, event_type=EventType.UNLINK, + file=swx_file, host_path=''), + Event(process=vi_process, event_type=EventType.UNLINK, + file=swap_file, host_path=''), + Event(process=vi_process, event_type=EventType.CREATION, + file=swap_file, host_path=''), + Event(process=vi_process, event_type=EventType.OPEN, + file=swap_file, host_path=''), + Event(process=vi_process, event_type=EventType.PERMISSION, + file=swap_file, host_path='', mode=0o644), + Event(process=vi_process, event_type=EventType.CREATION, + file=vi_test_file, host_path=''), + Event(process=vi_process, event_type=EventType.OPEN, + file=vi_test_file, host_path=''), + Event(process=vi_process, event_type=EventType.OWNERSHIP, + file=vi_test_file, host_path='', owner_uid=0, owner_gid=0), + Event(process=vi_process, event_type=EventType.UNLINK, + file=vi_test_file, host_path=''), + Event(process=vi_process, event_type=EventType.CREATION, + file=fut, host_path=''), + Event(process=vi_process, event_type=EventType.OPEN, + file=fut, host_path=''), + Event(process=vi_process, event_type=EventType.PERMISSION, + file=fut, host_path='', mode=0o100644), + Event(process=vi_process, event_type=EventType.UNLINK, + file=f'{fut}~', host_path=''), + Event(process=vi_process, event_type=EventType.UNLINK, + file=swap_file, host_path=''), + ] + + server.wait_events(events, strict=True) diff --git a/tests/test_editors/test_vim.py b/tests/test_editors/test_vim.py new file mode 100644 index 00000000..8919362b --- /dev/null +++ b/tests/test_editors/test_vim.py @@ -0,0 +1,208 @@ +from event import Event, EventType, Process +from test_editors.commons import get_vi_test_file + + +def test_new_file(editor_container, server): + fut = '/mounted/test.txt' + swap_file = '/mounted/.test.txt.swp' + swx_file = '/mounted/.test.txt.swx' + + editor_container.exec_run( + f"vim {fut} +':normal iThis is a test' -c x") + + process = Process.in_container( + exe_path='/usr/bin/vim', + args=f'vim {fut} +:normal iThis is a test -c x', + name='vim', + container_id=editor_container.id[:12], + ) + events = [ + Event(process=process, event_type=EventType.CREATION, + file=swap_file, host_path=''), + Event(process=process, event_type=EventType.CREATION, + file=swx_file, host_path=''), + Event(process=process, event_type=EventType.UNLINK, + file=swx_file, host_path=''), + Event(process=process, event_type=EventType.UNLINK, + file=swap_file, host_path=''), + Event(process=process, event_type=EventType.CREATION, + file=swap_file, host_path=''), + Event(process=process, event_type=EventType.CREATION, + file=fut, host_path=''), + Event(process=process, event_type=EventType.UNLINK, + file=swap_file, host_path=''), + ] + + server.wait_events(events, strict=True) + + +def test_new_file_ovfs(editor_container, server): + fut = '/container-dir/test.txt' + swap_file = '/container-dir/.test.txt.swp' + swx_file = '/container-dir/.test.txt.swx' + + editor_container.exec_run( + f"vim {fut} +':normal iThis is a test' -c x") + + process = Process.in_container( + exe_path='/usr/bin/vim', + args=f'vim {fut} +:normal iThis is a test -c x', + name='vim', + container_id=editor_container.id[:12], + ) + events = [ + Event(process=process, event_type=EventType.CREATION, + file=swap_file, host_path=''), + Event(process=process, event_type=EventType.OPEN, + file=swap_file, host_path=''), + Event(process=process, event_type=EventType.CREATION, + file=swx_file, host_path=''), + Event(process=process, event_type=EventType.OPEN, + file=swx_file, host_path=''), + Event(process=process, event_type=EventType.UNLINK, + file=swx_file, host_path=''), + Event(process=process, event_type=EventType.UNLINK, + file=swap_file, host_path=''), + Event(process=process, event_type=EventType.CREATION, + file=swap_file, host_path=''), + Event(process=process, event_type=EventType.OPEN, + file=swap_file, host_path=''), + Event(process=process, event_type=EventType.CREATION, + file=fut, host_path=''), + Event(process=process, event_type=EventType.OPEN, + file=fut, host_path=''), + Event(process=process, event_type=EventType.UNLINK, + file=swap_file, host_path=''), + ] + + server.wait_events(events, strict=True) + + +def test_open_file(editor_container, server): + fut = '/mounted/test.txt' + swap_file = '/mounted/.test.txt.swp' + swx_file = '/mounted/.test.txt.swx' + vi_test_file = get_vi_test_file('/mounted') + container_id = editor_container.id[:12] + + # We ensure the file exists before editing. + editor_container.exec_run(f'touch {fut}') + editor_container.exec_run( + f"vim {fut} +':normal iThis is a test' -c x") + + touch_process = Process.in_container( + exe_path='/usr/bin/touch', + args=f'touch {fut}', + name='touch', + container_id=container_id, + ) + vi_process = Process.in_container( + exe_path='/usr/bin/vim', + args=f'vim {fut} +:normal iThis is a test -c x', + name='vim', + container_id=container_id, + ) + + events = [ + Event(process=touch_process, event_type=EventType.CREATION, + file=fut, host_path=''), + Event(process=vi_process, event_type=EventType.CREATION, + file=swap_file, host_path=''), + Event(process=vi_process, event_type=EventType.CREATION, + file=swx_file, host_path=''), + Event(process=vi_process, event_type=EventType.UNLINK, + file=swx_file, host_path=''), + Event(process=vi_process, event_type=EventType.UNLINK, + file=swap_file, host_path=''), + Event(process=vi_process, event_type=EventType.CREATION, + file=swap_file, host_path=''), + Event(process=vi_process, event_type=EventType.PERMISSION, + file=swap_file, host_path='', mode=0o644), + Event(process=vi_process, event_type=EventType.CREATION, + file=vi_test_file, host_path=''), + Event(process=vi_process, event_type=EventType.OWNERSHIP, + file=vi_test_file, host_path='', owner_uid=0, owner_gid=0), + Event(process=vi_process, event_type=EventType.UNLINK, + file=vi_test_file, host_path=''), + Event(process=vi_process, event_type=EventType.CREATION, + file=fut, host_path=''), + Event(process=vi_process, event_type=EventType.PERMISSION, + file=fut, host_path='', mode=0o100644), + Event(process=vi_process, event_type=EventType.UNLINK, + file=f'{fut}~', host_path=''), + Event(process=vi_process, event_type=EventType.UNLINK, + file=swap_file, host_path=''), + ] + + server.wait_events(events, strict=True) + + +def test_open_file_ovfs(editor_container, server): + fut = '/container-dir/test.txt' + swap_file = '/container-dir/.test.txt.swp' + swx_file = '/container-dir/.test.txt.swx' + vi_test_file = get_vi_test_file('/container-dir') + container_id = editor_container.id[:12] + + # We ensure the file exists before editing. + editor_container.exec_run(f'touch {fut}') + editor_container.exec_run( + f"vim {fut} +':normal iThis is a test' -c x") + + touch_process = Process.in_container( + exe_path='/usr/bin/touch', + args=f'touch {fut}', + name='touch', + container_id=container_id, + ) + vi_process = Process.in_container( + exe_path='/usr/bin/vim', + args=f'vim {fut} +:normal iThis is a test -c x', + name='vim', + container_id=container_id, + ) + + events = [ + Event(process=touch_process, event_type=EventType.CREATION, + file=fut, host_path=''), + Event(process=touch_process, event_type=EventType.OPEN, + file=fut, host_path=''), + Event(process=vi_process, event_type=EventType.CREATION, + file=swap_file, host_path=''), + Event(process=vi_process, event_type=EventType.OPEN, + file=swap_file, host_path=''), + Event(process=vi_process, event_type=EventType.CREATION, + file=swx_file, host_path=''), + Event(process=vi_process, event_type=EventType.OPEN, + file=swx_file, host_path=''), + Event(process=vi_process, event_type=EventType.UNLINK, + file=swx_file, host_path=''), + Event(process=vi_process, event_type=EventType.UNLINK, + file=swap_file, host_path=''), + Event(process=vi_process, event_type=EventType.CREATION, + file=swap_file, host_path=''), + Event(process=vi_process, event_type=EventType.OPEN, + file=swap_file, host_path=''), + Event(process=vi_process, event_type=EventType.PERMISSION, + file=swap_file, host_path='', mode=0o644), + Event(process=vi_process, event_type=EventType.CREATION, + file=vi_test_file, host_path=''), + Event(process=vi_process, event_type=EventType.OPEN, + file=vi_test_file, host_path=''), + Event(process=vi_process, event_type=EventType.OWNERSHIP, + file=vi_test_file, host_path='', owner_uid=0, owner_gid=0), + Event(process=vi_process, event_type=EventType.UNLINK, + file=vi_test_file, host_path=''), + Event(process=vi_process, event_type=EventType.CREATION, + file=fut, host_path=''), + Event(process=vi_process, event_type=EventType.OPEN, + file=fut, host_path=''), + Event(process=vi_process, event_type=EventType.PERMISSION, + file=fut, host_path='', mode=0o100644), + Event(process=vi_process, event_type=EventType.UNLINK, + file=f'{fut}~', host_path=''), + Event(process=vi_process, event_type=EventType.UNLINK, + file=swap_file, host_path=''), + ] + + server.wait_events(events, strict=True) diff --git a/tests/test_file_open.py b/tests/test_file_open.py index c47272c7..cc14b585 100644 --- a/tests/test_file_open.py +++ b/tests/test_file_open.py @@ -1,18 +1,15 @@ import multiprocessing as mp import os -import docker - from event import Event, EventType, Process -def test_open(fact, monitored_dir, server): +def test_open(monitored_dir, server): """ Tests the opening of a file and verifies that the corresponding event is captured by the server. Args: - fact: Fixture for file activity (only required to be running). monitored_dir: Temporary directory path for creating the test file. server: The server instance to communicate with. """ @@ -23,18 +20,16 @@ def test_open(fact, monitored_dir, server): e = Event(process=Process.from_proc(), event_type=EventType.CREATION, file=fut, host_path='') - print(f'Waiting for event: {e}') server.wait_events([e]) -def test_multiple(fact, monitored_dir, server): +def test_multiple(monitored_dir, server): """ Tests the opening of multiple files and verifies that the corresponding events are captured by the server. Args: - fact: Fixture for file activity (only required to be running). monitored_dir: Temporary directory path for creating the test file. server: The server instance to communicate with. """ @@ -46,21 +41,18 @@ def test_multiple(fact, monitored_dir, server): with open(fut, 'w') as f: f.write('This is a test') - e = Event(process=process, event_type=EventType.CREATION, - file=fut, host_path='') - print(f'Waiting for event: {e}') - events.append(e) + events.append( + Event(process=process, event_type=EventType.CREATION, file=fut, host_path='')) server.wait_events(events) -def test_multiple_access(fact, test_file, server): +def test_multiple_access(test_file, server): """ Tests multiple opening of a file and verifies that the corresponding events are captured by the server. Args: - fact: Fixture for file activity (only required to be running). monitored_dir: Temporary directory path for creating the test file. server: The server instance to communicate with. """ @@ -69,21 +61,18 @@ def test_multiple_access(fact, test_file, server): with open(test_file, 'a+') as f: f.write('This is a test') - e = Event(process=Process.from_proc(), file=test_file, - host_path=test_file, event_type=EventType.OPEN) - print(f'Waiting for event: {e}') - events.append(e) + events.append(Event(process=Process.from_proc(), file=test_file, + host_path=test_file, event_type=EventType.OPEN)) server.wait_events(events) -def test_ignored(fact, test_file, ignored_dir, server): +def test_ignored(test_file, ignored_dir, server): """ Tests that open events on ignored files are not captured by the server. Args: - fact: Fixture for file activity (only required to be running). test_file: Temporary file for testing. ignored_dir: Temporary directory path that is not monitored by fact. server: The server instance to communicate with. @@ -95,19 +84,14 @@ def test_ignored(fact, test_file, ignored_dir, server): with open(ignored_file, 'w') as f: f.write('This is to be ignored') - ignored_event = Event(process=p, event_type=EventType.CREATION, - file=ignored_file, host_path='') - print(f'Ignoring: {ignored_event}') - # File Under Test with open(test_file, 'w') as f: f.write('This is a test') e = Event(process=p, event_type=EventType.OPEN, file=test_file, host_path=test_file) - print(f'Waiting for event: {e}') - server.wait_events([e], ignored=[ignored_event]) + server.wait_events([e]) def do_test(fut: str, stop_event: mp.Event): @@ -120,13 +104,12 @@ def do_test(fut: str, stop_event: mp.Event): stop_event.wait() -def test_external_process(fact, monitored_dir, server): +def test_external_process(monitored_dir, server): """ Tests the opening of a file by an external process and verifies that the corresponding event is captured by the server. Args: - fact: Fixture for file activity (only required to be running). monitored_dir: Temporary directory path for creating the test file. server: The server instance to communicate with. """ @@ -140,10 +123,8 @@ def test_external_process(fact, monitored_dir, server): creation = Event(process=p, event_type=EventType.CREATION, file=fut, host_path='') - print(f'Waiting for event: {creation}') write_access = Event( process=p, event_type=EventType.OPEN, file=fut, host_path='') - print(f'Waiting for event: {write_access}') try: server.wait_events([creation, write_access]) @@ -152,21 +133,19 @@ def test_external_process(fact, monitored_dir, server): proc.join(1) -def test_overlay(fact, test_container, server): +def test_overlay(test_container, server): # File Under Test fut = '/container-dir/test.txt' # Create the exec and an equivalent event that it will trigger test_container.exec_run(f'touch {fut}') - process = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/touch', - args=f'touch {fut}', - name='touch', - container_id=test_container.id[:12], - loginuid=pow(2, 32)-1) + process = Process.in_container( + exe_path='/usr/bin/touch', + args=f'touch {fut}', + name='touch', + container_id=test_container.id[:12], + ) events = [ Event(process=process, event_type=EventType.CREATION, file=fut, host_path=''), @@ -174,51 +153,42 @@ def test_overlay(fact, test_container, server): file=fut, host_path='') ] - for e in events: - print(f'Waiting for event: {e}') - server.wait_events(events) -def test_mounted_dir(fact, test_container, ignored_dir, server): +def test_mounted_dir(test_container, ignored_dir, server): # File Under Test fut = '/mounted/test.txt' # Create the exec and an equivalent event that it will trigger test_container.exec_run(f'touch {fut}') - process = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/touch', - args=f'touch {fut}', - name='touch', - container_id=test_container.id[:12], - loginuid=pow(2, 32)-1) + process = Process.in_container( + exe_path='/usr/bin/touch', + args=f'touch {fut}', + name='touch', + container_id=test_container.id[:12], + ) event = Event(process=process, event_type=EventType.CREATION, file=fut, host_path='') - print(f'Waiting for event: {event}') server.wait_events([event]) -def test_unmonitored_mounted_dir(fact, test_container, test_file, server): +def test_unmonitored_mounted_dir(test_container, test_file, server): # File Under Test fut = '/unmonitored/test.txt' # Create the exec and an equivalent event that it will trigger test_container.exec_run(f'touch {fut}') - process = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/touch', - args=f'touch {fut}', - name='touch', - container_id=test_container.id[:12], - loginuid=pow(2, 32)-1) + process = Process.in_container( + exe_path='/usr/bin/touch', + args=f'touch {fut}', + name='touch', + container_id=test_container.id[:12], + ) event = Event(process=process, event_type=EventType.OPEN, file=fut, host_path=test_file) - print(f'Waiting for event: {event}') server.wait_events([event]) diff --git a/tests/test_misc.py b/tests/test_misc.py index 71c662dd..a18814a4 100644 --- a/tests/test_misc.py +++ b/tests/test_misc.py @@ -40,7 +40,7 @@ def run_self_deleter(fact, monitored_dir, logs_dir, docker_client, build_self_de container.remove() -def test_d_path_sanitization(fact, monitored_dir, server, run_self_deleter, docker_client): +def test_d_path_sanitization(monitored_dir, server, run_self_deleter, docker_client): """ Ensure the sanitization of paths obtained by calling the bpf_d_path helper don't include the " (deleted)" suffix when the file is @@ -52,16 +52,13 @@ def test_d_path_sanitization(fact, monitored_dir, server, run_self_deleter, dock container = run_self_deleter - process = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/local/bin/self-deleter', - args=f'self-deleter {fut}', - name='self-deleter', - container_id=container.id[:12], - loginuid=pow(2, 32)-1) + process = Process.in_container( + exe_path='/usr/local/bin/self-deleter', + args=f'self-deleter {fut}', + name='self-deleter', + container_id=container.id[:12], + ) event = Event(process=process, event_type=EventType.OPEN, file=fut, host_path=host_path) - print(f'Waiting for event: {event}') server.wait_events([event]) diff --git a/tests/test_path_chmod.py b/tests/test_path_chmod.py index 4b62e2c2..a4fd6345 100644 --- a/tests/test_path_chmod.py +++ b/tests/test_path_chmod.py @@ -4,13 +4,12 @@ from event import Event, EventType, Process -def test_chmod(fact, monitored_dir, server): +def test_chmod(monitored_dir, server): """ Tests changing permissions on a file and verifies the corresponding event is captured by the server Args: - fact: Fixture for file activity (only required to be runing). monitored_dir: Temporary directory path for creating the test file. server: The server instance to communicate with. """ @@ -22,17 +21,14 @@ def test_chmod(fact, monitored_dir, server): e = Event(process=Process.from_proc(), event_type=EventType.PERMISSION, file=fut, host_path=fut, mode=mode) - print(f'Waiting for event: {e}') - server.wait_events([e]) -def test_multiple(fact, monitored_dir, server): +def test_multiple(monitored_dir, server): """ Tests modifying permissions on multiple files. Args: - fact: Fixture for file activity (only required to be runing). monitored_dir: Temporary directory path for creating the test file. server: The server instance to communicate with. """ @@ -56,12 +52,11 @@ def test_multiple(fact, monitored_dir, server): server.wait_events(events) -def test_ignored(fact, test_file, ignored_dir, server): +def test_ignored(test_file, ignored_dir, server): """ Tests that permission events on ignored files are not captured. Args: - fact: Fixture for file activity (only required to be running). test_file: File monitored on the host, mounted to the container. ignored_dir: Temporary directory path that is not monitored by fact. server: The server instance to communicate with. @@ -75,18 +70,13 @@ def test_ignored(fact, test_file, ignored_dir, server): f.write('This is to be ignored') os.chmod(ignored_file, mode) - ignored_event = Event(process=process, event_type=EventType.PERMISSION, - file=ignored_file, host_path='', mode=mode) - print(f'Ignoring: {ignored_event}') - # File Under Test os.chmod(test_file, mode) e = Event(process=process, event_type=EventType.PERMISSION, file=test_file, host_path=test_file, mode=mode) - print(f'Waiting for event: {e}') - server.wait_events([e], ignored=[ignored_event]) + server.wait_events([e]) def do_test(fut: str, mode: int, stop_event: mp.Event): @@ -98,13 +88,12 @@ def do_test(fut: str, mode: int, stop_event: mp.Event): stop_event.wait() -def test_external_process(fact, monitored_dir, server): +def test_external_process(monitored_dir, server): """ Tests permission change of a file by an external process and verifies that the corresponding event is captured by the server. Args: - fact: Fixture for file activity (only required to be running). monitored_dir: Temporary directory path for creating the test file. server: The server instance to communicate with. """ @@ -116,23 +105,25 @@ def test_external_process(fact, monitored_dir, server): proc.start() process = Process.from_proc(proc.pid) - event = Event(process=process, event_type=EventType.PERMISSION, - file=fut, host_path='', mode=mode) - print(f'Waiting for event: {event}') + events = [ + Event(process=process, event_type=EventType.CREATION, + file=fut, host_path='', mode=mode), + Event(process=process, event_type=EventType.PERMISSION, + file=fut, host_path='', mode=mode), + ] try: - server.wait_events([event]) + server.wait_events(events) finally: stop_event.set() proc.join(1) -def test_overlay(fact, test_container, server): +def test_overlay(test_container, server): """ Test permission changes on an overlayfs file (inside a container) Args: - fact: Fixture for file activity (only required to be running). test_container: A container for running commands in. server: The server instance to communicate with. """ @@ -144,23 +135,18 @@ def test_overlay(fact, test_container, server): test_container.exec_run(f'touch {fut}') test_container.exec_run(f'chmod {mode} {fut}') - loginuid = pow(2, 32)-1 - touch = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/touch', - args=f'touch {fut}', - name='touch', - container_id=test_container.id[:12], - loginuid=loginuid) - chmod = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/chmod', - args=f'chmod {mode} {fut}', - name='chmod', - container_id=test_container.id[:12], - loginuid=loginuid) + touch = Process.in_container( + exe_path='/usr/bin/touch', + args=f'touch {fut}', + name='touch', + container_id=test_container.id[:12], + ) + chmod = Process.in_container( + exe_path='/usr/bin/chmod', + args=f'chmod {mode} {fut}', + name='chmod', + container_id=test_container.id[:12], + ) events = [ Event(process=touch, event_type=EventType.CREATION, file=fut, host_path=''), @@ -170,18 +156,14 @@ def test_overlay(fact, test_container, server): file=fut, host_path='', mode=int(mode, 8)), ] - for e in events: - print(f'Waiting for event: {e}') - server.wait_events(events) -def test_mounted_dir(fact, test_container, ignored_dir, server): +def test_mounted_dir(test_container, ignored_dir, server): """ Test permission changes on a file bind mounted into a container Args: - fact: Fixture for file activity (only required to be running). test_container: A container for running commands in. ignored_dir: This directory is ignored on the host, and mounted to the container. server: The server instance to communicate with. @@ -194,23 +176,18 @@ def test_mounted_dir(fact, test_container, ignored_dir, server): test_container.exec_run(f'touch {fut}') test_container.exec_run(f'chmod {mode} {fut}') - loginuid = pow(2, 32)-1 - touch = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/touch', - args=f'touch {fut}', - name='touch', - container_id=test_container.id[:12], - loginuid=loginuid) - chmod = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/chmod', - args=f'chmod {mode} {fut}', - name='chmod', - container_id=test_container.id[:12], - loginuid=loginuid) + touch = Process.in_container( + exe_path='/usr/bin/touch', + args=f'touch {fut}', + name='touch', + container_id=test_container.id[:12], + ) + chmod = Process.in_container( + exe_path='/usr/bin/chmod', + args=f'chmod {mode} {fut}', + name='chmod', + container_id=test_container.id[:12], + ) events = [ Event(process=touch, event_type=EventType.CREATION, file=fut, host_path=''), @@ -218,19 +195,15 @@ def test_mounted_dir(fact, test_container, ignored_dir, server): host_path='', mode=int(mode, 8)), ] - for e in events: - print(f'Waiting for event: {e}') - server.wait_events(events) -def test_unmonitored_mounted_dir(fact, test_container, test_file, server): +def test_unmonitored_mounted_dir(test_container, test_file, server): """ Test permission changes on a file bind mounted to a container and monitored on the host. Args: - fact: Fixture for file activity (only required to be running). test_container: A container for running commands in. test_file: File monitored on the host, mounted to the container. server: The server instance to communicate with. @@ -244,16 +217,13 @@ def test_unmonitored_mounted_dir(fact, test_container, test_file, server): # Create the exec and an equivalent event that it will trigger test_container.exec_run(f'chmod {mode} {fut}') - process = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/chmod', - args=f'chmod {mode} {fut}', - name='chmod', - container_id=test_container.id[:12], - loginuid=pow(2, 32)-1) + process = Process.in_container( + exe_path='/usr/bin/chmod', + args=f'chmod {mode} {fut}', + name='chmod', + container_id=test_container.id[:12], + ) event = Event(process=process, event_type=EventType.PERMISSION, file=fut, host_path=test_file, mode=int(mode, 8)) - print(f'Waiting for event: {event}') server.wait_events([event]) diff --git a/tests/test_path_chown.py b/tests/test_path_chown.py index d318f4eb..a1ade331 100644 --- a/tests/test_path_chown.py +++ b/tests/test_path_chown.py @@ -10,13 +10,12 @@ TEST_GID = 2345 -def test_chown(fact, test_container, server): +def test_chown(test_container, server): """ Execute a chown operation on a file and verifies the corresponding event is captured by the server. Args: - fact: Fixture for file activity (only required to be running). test_container: A container for running commands in. server: The server instance to communicate with. """ @@ -29,48 +28,40 @@ def test_chown(fact, test_container, server): test_container.exec_run(touch_cmd) test_container.exec_run(chown_cmd) - loginuid = pow(2, 32) - 1 - touch = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/touch', - args=touch_cmd, - name='touch', - container_id=test_container.id[:12], - loginuid=loginuid) - chown = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/chown', - args=chown_cmd, - name='chown', - container_id=test_container.id[:12], - loginuid=loginuid) + touch = Process.in_container( + exe_path='/usr/bin/touch', + args=touch_cmd, + name='touch', + container_id=test_container.id[:12], + ) + chown = Process.in_container( + exe_path='/usr/bin/chown', + args=chown_cmd, + name='chown', + container_id=test_container.id[:12], + ) events = [ Event(process=touch, event_type=EventType.CREATION, file=fut, host_path=''), + Event(process=touch, event_type=EventType.OPEN, file=fut, + host_path=''), Event(process=chown, event_type=EventType.OWNERSHIP, file=fut, host_path='', owner_uid=TEST_UID, owner_gid=TEST_GID), ] - for e in events: - print(f'Waiting for event: {e}') - server.wait_events(events) -def test_multiple(fact, test_container, server): +def test_multiple(test_container, server): """ Tests ownership operations on multiple files and verifies the corresponding events are captured by the server. Args: - fact: Fixture for file activity (only required to be running). test_container: A container for running commands in. server: The server instance to communicate with. """ events = [] - loginuid = pow(2, 32) - 1 # File Under Test for i in range(3): @@ -80,48 +71,40 @@ def test_multiple(fact, test_container, server): test_container.exec_run(touch_cmd) test_container.exec_run(chown_cmd) - touch = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/touch', - args=touch_cmd, - name='touch', - container_id=test_container.id[:12], - loginuid=loginuid) - chown = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/chown', - args=chown_cmd, - name='chown', - container_id=test_container.id[:12], - loginuid=loginuid) + touch = Process.in_container( + exe_path='/usr/bin/touch', + args=touch_cmd, + name='touch', + container_id=test_container.id[:12], + ) + chown = Process.in_container( + exe_path='/usr/bin/chown', + args=chown_cmd, + name='chown', + container_id=test_container.id[:12], + ) events.extend([ Event(process=touch, event_type=EventType.CREATION, file=fut, host_path=''), + Event(process=touch, event_type=EventType.OPEN, file=fut, + host_path=''), Event(process=chown, event_type=EventType.OWNERSHIP, file=fut, host_path='', owner_uid=TEST_UID, owner_gid=TEST_GID), ]) - for e in events: - print(f'Waiting for event: {e}') - server.wait_events(events) -def test_ignored(fact, test_container, server): +def test_ignored(test_container, server): """ Tests that ownership events on ignored files are not captured by the server. Args: - fact: Fixture for file activity (only required to be running). test_container: A container for running commands in. server: The server instance to communicate with. """ - loginuid = pow(2, 32) - 1 - ignored_file = '/test.txt' monitored_file = '/container-dir/test.txt' @@ -135,78 +118,35 @@ def test_ignored(fact, test_container, server): test_container.exec_run(monitored_touch_cmd) test_container.exec_run(monitored_chown_cmd) - ignored_touch = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/touch', - args=ignored_touch_cmd, - name='touch', - container_id=test_container.id[:12], - loginuid=loginuid) - ignored_chown = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/chown', - args=ignored_chown_cmd, - name='chown', - container_id=test_container.id[:12], - loginuid=loginuid) - reported_touch = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/touch', - args=monitored_touch_cmd, - name='touch', - container_id=test_container.id[:12], - loginuid=loginuid) - reported_chown = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/chown', - args=monitored_chown_cmd, - name='chown', - container_id=test_container.id[:12], - loginuid=loginuid) - ignored_events = [ - Event(process=ignored_touch, - event_type=EventType.CREATION, - file=ignored_file, - host_path=''), - Event(process=ignored_chown, - event_type=EventType.OWNERSHIP, - file=ignored_file, - host_path='', - owner_uid=TEST_UID, - owner_gid=TEST_GID), - ] - expected_events = [ - Event(process=reported_touch, - event_type=EventType.CREATION, - file=monitored_file, - host_path=''), - Event(process=reported_chown, - event_type=EventType.OWNERSHIP, - file=monitored_file, - host_path='', - owner_uid=TEST_UID, - owner_gid=TEST_GID), + reported_touch = Process.in_container( + exe_path='/usr/bin/touch', + args=monitored_touch_cmd, + name='touch', + container_id=test_container.id[:12], + ) + reported_chown = Process.in_container( + exe_path='/usr/bin/chown', + args=monitored_chown_cmd, + name='chown', + container_id=test_container.id[:12], + ) + events = [ + Event(process=reported_touch, event_type=EventType.CREATION, + file=monitored_file, host_path=''), + Event(process=reported_touch, event_type=EventType.OPEN, + file=monitored_file, host_path=''), + Event(process=reported_chown, event_type=EventType.OWNERSHIP, + file=monitored_file, host_path='', owner_uid=TEST_UID, owner_gid=TEST_GID), ] - for e in ignored_events: - print(f'Events that should be ignored: {e}') - - for e in expected_events: - print(f'Waiting for event: {e}') + server.wait_events(events=events) - server.wait_events(events=expected_events, ignored=ignored_events) - -def test_no_change(fact, test_container, server): +def test_no_change(test_container, server): """ Tests that chown to the same UID/GID triggers events for all calls. Args: - fact: Fixture for file activity (only required to be running). test_container: A container for running commands in. server: The server instance to communicate with. """ @@ -225,44 +165,29 @@ def test_no_change(fact, test_container, server): # Second chown to the same UID/GID - this should ALSO trigger an event test_container.exec_run(chown_cmd) - loginuid = pow(2, 32) - 1 - touch = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/touch', - args=touch_cmd, - name='touch', - container_id=test_container.id[:12], - loginuid=loginuid) - chown = [ - Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/chown', - args=chown_cmd, - name='chown', - container_id=test_container.id[:12], - loginuid=loginuid), - Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/chown', - args=chown_cmd, - name='chown', - container_id=test_container.id[:12], - loginuid=loginuid) - ] - + touch = Process.in_container( + exe_path='/usr/bin/touch', + args=touch_cmd, + name='touch', + container_id=test_container.id[:12], + ) + chown = Process.in_container( + exe_path='/usr/bin/chown', + args=chown_cmd, + name='chown', + container_id=test_container.id[:12], + ) + chown_event = Event(process=chown, event_type=EventType.OWNERSHIP, + file=fut, host_path='', owner_uid=TEST_UID, owner_gid=TEST_GID) + # Expect both chown events (all calls to chown trigger events) events = [ Event(process=touch, event_type=EventType.CREATION, file=fut, host_path=''), - *(Event(process=p, event_type=EventType.OWNERSHIP, file=fut, - host_path='', owner_uid=TEST_UID, owner_gid=TEST_GID) for p in chown), + Event(process=touch, event_type=EventType.OPEN, file=fut, + host_path=''), + chown_event, + chown_event, ] - for e in events: - print(f'Waiting for event: {e}') - server.wait_events(events) - diff --git a/tests/test_path_unlink.py b/tests/test_path_unlink.py index 3a7cde5b..08f46228 100644 --- a/tests/test_path_unlink.py +++ b/tests/test_path_unlink.py @@ -6,13 +6,12 @@ from event import Event, EventType, Process -def test_remove(fact, test_file, server): +def test_remove(test_file, server): """ Tests the removal of a file and verifies the corresponding event is captured by the server. Args: - fact: Fixture for file activity (only required to be running). test_file: Temporary file for testing. server: The server instance to communicate with. """ @@ -27,13 +26,12 @@ def test_remove(fact, test_file, server): server.wait_events(events) -def test_multiple(fact, monitored_dir, server): +def test_multiple(monitored_dir, server): """ Tests the removal of multiple files and verifies the corresponding events are captured by the server. Args: - fact: Fixture for file activity (only required to be running). monitored_dir: Temporary directory path for monitoring the test file. server: The server instance to communicate with. """ @@ -57,13 +55,12 @@ def test_multiple(fact, monitored_dir, server): server.wait_events(events) -def test_ignored(fact, test_file, ignored_dir, server): +def test_ignored(test_file, ignored_dir, server): """ Tests that unlink events on ignored files are not captured by the server. Args: - fact: Fixture for file activity (only required to be running). monitored_dir: Temporary directory path for creating the test file. ignored_dir: Temporary directory path that is not monitored by fact. server: The server instance to communicate with. @@ -76,18 +73,13 @@ def test_ignored(fact, test_file, ignored_dir, server): f.write('This is to be ignored') os.remove(ignored_file) - ignored_event = Event(process=process, event_type=EventType.UNLINK, - file=ignored_file, host_path='') - print(f'Ignoring: {ignored_event}') - # File Under Test os.remove(test_file) e = Event(process=process, event_type=EventType.UNLINK, file=test_file, host_path=test_file) - print(f'Waiting for event: {e}') - server.wait_events([e], ignored=[ignored_event]) + server.wait_events([e]) def do_test(fut: str, stop_event: mp.Event): @@ -99,13 +91,12 @@ def do_test(fut: str, stop_event: mp.Event): stop_event.wait() -def test_external_process(fact, monitored_dir, server): +def test_external_process(monitored_dir, server): """ Tests the removal of a file by an external process and verifies that the corresponding event is captured by the server. Args: - fact: Fixture for file activity (only required to be running). monitored_dir: Temporary directory path for creating the test file. server: The server instance to communicate with. """ @@ -116,18 +107,21 @@ def test_external_process(fact, monitored_dir, server): proc.start() process = Process.from_proc(proc.pid) - removal = Event(process=process, event_type=EventType.UNLINK, - file=fut, host_path='') - print(f'Waiting for event: {removal}') + events = [ + Event(process=process, event_type=EventType.CREATION, + file=fut, host_path=''), + Event(process=process, event_type=EventType.UNLINK, + file=fut, host_path=''), + ] try: - server.wait_events([removal]) + server.wait_events(events) finally: stop_event.set() proc.join(1) -def test_overlay(fact, test_container, server): +def test_overlay(test_container, server): # File Under Test fut = '/container-dir/test.txt' @@ -135,23 +129,18 @@ def test_overlay(fact, test_container, server): test_container.exec_run(f'touch {fut}') test_container.exec_run(f'rm {fut}') - loginuid = pow(2, 32)-1 - touch = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/touch', - args=f'touch {fut}', - name='touch', - container_id=test_container.id[:12], - loginuid=loginuid) - rm = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/rm', - args=f'rm {fut}', - name='rm', - container_id=test_container.id[:12], - loginuid=loginuid) + touch = Process.in_container( + exe_path='/usr/bin/touch', + args=f'touch {fut}', + name='touch', + container_id=test_container.id[:12], + ) + rm = Process.in_container( + exe_path='/usr/bin/rm', + args=f'rm {fut}', + name='rm', + container_id=test_container.id[:12], + ) events = [ Event(process=touch, event_type=EventType.CREATION, file=fut, host_path=''), @@ -161,13 +150,10 @@ def test_overlay(fact, test_container, server): file=fut, host_path=''), ] - for e in events: - print(f'Waiting for event: {e}') - server.wait_events(events) -def test_mounted_dir(fact, test_container, ignored_dir, server): +def test_mounted_dir(test_container, ignored_dir, server): # File Under Test fut = '/mounted/test.txt' @@ -175,23 +161,18 @@ def test_mounted_dir(fact, test_container, ignored_dir, server): test_container.exec_run(f'touch {fut}') test_container.exec_run(f'rm {fut}') - loginuid = pow(2, 32)-1 - touch = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/touch', - args=f'touch {fut}', - name='touch', - container_id=test_container.id[:12], - loginuid=loginuid) - rm = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/rm', - args=f'rm {fut}', - name='rm', - container_id=test_container.id[:12], - loginuid=loginuid) + touch = Process.in_container( + exe_path='/usr/bin/touch', + args=f'touch {fut}', + name='touch', + container_id=test_container.id[:12], + ) + rm = Process.in_container( + exe_path='/usr/bin/rm', + args=f'rm {fut}', + name='rm', + container_id=test_container.id[:12], + ) events = [ Event(process=touch, event_type=EventType.CREATION, file=fut, host_path=''), @@ -199,29 +180,23 @@ def test_mounted_dir(fact, test_container, ignored_dir, server): host_path=''), ] - for e in events: - print(f'Waiting for event: {e}') - server.wait_events(events) -def test_unmonitored_mounted_dir(fact, test_container, test_file, server): +def test_unmonitored_mounted_dir(test_container, test_file, server): # File Under Test fut = '/unmonitored/test.txt' # Create the exec and an equivalent event that it will trigger test_container.exec_run(f'rm {fut}') - process = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/rm', - args=f'rm {fut}', - name='rm', - container_id=test_container.id[:12], - loginuid=pow(2, 32)-1) + process = Process.in_container( + exe_path='/usr/bin/rm', + args=f'rm {fut}', + name='rm', + container_id=test_container.id[:12], + ) event = Event(process=process, event_type=EventType.UNLINK, file=fut, host_path=test_file) - print(f'Waiting for event: {event}') server.wait_events([event])