diff --git a/.gitignore b/.gitignore index dd5939d9a..d2322d58a 100644 --- a/.gitignore +++ b/.gitignore @@ -18,6 +18,7 @@ docs/.buildinfo/ src/bin src/web_interface/static/node_modules src/web_interface/static/file_icons +FACT_export # pytest .pytest_cache diff --git a/README.md b/README.md index 431ab941c..48b11022d 100644 --- a/README.md +++ b/README.md @@ -68,6 +68,22 @@ FACT provides an optional basic authentication, role and user management. More i * [FACT Search and Download](https://github.com/fkie-cad/FACT_Search_and_Download) * [PDF Report Generator](https://github.com/fkie-cad/fact_pdf_report) +### Import/Export of Results + +The script `src/firmware_import_export.py` can be used to export unpacked files and analysis results and import them +into another FACT instance. The data is stored as a ZIP archive and this is also the format the script expects during +import. To export files and analysis data of analyzed firmware images, simply run + +```shell +python3 firmware_import_export.py export FW_UID [FW_UID_2 ...] [-o OUTPUT_DIR] +``` + +After this, you can import the exported files with + +```shell +python3 firmware_import_export.py import FW.zip [FW_2.zip ...] +``` + ## Vagrant We provide monthly and ready-to-use vagrant boxes of our master branch. [Vagrant](https://www.vagrantup.com/) is an easy and convenient way to get started with FACT without having to install it on your machine. Just setup vagrant and import our provided box into VirtualBox. Our boxes can be found [here](https://app.vagrantup.com/fact-cad/boxes/FACT-master)! 
diff --git a/src/firmware_import_export.py b/src/firmware_import_export.py new file mode 100755 index 000000000..8200a5c4c --- /dev/null +++ b/src/firmware_import_export.py @@ -0,0 +1,207 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import json +import logging +import sys +from io import BytesIO +from pathlib import Path +from zipfile import ZIP_DEFLATED, BadZipFile, ZipFile + +from rich.logging import RichHandler +from rich.progress import MofNCompleteColumn, Progress, SpinnerColumn, TimeElapsedColumn + +from config import load +from helperFunctions.database import get_shared_session +from objects.file import FileObject +from objects.firmware import Firmware +from storage.db_interface_backend import BackendDbInterface +from storage.fsorganizer import FSOrganizer +from storage.migration import get_current_revision + +load() +logging.basicConfig(level='NOTSET', format='%(message)s', datefmt='[%X]', handlers=[RichHandler(rich_tracebacks=True)]) +logger = logging.getLogger('rich') +COLUMNS = [SpinnerColumn(), *Progress.get_default_columns(), TimeElapsedColumn(), MofNCompleteColumn()] +EXPECTED_KEYS = ['db_revision', 'files', 'firmware', 'uid'] +ERROR_MESSAGE = ( + 'The import feature only works with archives exported by FACT and ' + 'is not intended to be used to import arbitrary firmware!' 
+) + + +class FwExporter: + def __init__(self, output_dir: str): + self.target_dir = Path(output_dir) + self.target_dir.mkdir(exist_ok=True) + self.db_interface = BackendDbInterface() + self.fs_organizer = FSOrganizer() + + def export_files(self, uid_list: list[str]): + with get_shared_session(self.db_interface) as db_session, Progress(*COLUMNS) as progress: + export_task = progress.add_task('Firmware export', total=len(uid_list)) + for uid in uid_list: + self._export_single_file(db_session, uid, progress) + progress.advance(export_task) + + def _export_single_file(self, db, fw_uid: str, progress: Progress): + included_files = db.get_all_files_in_fw(fw_uid) + with BytesIO() as buffer: + with ZipFile(buffer, 'w', ZIP_DEFLATED) as zip_file: + file_task = progress.add_task('Fetching files', total=len(included_files) + 1) + for fo_uid in included_files.union({fw_uid}): + file_path = self.fs_organizer.generate_path_from_uid(fo_uid) + zip_file.writestr(f'files/{fo_uid}', Path(file_path).read_bytes()) + progress.advance(file_task) + progress.remove_task(file_task) + zip_file.writestr( + 'data.json', + json.dumps(self._fetch_db_data(fw_uid, included_files, db, progress)), + ) + target_path = self.target_dir / f'FACT_export_{fw_uid}.zip' + target_path.write_bytes(buffer.getvalue()) + logger.info(f'Exported firmware {fw_uid} to {target_path}') + + @staticmethod + def _fetch_db_data(uid: str, all_files: set[str], db, progress: Progress) -> dict: + db_data = { + 'db_revision': get_current_revision(), + 'files': [], + 'firmware': db.get_object(uid).to_json(), + 'uid': uid, + } + db_task = progress.add_task('Fetching DB entries', total=len(all_files)) + for fo in db.get_objects_by_uid_list(all_files): + db_data['files'].append(fo.to_json(vfp_parent_filter=all_files.union({uid}))) + progress.advance(db_task) + progress.remove_task(db_task) + return db_data + + +class FwImporter: + def __init__(self, force: bool): + self.db_interface = BackendDbInterface() + self.fs_organizer = 
FSOrganizer() + self.force = force + self.progress: Progress | None = None + + def import_files(self, file_list: list[str]): + with Progress(*COLUMNS) as progress: + self.progress = progress + import_task = progress.add_task('Importing files', total=len(file_list)) + for file in file_list: + path = Path(file) + if not path.is_file(): + logging.error(f'File {path} does not exist') + elif self._import_file(path): + progress.advance(import_task) + self.progress = None + + def _import_file(self, path: Path) -> bool: # noqa: PLR0911 + try: + with ZipFile(path, 'r') as zip_file: + if 'data.json' not in zip_file.namelist(): + logging.error(f'Error: data.json not found in uploaded import file. {ERROR_MESSAGE}') + return False + try: + data = json.loads(zip_file.read('data.json')) + except json.JSONDecodeError as error: + logging.error(f'Error: data.json is not a valid JSON file: {error}') + return False + if not all(k in data for k in EXPECTED_KEYS): + logging.error(f'Error: data.json is missing mandatory keys (expected: {EXPECTED_KEYS})') + return False + if self.db_interface.is_firmware(data['uid']): + logging.warning(f'Skipping firmware {data["uid"]}. Reason: is already in the DB') + return False + current_revision = get_current_revision() + if not self.force and data['db_revision'] != current_revision: + logging.error( + f'Error: import file was created with a different DB revision: ' + f'{data["db_revision"]} (current revision is {current_revision}). ' + f'Please upgrade/downgrade to a compatible revision.', + ) + return False + + imported_objects = self._import_objects(data) + imported_files = self._import_files(zip_file) + logging.info( + f'Successfully imported {imported_files} files and {imported_objects} DB entries from {path}' + ) + return True + except BadZipFile: + logging.error(f'Error: File {path} is not a ZIP file. 
{ERROR_MESSAGE}') + return False + + def _import_files(self, zip_file) -> int: + files = [f for f in zip_file.namelist() if f != 'data.json'] + file_task = self.progress.add_task('Importing files', total=len(files)) + for file in files: + self.fs_organizer.store_file(FileObject(binary=zip_file.read(file))) + self.progress.advance(file_task) + self.progress.remove_task(file_task) + return len(files) + + def _import_objects(self, data: dict) -> int: + firmware = Firmware.from_json(data['firmware']) + file_objects = {fo_data['uid']: FileObject.from_json(fo_data, firmware.uid) for fo_data in data['files']} + with get_shared_session(self.db_interface) as db_session: + db_session.add_object(firmware) + return self._insert_objects_hierarchically(file_objects, firmware.uid, db_session) + + def _insert_objects_hierarchically(self, fo_dict: dict[str, FileObject], root_uid: str, db) -> int: + already_added = {root_uid} + all_uids = already_added.union(fo_dict) + orphans = {uid for uid, fo in fo_dict.items() if any(parent not in all_uids for parent in fo.parents)} + for uid in orphans: + fo_dict.pop(uid) + logging.warning(f'FW import contains orphaned object {uid} (ignored)') + db_task = self.progress.add_task('Importing DB entries', total=len(fo_dict)) + while fo_dict: + addable_uids = set() + for fo in fo_dict.values(): + if all(parent in already_added for parent in fo.parents): + addable_uids.add(fo.uid) + for uid in addable_uids: + db.add_object(fo_dict.pop(uid)) + already_added.add(uid) + self.progress.advance(db_task) + self.progress.remove_task(db_task) + return len(already_added) + + +def _parse_args(args=None): + if args is None: + args = sys.argv[1:] + parser = argparse.ArgumentParser(description='Script to import and export firmware analyses') + subparsers = parser.add_subparsers( + title='subcommands', + description='valid subcommands', + help='additional help', + required=True, + dest='command', + ) + + parser_export = subparsers.add_parser('export') + 
parser_export.add_argument('uid_list', nargs='+', help='The UIDs of the firmware(s) to export') + parser_export.add_argument( + '-o', '--output', help='The output directory (default: (cwd)/FACT_export)', type=str, default='FACT_export' + ) + + parser_import = subparsers.add_parser('import') + parser_import.add_argument('files', nargs='+', help='The FACT export archive(s) to import') + parser_import.add_argument('-f', '--force', action='store_true', help='ignore DB revision check') + return parser.parse_args(args) + + +def main(): + args = _parse_args() + if args.command == 'export': + FwExporter(args.output).export_files(args.uid_list) + else: + FwImporter(args.force).import_files(args.files) + + +if __name__ == '__main__': + main() diff --git a/src/helperFunctions/virtual_file_path.py b/src/helperFunctions/virtual_file_path.py index f9d553e5b..fa768f273 100644 --- a/src/helperFunctions/virtual_file_path.py +++ b/src/helperFunctions/virtual_file_path.py @@ -16,3 +16,12 @@ def get_some_vfp(vfp_dict: dict[str, list[str]]) -> str | None: for vfp_list in vfp_dict.values(): return vfp_list[0] return None + + +def filter_vpf_dict(vfp_dict: dict[str, list[str]], parent_uids: set[str]) -> dict[str, list[str]]: + """ + Get only VFPs from parent files that are contained in `parent_uids`. 
+ :param vfp_dict: A virtual file path dict + :param parent_uids: A set of allowed parent UIDs (VFPs from other parent files are filtered out) + """ + return {k: v for k, v in vfp_dict.items() if k in parent_uids} diff --git a/src/objects/file.py b/src/objects/file.py index eefb51131..789f90c29 100644 --- a/src/objects/file.py +++ b/src/objects/file.py @@ -9,7 +9,7 @@ from helperFunctions.data_conversion import make_bytes, make_unicode_string from helperFunctions.hash import get_sha256 from helperFunctions.uid import create_uid -from helperFunctions.virtual_file_path import get_some_vfp +from helperFunctions.virtual_file_path import filter_vpf_dict, get_some_vfp class FileObject: @@ -213,3 +213,39 @@ def __str__(self) -> str: def __repr__(self) -> str: return self.__str__() + + def to_json(self, vfp_parent_filter: set[str] | None = None) -> dict: + """ + Get a FileObject as JSON. `vfp_parent_filter` can be used to filter the entries with a UID whitelist. + """ + return { + 'comments': self.comments, + 'depth': self.depth, + 'file_name': self.file_name, + 'files_included': list(self.files_included), + 'processed_analysis': self.processed_analysis, + 'sha256': self.sha256, + 'size': self.size, + 'uid': self.uid, + 'virtual_file_path': ( + filter_vpf_dict(self.virtual_file_path, vfp_parent_filter) + if vfp_parent_filter is not None + else self.virtual_file_path + ), + } + + @classmethod + def from_json(cls, json_dict: dict, root_uid: str | None = None) -> FileObject: + fo = cls(file_name=json_dict['file_name']) + fo.comments = json_dict.get('comments') + fo.depth = json_dict.get('depth') + fo.files_included = json_dict.get('files_included') + fo.processed_analysis = json_dict.get('processed_analysis') + fo.sha256 = json_dict.get('sha256') or json_dict.get('uid').split('_')[0] + fo.size = json_dict.get('size') + fo.uid = json_dict.get('uid') + fo.virtual_file_path = json_dict.get('virtual_file_path') + # these entries are necessary for correctly filling the 
included_files_table and fw_files_table + fo.parent_firmware_uids = [root_uid] if root_uid else [] + fo.parents = list(fo.virtual_file_path) + return fo diff --git a/src/objects/firmware.py b/src/objects/firmware.py index 3a8661337..09ea57553 100644 --- a/src/objects/firmware.py +++ b/src/objects/firmware.py @@ -142,3 +142,30 @@ def __str__(self) -> str: def __repr__(self) -> str: return self.__str__() + + def to_json(self, vfp_parent_filter: set[str] | None = None) -> dict: + json = super().to_json(vfp_parent_filter) + json.update( + { + 'device_class': self.device_class, + 'device_name': self.device_name, + 'part': self.part, + 'release_date': self.release_date, + 'tags': self.tags, + 'vendor': self.vendor, + 'version': self.version, + } + ) + return json + + @classmethod + def from_json(cls, json: dict, root_uid: str | None = None): + instance = super().from_json(json, root_uid) + instance.device_class = json.get('device_class') + instance.device_name = json.get('device_name') + instance.part = json.get('part') + instance.release_date = json.get('release_date') + instance.tags = json.get('tags') + instance.vendor = json.get('vendor') + instance.version = json.get('version') + return instance diff --git a/src/storage/migration/__init__.py b/src/storage/migration/__init__.py index b018461a6..50cefca11 100644 --- a/src/storage/migration/__init__.py +++ b/src/storage/migration/__init__.py @@ -18,15 +18,25 @@ def alembic_table_exists(): return inspect(connection).has_table('alembic_version', None) -def db_needs_migration(): +def get_current_revision(): # alembic must be executed from src for paths to line up with OperateInDirectory(get_src_dir()), AdminConnection().engine.connect().engine.begin() as connection: logging.getLogger('alembic.runtime.migration').setLevel(logging.WARNING) # hide alembic log messages context = migration.MigrationContext.configure(connection) - current_revision = context.get_current_revision() - current_head = 
script.ScriptDirectory.from_config(ALEMBIC_CFG).get_current_head() - logging.info(f'Alembic DB revision: head: {current_head}, current: {current_revision}') - return current_revision != current_head + return context.get_current_revision() + + +def _get_current_head(): + # alembic must be executed from src for paths to line up + with OperateInDirectory(get_src_dir()): + return script.ScriptDirectory.from_config(ALEMBIC_CFG).get_current_head() + + +def db_needs_migration(): + current_revision = get_current_revision() + current_head = _get_current_head() + logging.info(f'Alembic DB revision: head: {current_head}, current: {current_revision}') + return current_revision != current_head def create_alembic_table(): diff --git a/src/test/integration/run_scripts/__init__.py b/src/test/integration/run_scripts/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/test/integration/run_scripts/test_import_export.py b/src/test/integration/run_scripts/test_import_export.py new file mode 100644 index 000000000..d82bfd175 --- /dev/null +++ b/src/test/integration/run_scripts/test_import_export.py @@ -0,0 +1,40 @@ +from pathlib import Path +from tempfile import TemporaryDirectory +from zipfile import ZipFile, is_zipfile + +from firmware_import_export import FwExporter, FwImporter + +from test.integration.storage.helper import create_fw_with_child_fo + + +def test_import_export(backend_db, admin_db, fsorganizer): + fo, fw = create_fw_with_child_fo() + backend_db.insert_multiple_objects(fw, fo) + fsorganizer.store_file(fw) + fsorganizer.store_file(fo) + assert backend_db.is_firmware(fw.uid) + + with TemporaryDirectory() as tmpdir: + tmp_path = Path(tmpdir) + exporter = FwExporter(tmpdir) + exporter.export_files([fw.uid]) + files = list(tmp_path.iterdir()) + assert len(files) == 1 + assert files[0].name == f'FACT_export_{fw.uid}.zip' + assert is_zipfile(files[0]) + with ZipFile(files[0], 'r') as zip_file: + assert sorted(zip_file.namelist()) == ['data.json', 
*sorted([f'files/{fw.uid}', f'files/{fo.uid}'])] + + admin_db.delete_firmware(fw.uid) + assert backend_db.is_firmware(fw.uid) is False + importer = FwImporter(force=False) + importer.import_files(files) + + assert backend_db.is_firmware(fw.uid) + assert backend_db.exists(fo.uid) + imported_fw = backend_db.get_object(fw.uid) + for attribute in ['device_name', 'vendor', 'version', 'size', 'file_name']: + assert getattr(imported_fw, attribute) == getattr(fw, attribute) + for key in fw.processed_analysis['dummy']: + assert key in imported_fw.processed_analysis['dummy'] + assert imported_fw.processed_analysis['dummy'][key] == fw.processed_analysis['dummy'][key] diff --git a/src/test/unit/helperFunctions/test_virtual_file_path.py b/src/test/unit/helperFunctions/test_virtual_file_path.py index 5a912cd93..4d0ca4e4e 100644 --- a/src/test/unit/helperFunctions/test_virtual_file_path.py +++ b/src/test/unit/helperFunctions/test_virtual_file_path.py @@ -1,6 +1,6 @@ import pytest -from helperFunctions.virtual_file_path import get_paths_for_all_parents +from helperFunctions.virtual_file_path import filter_vpf_dict, get_paths_for_all_parents @pytest.mark.parametrize( @@ -15,3 +15,14 @@ def test_get_paths_for_all_parents(vfp_dict, expected): result = get_paths_for_all_parents(vfp_dict) assert len(result) == len(expected) assert set(result) == expected + + +@pytest.mark.parametrize( + ('vfp_dict', 'allowed', 'expected'), + [ + ({}, set(), {}), + ({'parent_1': ['a', 'b'], 'parent_2': ['c', 'd']}, {'parent_1', 'parent_3'}, {'parent_1': ['a', 'b']}), + ], +) +def test_filter_vpf_dict(vfp_dict, allowed, expected): + assert filter_vpf_dict(vfp_dict, allowed) == expected