diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 3a4b791..d15c003 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -30,10 +30,10 @@ jobs: run: | python -m pip install --upgrade pip pip install -r requirements.txt - pip install flake8 + pip install flake8 black - # - name: Run linting - # run: flake8 . + - name: Run linting + run: black --check . - name: Run tests with unittest id: run-tests diff --git a/setup.py b/setup.py index 8a82854..8ca2e41 100644 --- a/setup.py +++ b/setup.py @@ -3,6 +3,7 @@ This setup configuration includes the package metadata, such as the name, version, description, author, license, and URLs for the project. It also specifies the required Python version, the packages to be included, and the console script entry point. """ + from sierra_status.__version__ import __version__ from setuptools import setup, find_packages import os @@ -59,4 +60,3 @@ }, python_requires=">=3.8", # Requires Python 3.8 and above ) - diff --git a/sierra_status/__version__.py b/sierra_status/__version__.py index 51e0a06..1276d02 100644 --- a/sierra_status/__version__.py +++ b/sierra_status/__version__.py @@ -1 +1 @@ -__version__ = "0.1.4" \ No newline at end of file +__version__ = "0.1.5" diff --git a/sierra_status/src/cli.py b/sierra_status/src/cli.py index 60b59c2..4cb2aec 100644 --- a/sierra_status/src/cli.py +++ b/sierra_status/src/cli.py @@ -8,6 +8,7 @@ DEFAULT_BAUDRATE = 115200 + def setup_logging(verbose: bool) -> None: """ Sets up the logging configuration based on the provided verbosity level. @@ -21,6 +22,7 @@ def setup_logging(verbose: bool) -> None: log_level = logging.DEBUG if verbose else logging.INFO logging.basicConfig(level=log_level) + def validate_port(port: str) -> None: """ Validates that the specified USB port exists on the system. @@ -34,6 +36,7 @@ def validate_port(port: str) -> None: if not os.path.exists(port): raise ValueError(f"The specified port '{port}' does not exist.") + def main() -> None: """ The main entry point for the Sierra Wireless EM9xxx/EM7xxx CLI tool. @@ -53,18 +56,49 @@ def main() -> None: parser = argparse.ArgumentParser( description="CLI tool for Sierra Wireless EM9xxx/EM7xxx modules to query status", - formatter_class=argparse.RawTextHelpFormatter + formatter_class=argparse.RawTextHelpFormatter, ) - required = parser.add_argument_group('required arguments') - required.add_argument("-p", "--port", help="USB port to use (e.g., 'COM1' for Windows or '/dev/ttyUSB2' for Linux)", required=True) + required = parser.add_argument_group("required arguments") + required.add_argument( + "-p", + "--port", + help="USB port to use (e.g., 'COM1' for Windows or '/dev/ttyUSB2' for Linux)", + required=True, + ) - optional = parser.add_argument_group('optional arguments') - optional.add_argument("--version", help="Show version", action="version", version=f"%(prog)s {__version__}") - optional.add_argument("-m", "--model", help="Model of the device to add to filename (e.g., EM9191 or EM7455)", default="") - optional.add_argument("-v", "--verbose", help="Enable verbose output", action="store_true") - optional.add_argument("-s", "--search", help="Search for network using AT!COPS=?", action="store_true") - optional.add_argument("-b", "--baudrate", help=f"Baudrate to use for serial communication (default: {DEFAULT_BAUDRATE})", default=DEFAULT_BAUDRATE, type=int) + optional = parser.add_argument_group("optional arguments") + optional.add_argument( + "--version", + help="Show version", + action="version", + version=f"%(prog)s {__version__}", + ) + optional.add_argument( + "-m", + "--model", + help="Model of the device to add to filename (e.g., EM9191 or EM7455)", + default="", + ) + optional.add_argument( + "-v", "--verbose", help="Enable verbose output", action="store_true" + ) + optional.add_argument( + "-s", "--search", help="Search for network using AT!COPS=?", action="store_true" + ) + optional.add_argument( + "-b", + "--baudrate", + help=f"Baudrate to use for serial communication (default: {DEFAULT_BAUDRATE})", + default=DEFAULT_BAUDRATE, + type=int, + ) + optional.add_argument( + "-i", + "--interactive", + help="Enter interactive mode to send custom AT commands", + action="store_true", + ) args = parser.parse_args() @@ -72,10 +106,18 @@ def main() -> None: try: validate_port(args.port) - usb_handle.start_process(args.port, args.model.lower(), logging.getLogger().level, args.search, args.baudrate) + usb_handle.start_process( + args.port, + args.model.lower(), + logging.getLogger().level, + args.search, + args.baudrate, + args.interactive, + ) except Exception as e: logging.error(f"An error occurred: {str(e)}") sys.exit(1) + if __name__ == "__main__": main() diff --git a/sierra_status/src/conf.py b/sierra_status/src/conf.py new file mode 100644 index 0000000..8f174c1 --- /dev/null +++ b/sierra_status/src/conf.py @@ -0,0 +1,72 @@ +AT_COMMANDS = [ + "ATI", + "AT+CMEE=1", + "AT!PRIID?", + "AT!IMAGE?", + "ATI8", + "AT!GSTATUS?", + "AT+CPIN?", + "AT+CIMI", + "AT!PCINFO?", + "AT!CUSTOM?", + "AT+CREG?", + "AT+CGREG?", + "AT+CEREG?", + "AT+CGPADDR=1", + "AT!SELRAT?", + "AT+CGDCONT?", + "AT!UIMS?", + "AT!IMPREF?", + "AT!BAND?", + 'AT!ENTERCND="A710"', + "AT!BAND?", + "AT!HWID?", + "AT!USBCOMP?", + "AT!USBSPEED?", + "AT!USBPID?", + "AT!USBINFO?", + "AT!LTEINFO?", + "AT!NRINFO?", + "AT+COPS?", +] + +AT_COMMANDS_HL78 = [ + "ATI", + "AT+CMEE=1", + "AT+KSRAT?", + "AT+KBNDCFG?", + "AT+CIMI", + "AT+CPIN?", + "AT+CCID?", + "AT+CGSN", + "AT+HWREV", + "AT+CGDCONT?", + "AT+KCARRIERCFG?", + "AT+CEDRXS?", + "AT+CPSMS?", + "AT+KSIMDET?", + "AT+KSIMSEL?", + "AT+CREG?", + "AT+CEREG?", + "AT+KUSBCOMP?", + "AT&V", + "AT+IPR?", + "AT+CSQ", + "AT+KSLEEP?", + "AT+KNWSCANCFG?", + "AT+KTEMPMON?", + "AT+KCERTSTORE?", + "AT+KTCPCFG?", + "AT+KUDPCFG?", + "AT+KIPOPT?", + "AT+WDSC?", + "AT+WDSG", + "AT+NVBU=2", + "AT+COPS?", +] + +AT_COMMAND_COPS = "AT+COPS=?" + +DEFAULT_TIMEOUT = 60 +DEFAULT_BAUDRATE = 115200 +STATUS_FILE_PATTERN = "status_{model}_{timestamp}.txt" diff --git a/sierra_status/src/usb_handle.py b/sierra_status/src/usb_handle.py index b2e1243..5f8d953 100644 --- a/sierra_status/src/usb_handle.py +++ b/sierra_status/src/usb_handle.py @@ -1,90 +1,52 @@ - import sys import time import serial import logging +from sierra_status.src.conf import ( + AT_COMMANDS, + AT_COMMANDS_HL78, + AT_COMMAND_COPS, + DEFAULT_TIMEOUT, + DEFAULT_BAUDRATE, + STATUS_FILE_PATTERN, +) -AT_COMMANDS = [ - "ATI", - "AT+CMEE=1", - "AT!PRIID?", - "AT!IMAGE?", - "ATI8", - "AT!GSTATUS?", - "AT+CPIN?", - "AT+CIMI", - "AT!PCINFO?", - "AT!CUSTOM?", - "AT+CREG?", - "AT+CGREG?", - "AT+CEREG?", - "AT+CGPADDR=1", - "AT!SELRAT?", - "AT+CGDCONT?", - "AT!UIMS?", - "AT!IMPREF?", - "AT!BAND?", - 'AT!ENTERCND="A710"', - "AT!BAND?", - "AT!HWID?", - "AT!USBCOMP?", - "AT!USBSPEED?", - "AT!USBPID?", - "AT!USBINFO?", - "AT!LTEINFO?", - "AT!NRINFO?", - "AT+COPS?" -] - -AT_COMMANDS_HL78 =[ - "ATI", - "AT+CMEE=1", - "AT+KSRAT?", - "AT+KBNDCFG?", - "AT+CIMI", - "AT+CPIN?", - "AT+CCID?", - "AT+CGSN", - "AT+HWREV", - "AT+CGDCONT?", - "AT+KCARRIERCFG?", - "AT+CEDRXS?", - "AT+CPSMS?", - "AT+KSIMDET?", - "AT+KSIMSEL?", - "AT+CREG?", - "AT+CEREG?", - "AT+KUSBCOMP?", - "AT&V", - "AT+IPR?", - "AT+CSQ", - "AT+KSLEEP?", - "AT+KNWSCANCFG?", - "AT+KTEMPMON?", - "AT+KCERTSTORE?", - "AT+KTCPCFG?", - "AT+KUDPCFG?", - "AT+KIPOPT?", - "AT+WDSC?", - "AT+WDSG", - "AT+NVBU=2", - "AT+COPS?" -] - -AT_COMMAND_COPS = "AT+COPS=?" def animate_spinner() -> None: """ Animates a simple spinner character to indicate an ongoing operation. """ - chars = '|/-\\' + chars = "|/-\\" for char in chars: - sys.stdout.write(f'\rReading {char}') + sys.stdout.write(f"\rReading {char}") sys.stdout.flush() time.sleep(0.05) -def send_at_command(port: str, command: str, timeout: float = 60, baudrate: int = 115200) -> str: + +def send_at_command( + port: str, + command: str, + timeout: float = DEFAULT_TIMEOUT, + baudrate: int = DEFAULT_BAUDRATE, +) -> str: + """ + Sends an AT command to the specified serial port and returns the response. + + Args: + port (str): The serial port to use. + command (str): The AT command to send. + timeout (float, optional): The maximum time to wait for a response, in seconds. Defaults to 60. + baudrate (int, optional): The baud rate to use for the serial connection. Defaults to 115200. + + Returns: + str: The response from the AT command, with each line stripped of leading/trailing whitespace. + """ + if not port or not command: + raise ValueError("Port and command must be provided") + if baudrate <= 0: + raise ValueError("Baudrate must be a positive integer") + result = "" start_time = time.time() try: @@ -97,30 +59,57 @@ def send_at_command(port: str, command: str, timeout: float = 60, baudrate: int if "OK\r\n" in result or "ERROR\r\n" in result: break animate_spinner() + except serial.SerialException as e: + logging.error(f"Serial communication error: {e}") + except ValueError as e: + logging.error(f"Value error: {e}") except Exception as e: - logging.error(f"Error sending command: {e}") + logging.error(f"Unexpected error: {e}") finally: - sys.stdout.write('\r' + ' ' * 20 + '\r') # Clear the spinner line + sys.stdout.write("\r" + " " * 20 + "\r") # Clear the spinner line sys.stdout.flush() return "\n".join(line.strip() for line in result.splitlines() if line.strip()) -def get_module_status(port: str, search: int, model: str, baudrate: int = 115200) -> str: + +def get_module_status( + port: str, search: int, model: str, baudrate: int = 115200 +) -> str: """ Retrieves the status of an module using AT commands. + + Args: + port (str): The serial port to use. + search (int): A flag indicating whether to retrieve additional status information using the AT+COPS command. + model (str): The model of the module. + baudrate (int, optional): The baud rate to use for the serial connection. Defaults to 115200. + + Returns: + str: The status information retrieved from the module. """ result = "" try: commands = AT_COMMANDS_HL78 if model.lower() == "hl78xx" else AT_COMMANDS - result = "\n\n".join(send_at_command(port, command, baudrate=baudrate).strip() for command in commands) + result = "\n\n".join( + send_at_command(port, command, baudrate=baudrate).strip() + for command in commands + ) if search: - result += "\n\n" + get_em_cops(port) + result += f"\n\n{get_em_cops(port)}" except Exception as e: logging.error(f"Error getting module status: {e}") return result -def get_em_cops(port: str, baudrate: int = 115200) -> str: + +def get_em_cops(port: str, baudrate: int = DEFAULT_BAUDRATE) -> str: """ - Retrieves the status of an EM9xxx module using AT commands. + Retrieves the status of an EM9xxx module using the AT+COPS command. + + Args: + port (str): The serial port to use. + baudrate (int, optional): The baud rate to use for the serial connection. Defaults to the DEFAULT_BAUDRATE. + + Returns: + str: The status information retrieved from the module. """ result = "" try: @@ -131,26 +120,109 @@ def get_em_cops(port: str, baudrate: int = 115200) -> str: logging.error(f"Error getting EM9 status: {e}") return result + def creat_status_file(result: str, model: str) -> None: """ Creates a status file with the provided result. + + Args: + result (str): The status information to be written to the file. + model (str): The model of the module. + + Returns: + None """ try: time_stamp = time.strftime("%Y%m%d_%H%M%S", time.localtime()) - with open(f"status_{model}_{time_stamp}.txt", "w") as f: + file_name = STATUS_FILE_PATTERN.format(model=model, timestamp=time_stamp) + with open(file_name, "w") as f: f.write(result) - logging.info(f"Status file created: status_{model}_{time_stamp}.txt") + logging.info(f"Status file created: {file_name}") except Exception as e: logging.error(f"Error creating status file: {e}") -def start_process(port: str, model: str, log_level: int, search: int, baudrate: int = 115200) -> None: + +def get_interactive_command() -> str: """ - Main function to retrieve the status of an EM9xxx module using AT commands. + Gets and validates an AT command from user input. + + Returns: + str: Validated AT command or empty string to exit """ - logging.basicConfig(level=log_level) - logging.info(f"Starting process for port {port} with model {model} and baudrate {baudrate}") - result = get_module_status(port, search, model, baudrate) + command = input("Enter AT command: ").strip() + if command.lower() == "exit": + return "" + + if not command.upper().startswith("AT"): + logging.error("Command must start with 'AT'") + return get_interactive_command() + + return command + + +def handle_interactive_session(port: str, baudrate: int, model: str) -> None: + """ + Manages an interactive AT command session + """ + logging.info("Interactive AT Command Mode (type 'exit' to quit)") + result = "" + + while True: + command = get_interactive_command() + if not command: + logging.info("Exiting interactive mode") + break + + response = send_at_command(port, command, DEFAULT_TIMEOUT, baudrate) + result += f"\n=== Command: {command} ===\n{response}\n" + logging.info(response) + if result: - creat_status_file(result, model) + creat_status_file(result, f"{model}_interactive") + + +def start_process( + port: str, + model: str, + log_level: int, + search: int, + baudrate: int = DEFAULT_BAUDRATE, + interactive: bool = False, +) -> None: + """ + Main function to retrieve the status of an EM9xxx module using AT commands. + + Args: + port (str): The serial port to use. + model (str): The model of the module. + log_level (int): The logging level to use. + search (int): The search parameter to use. + baudrate (int, optional): The baud rate to use for the serial connection. + interactive (bool, optional): Run in interactive mode if True. + returns: + None + """ + start_time = time.time() + logging.basicConfig( + level=log_level, format="%(asctime)s - %(levelname)s - %(message)s" + ) + logging.info( + f"""Start time: {time.strftime('%Y-%m-%d_%H:%M:%S', time.localtime())} + Starting process for port {port} + with model {model} and baudrate {baudrate}""" + ) + + if interactive: + handle_interactive_session(port, baudrate, model) else: - logging.error("No result received from the module.") + result = get_module_status(port, search, model, baudrate) + if result: + time_stamp = time.strftime("%Y%m%d_%H%M%S", time.localtime()) + result = f"Finished time: {time_stamp}\n" + result + creat_status_file(result, model) + else: + logging.error("No result received from the module.") + + logging.info( + f"Total time for running this script: {time.time() - start_time:.2f} seconds" + ) diff --git a/tests/test_usb_handle.py b/tests/test_usb_handle.py index 0dc3095..326ba18 100644 --- a/tests/test_usb_handle.py +++ b/tests/test_usb_handle.py @@ -1,188 +1,362 @@ import logging import unittest +import serial from unittest.mock import mock_open, patch, MagicMock -from sierra_status.src.usb_handle import AT_COMMAND_COPS, AT_COMMANDS, AT_COMMANDS_HL78, animate_spinner, creat_status_file, get_em_cops, get_module_status, send_at_command, start_process +from sierra_status.src.conf import ( + AT_COMMAND_COPS, + AT_COMMANDS, + AT_COMMANDS_HL78, + DEFAULT_BAUDRATE, +) +from sierra_status.src.usb_handle import ( + animate_spinner, + creat_status_file, + get_em_cops, + get_interactive_command, + get_module_status, + handle_interactive_session, + send_at_command, + start_process, +) + +EXPECTED_COMMANDS = ["ATI", "AT+CMEE=1", "AT!GSTATUS?", "AT+CPIN?"] +ENTER_CND_COMMAND = 'AT!ENTERCND="A710"' -class TestATCommands(unittest.TestCase): - - def test_at_commands_list_not_empty(self): - self.assertTrue(len(AT_COMMANDS) > 0) - - def test_at_commands_are_strings(self): - for command in AT_COMMANDS: - self.assertIsInstance(command, str) - def test_at_commands_start_with_at(self): - for command in AT_COMMANDS: - self.assertTrue(command.startswith("AT")) - - def test_specific_commands_present(self): - expected_commands = ["ATI", "AT+CMEE=1", "AT!GSTATUS?", "AT+CPIN?"] - for command in expected_commands: +class TestATCommands(unittest.TestCase): + def test_at_commands_properties(self) -> None: + for command_list in [AT_COMMANDS, AT_COMMANDS_HL78]: + with self.subTest(command_list=command_list): + self.assertTrue(len(command_list) > 0) + for command in command_list: + self.assertIsInstance(command, str) + self.assertTrue(command.startswith("AT")) + self.assertEqual(command, command.upper()) + + def test_specific_commands_present(self) -> None: + for command in EXPECTED_COMMANDS: self.assertIn(command, AT_COMMANDS) - def test_enter_cnd_command_format(self): - enter_cnd_command = 'AT!ENTERCND="A710"' - self.assertIn(enter_cnd_command, AT_COMMANDS) - - def test_at_commands_uppercase(self): - for command in AT_COMMANDS: - self.assertEqual(command, command.upper()) + def test_enter_cnd_command_format(self) -> None: + self.assertIn(ENTER_CND_COMMAND, AT_COMMANDS) - def test_at_commands_hl78_list_not_empty(self): - self.assertTrue(len(AT_COMMANDS_HL78) > 0) - - def test_at_commands_hl78_are_strings(self): - for command in AT_COMMANDS_HL78: - self.assertIsInstance(command, str) - - def test_at_commands_hl78_start_with_at(self): - for command in AT_COMMANDS_HL78: - self.assertTrue(command.startswith("AT")) class TestUSBHandle(unittest.TestCase): - - def setUp(self): + def setUp(self) -> None: self.mock_port = "COM1" self.mock_command = "AT+TEST" self.mock_result = "OK\r\n" - @patch('sierra_status.src.usb_handle.serial.Serial') - def test_send_at_command_success(self, mock_serial): - mock_serial.return_value.read.return_value = b'OK\r\n' - result = send_at_command(self.mock_port, self.mock_command) - self.assertEqual(result, "") - - @patch('sierra_status.src.usb_handle.serial.Serial') - def test_send_at_command_exception(self, mock_serial): - mock_serial.side_effect = Exception("Test exception") - result = send_at_command(self.mock_port, self.mock_command) - self.assertEqual(result, "") - - @patch('sierra_status.src.usb_handle.send_at_command') - def test_get_module_status_without_search(self, mock_send_at_command): + def test_send_at_command_success(self) -> None: + with patch("sierra_status.src.usb_handle.serial.Serial") as mock_serial: + mock_instance: MagicMock = mock_serial.return_value + mock_instance.read.return_value = b"OK\r\n" + result: str = send_at_command(self.mock_port, self.mock_command) + self.assertEqual(result, "") + + def test_send_at_command_exception(self) -> None: + with patch( + "sierra_status.src.usb_handle.serial.Serial", + side_effect=Exception("Test exception"), + ): + result = send_at_command(self.mock_port, self.mock_command) + self.assertEqual(result, "") + + @patch("sierra_status.src.usb_handle.send_at_command") + def test_get_module_status_without_search( + self, mock_send_at_command: MagicMock + ) -> None: mock_send_at_command.return_value = "Test Result" result = get_module_status(self.mock_port, 0, "EM9xxx") self.assertIn("Test Result", result) - self.assertNotIn(AT_COMMAND_COPS, result) - @patch('sierra_status.src.usb_handle.send_at_command') - @patch('sierra_status.src.usb_handle.get_em_cops') - def test_get_module_status_with_search(self, mock_get_em_cops, mock_send_at_command): + @patch("sierra_status.src.usb_handle.send_at_command") + @patch("sierra_status.src.usb_handle.get_em_cops") + def test_get_module_status_with_search( + self, mock_get_em_cops: MagicMock, mock_send_at_command: MagicMock + ) -> None: mock_send_at_command.return_value = "Test Result" mock_get_em_cops.return_value = "COPS Result" result = get_module_status(self.mock_port, 1, "EM9xxx") self.assertIn("Test Result", result) self.assertIn("COPS Result", result) - @patch('sierra_status.src.usb_handle.send_at_command') - def test_get_em_cops(self, mock_send_at_command): + @patch("sierra_status.src.usb_handle.send_at_command") + def test_get_em_cops(self, mock_send_at_command) -> None: mock_send_at_command.return_value = "COPS Test Result" result = get_em_cops(self.mock_port) self.assertEqual(result, "COPS Test Result") - mock_send_at_command.assert_called_with(self.mock_port, AT_COMMAND_COPS, 120, 115200) - @patch('builtins.open', new_callable=mock_open) - @patch('sierra_status.src.usb_handle.time.strftime') - def test_creat_status_file(self, mock_strftime, mock_file): + @patch("builtins.open", new_callable=mock_open) + @patch("sierra_status.src.usb_handle.time.strftime") + def test_creat_status_file(self, mock_strftime, mock_file) -> None: mock_strftime.return_value = "20230101_120000" creat_status_file("Test Status", "TestModel") mock_file.assert_called_with("status_TestModel_20230101_120000.txt", "w") mock_file().write.assert_called_with("Test Status") - @patch('sierra_status.src.usb_handle.get_module_status') - @patch('sierra_status.src.usb_handle.creat_status_file') - def test_start_process_with_result(self, mock_creat_status_file, mock_get_module_status): + @patch("sierra_status.src.usb_handle.get_module_status") + @patch("sierra_status.src.usb_handle.creat_status_file") + @patch("sierra_status.src.usb_handle.time.strftime") + def test_start_process_with_result( + self, mock_strftime, mock_creat_status_file, mock_get_module_status + ) -> None: mock_get_module_status.return_value = "Test Status" + mock_strftime.return_value = "20230101_120000" start_process(self.mock_port, "TestModel", logging.INFO, 0) - mock_creat_status_file.assert_called_with("Test Status", "TestModel") - - @patch('sierra_status.src.usb_handle.get_module_status') - @patch('sierra_status.src.usb_handle.creat_status_file') - def test_start_process_without_result(self, mock_creat_status_file, mock_get_module_status): + expected_result = "Finished time: 20230101_120000\nTest Status" + mock_creat_status_file.assert_called_with(expected_result, "TestModel") + + @patch("sierra_status.src.usb_handle.get_module_status") + @patch("sierra_status.src.usb_handle.creat_status_file") + def test_start_process_without_result( + self, mock_creat_status_file, mock_get_module_status + ) -> None: mock_get_module_status.return_value = "" start_process(self.mock_port, "TestModel", logging.INFO, 0) mock_creat_status_file.assert_not_called() class TestAnimateSpinner(unittest.TestCase): + def test_animate_spinner(self) -> None: + with patch("sys.stdout") as mock_stdout, patch("time.sleep") as mock_sleep: + animate_spinner() + self.assertEqual(mock_stdout.write.call_count, 4) + self.assertEqual(mock_stdout.flush.call_count, 4) + self.assertEqual(mock_sleep.call_count, 4) + + def test_animate_spinner_interruption(self) -> None: + with patch("sys.stdout") as mock_stdout, patch( + "time.sleep", side_effect=KeyboardInterrupt() + ): + with self.assertRaises(KeyboardInterrupt): + animate_spinner() + self.assertEqual(mock_stdout.write.call_count, 1) + self.assertEqual(mock_stdout.flush.call_count, 1) - @patch('sys.stdout') - @patch('time.sleep') - def test_animate_spinner(self, mock_sleep, mock_stdout): - animate_spinner() - self.assertEqual(mock_stdout.write.call_count, 4) - self.assertEqual(mock_stdout.flush.call_count, 4) - self.assertEqual(mock_sleep.call_count, 4) class TestSendATCommand(unittest.TestCase): - - @patch('sierra_status.src.usb_handle.serial.Serial') - @patch('sierra_status.src.usb_handle.time.time') - def test_send_at_command_timeout(self, mock_time, mock_serial): + @patch("sierra_status.src.usb_handle.serial.Serial") + @patch("sierra_status.src.usb_handle.time.time") + def test_send_at_command_timeout(self, mock_time, mock_serial) -> None: mock_time.side_effect = [0, 61] # Simulate timeout - mock_serial.return_value.read.return_value = b'' + mock_serial.return_value.read.return_value = b"" result = send_at_command("COM1", "AT+TEST", timeout=60) self.assertEqual(result, "") - @patch('sierra_status.src.usb_handle.serial.Serial') - def test_send_at_command_error_response(self, mock_serial): - mock_serial.return_value.read_until.return_value = b'ERROR\r\n' + @patch("sierra_status.src.usb_handle.serial.Serial") + def test_send_at_command_error_response(self, mock_serial) -> None: + mock_serial.return_value.read_until.return_value = b"ERROR\r\n" result = send_at_command("COM1", "AT+TEST") self.assertEqual(result, "") -class TestGetModuleStatus(unittest.TestCase): - @patch('sierra_status.src.usb_handle.send_at_command') - def test_get_module_status_exception(self, mock_send_at_command): +class TestGetModuleStatus(unittest.TestCase): + @patch("sierra_status.src.usb_handle.send_at_command") + def test_get_module_status_exception(self, mock_send_at_command) -> None: mock_send_at_command.side_effect = Exception("Test exception") result = get_module_status("COM1", 0, "EM9xxx") self.assertEqual(result, "") - @patch('sierra_status.src.usb_handle.send_at_command') - def test_get_module_status_all_commands(self, mock_send_at_command): + @patch("sierra_status.src.usb_handle.send_at_command") + def test_get_module_status_all_commands(self, mock_send_at_command) -> None: mock_send_at_command.return_value = "OK" result = get_module_status("COM1", 0, "EM9xxx") self.assertEqual(result.count("OK"), len(AT_COMMANDS)) - @patch('sierra_status.src.usb_handle.send_at_command') - def test_get_module_status_hl78xx(self, mock_send_at_command): + @patch("sierra_status.src.usb_handle.send_at_command") + def test_get_module_status_hl78xx(self, mock_send_at_command) -> None: mock_send_at_command.return_value = "OK" result = get_module_status("COM1", 0, "HL78xx") self.assertEqual(result.count("OK"), len(AT_COMMANDS_HL78)) class TestCreatStatusFile(unittest.TestCase): - - @patch('builtins.open', new_callable=mock_open) - @patch('sierra_status.src.usb_handle.time.strftime') - def test_creat_status_file_exception(self, mock_strftime, mock_file): + @patch("builtins.open", new_callable=mock_open) + @patch("sierra_status.src.usb_handle.time.strftime") + def test_creat_status_file_exception(self, mock_strftime, mock_file) -> None: mock_strftime.return_value = "20230101_120000" mock_file.side_effect = Exception("Test exception") - with self.assertLogs(level='ERROR') as log: + with self.assertLogs(level="ERROR") as log: creat_status_file("Test Status", "TestModel") self.assertIn("Error creating status file", log.output[0]) -class TestStartProcess(unittest.TestCase): - @patch('sierra_status.src.usb_handle.get_module_status') - @patch('sierra_status.src.usb_handle.creat_status_file') - @patch('sierra_status.src.usb_handle.logging.basicConfig') - def test_start_process_log_level(self, mock_basicConfig, mock_creat_status_file, mock_get_module_status): +class TestStartProcess(unittest.TestCase): + @patch("sierra_status.src.usb_handle.get_module_status") + @patch("sierra_status.src.usb_handle.creat_status_file") + @patch("sierra_status.src.usb_handle.logging.basicConfig") + def test_start_process_log_level( + self, mock_basicConfig, mock_creat_status_file, mock_get_module_status + ) -> None: mock_get_module_status.return_value = "Test Status" start_process("COM1", "TestModel", logging.DEBUG, 0) - mock_basicConfig.assert_called_with(level=logging.DEBUG) - - @patch('sierra_status.src.usb_handle.get_module_status') - @patch('sierra_status.src.usb_handle.creat_status_file') - @patch('sierra_status.src.usb_handle.logging.error') - def test_start_process_no_result(self, mock_logging_error, mock_creat_status_file, mock_get_module_status): + mock_basicConfig.assert_called_with( + level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s" + ) + + @patch("sierra_status.src.usb_handle.get_module_status") + @patch("sierra_status.src.usb_handle.creat_status_file") + @patch("sierra_status.src.usb_handle.logging.error") + def test_start_process_no_result( + self, mock_logging_error, mock_creat_status_file, mock_get_module_status + ) -> None: mock_get_module_status.return_value = "" start_process("COM1", "TestModel", logging.INFO, 0) mock_logging_error.assert_called_with("No result received from the module.") mock_creat_status_file.assert_not_called() -if __name__ == '__main__': - unittest.main() +class TestSendATCommandAdvanced(unittest.TestCase): + @patch("sierra_status.src.usb_handle.serial.Serial") + def test_send_at_command_invalid_port(self, mock_serial) -> None: + mock_serial.side_effect = serial.SerialException("Invalid port") + result = send_at_command("INVALID_PORT", "AT+TEST") + self.assertEqual(result, "") + + def test_send_at_command_empty_port(self) -> None: + with self.assertRaises(ValueError): + send_at_command("", "AT+TEST") + + def test_send_at_command_empty_command(self) -> None: + with self.assertRaises(ValueError): + send_at_command("COM1", "") + + def test_send_at_command_invalid_baudrate(self) -> None: + with self.assertRaises(ValueError): + send_at_command("COM1", "AT+TEST", baudrate=0) + + +class TestGetModuleStatusAdvanced(unittest.TestCase): + @patch("sierra_status.src.usb_handle.send_at_command") + def test_get_module_status_hl78xx_model(self, mock_send_at_command) -> None: + mock_send_at_command.return_value = "HL78xx Response" + result = get_module_status("COM1", 0, "HL78xx") + self.assertIn("HL78xx Response", result) + self.assertEqual(mock_send_at_command.call_count, len(AT_COMMANDS_HL78)) + + @patch("sierra_status.src.usb_handle.send_at_command") + def test_get_module_status_unknown_model(self, mock_send_at_command) -> None: + mock_send_at_command.return_value = "Unknown Model Response" + result = get_module_status("COM1", 0, "UnknownModel") + self.assertIn("Unknown Model Response", result) + self.assertEqual(mock_send_at_command.call_count, len(AT_COMMANDS)) + + @patch("sierra_status.src.usb_handle.send_at_command") + @patch("sierra_status.src.usb_handle.get_em_cops") + def test_get_module_status_with_search_exception( + self, mock_get_em_cops, mock_send_at_command + ) -> None: + mock_send_at_command.return_value = "Test Result" + mock_get_em_cops.side_effect = Exception("COPS Error") + result = get_module_status("COM1", 1, "EM9xxx") + self.assertIn("Test Result", result) + self.assertNotIn("COPS Error", result) + + +class TestGetEmCopsAdvanced(unittest.TestCase): + @patch("sierra_status.src.usb_handle.send_at_command") + def test_get_em_cops_timeout(self, mock_send_at_command) -> None: + mock_send_at_command.side_effect = TimeoutError("Command timed out") + result = get_em_cops("COM1") + self.assertEqual(result, "") + + @patch("sierra_status.src.usb_handle.send_at_command") + def test_get_em_cops_custom_baudrate(self, mock_send_at_command) -> None: + mock_send_at_command.return_value = "Custom Baudrate Result" + result = get_em_cops("COM1", baudrate=9600) + mock_send_at_command.assert_called_with("COM1", AT_COMMAND_COPS, 120, 9600) + self.assertEqual(result, "Custom Baudrate Result") + + +class TestCreatStatusFileAdvanced(unittest.TestCase): + @patch("builtins.open", new_callable=mock_open) + @patch("sierra_status.src.usb_handle.time.strftime") + def test_creat_status_file_unicode_content(self, mock_strftime, mock_file) -> None: + mock_strftime.return_value = "20230101_120000" + unicode_content = "Test Status with Unicode: ñáéíóú" + creat_status_file(unicode_content, "TestModel") + mock_file.assert_called_with("status_TestModel_20230101_120000.txt", "w") + mock_file().write.assert_called_with(unicode_content) + + +class TestInteractiveMode(unittest.TestCase): + def setUp(self) -> None: + self.mock_port = "COM1" + self.mock_baudrate = 115200 + self.mock_model = "TestModel" + + @patch("builtins.input") + def test_get_interactive_command_valid(self, mock_input) -> None: + mock_input.return_value = "AT+TEST" + result = get_interactive_command() + self.assertEqual(result, "AT+TEST") + + @patch("builtins.input") + def test_get_interactive_command_exit(self, mock_input) -> None: + mock_input.return_value = "exit" + result = get_interactive_command() + self.assertEqual(result, "") + + @patch("builtins.input") + def test_get_interactive_command_invalid(self, mock_input) -> None: + mock_input.side_effect = ["INVALID", "AT+TEST"] + result = get_interactive_command() + self.assertEqual(result, "AT+TEST") + + @patch("sierra_status.src.usb_handle.get_interactive_command") + @patch("sierra_status.src.usb_handle.send_at_command") + @patch("sierra_status.src.usb_handle.creat_status_file") + def test_handle_interactive_session( + self, mock_create_file, mock_send, mock_get_command + ) -> None: + mock_get_command.side_effect = ["AT+TEST1", "AT+TEST2", ""] + mock_send.return_value = "OK" + + handle_interactive_session(self.mock_port, self.mock_baudrate, self.mock_model) + + self.assertEqual(mock_send.call_count, 2) + mock_create_file.assert_called_once() + self.assertIn("AT+TEST1", mock_create_file.call_args[0][0]) + self.assertIn("AT+TEST2", mock_create_file.call_args[0][0]) + + @patch("sierra_status.src.usb_handle.handle_interactive_session") + @patch("sierra_status.src.usb_handle.get_module_status") + def test_start_process_interactive_mode( + self, mock_get_status, mock_interactive + ) -> None: + start_process( + self.mock_port, + "TestModel", + logging.INFO, + 0, + self.mock_baudrate, + interactive=True, + ) + + mock_interactive.assert_called_once_with( + self.mock_port, self.mock_baudrate, "TestModel" + ) + mock_get_status.assert_not_called() + + @patch("sierra_status.src.usb_handle.handle_interactive_session") + @patch("sierra_status.src.usb_handle.get_module_status") + def test_start_process_standard_mode( + self, mock_get_status, mock_interactive + ) -> None: + start_process( + self.mock_port, + "TestModel", + logging.INFO, + 0, + self.mock_baudrate, + interactive=False, + ) + + mock_interactive.assert_not_called() + mock_get_status.assert_called_once() + + +if __name__ == "__main__": + unittest.main()