这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/plugins/analysis/cwe_checker/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
internal/
Empty file.
Empty file.
194 changes: 194 additions & 0 deletions src/plugins/analysis/cwe_checker/code/cwe_checker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
'''
This plugin implements a wrapper around the BAP plugin cwe_checker, which checks ELF executables for
several CWEs (Common Weakness Enumeration). Please refer to cwe_checkers implementation for further information.
Please note that these checks are heuristics and the checks are static.
This means that there are definitely false positives and false negatives. The objective of this
plugin is to find potentially interesting binaries that deserve a deep manual analysis or intensive fuzzing.

As the plugin depends on BAP, it depends on BAP's lifting capabilities. Currently, BAP
lifts to the following architectures:
- Intel x86 (32 and 64 bits)
- ARM
- PowerPC
- Mips
'''
from collections import defaultdict
import logging
import os

import sexpdata

from common_helper_process import execute_shell_command_get_return_code
from analysis.PluginBase import AnalysisBasePlugin

PATH_TO_BAP = '~/.opam/4.05.0/bin/bap'
BAP_TIMEOUT = 10


class CweWarning(object):

def __init__(self, name, plugin_version, warning):
self.name = name
self.plugin_version = plugin_version
self.warning = warning


class CweWarningParser(object):
'''
Parses a CWE warning emitted by the BAP plugin CweChecker
'''

@staticmethod
def _remove_color(s):
'''
Removes 'color' from string
See https://stackoverflow.com/questions/287871/print-in-terminal-with-colors/293633#293633
'''
return s.replace('\x1b[0m', '').strip()

def parse(self, warning):
try:
splitted_line = warning.split('WARN')
cwe_warning = splitted_line[1].replace(
'u32', '').replace(':', '')

cwe_name = self._remove_color(cwe_warning.split(')')[0]) + ')'
cwe_name = cwe_name.split('{')[0].strip() + ' ' + cwe_name.split('}')[1].strip()

plugin_version = cwe_warning.split('{')[1].split('}')[0]

cwe_message = ')'.join(cwe_warning.split(')')[1:])
cwe_message = cwe_message.replace('.', '').replace('32u', '')

return CweWarning(cwe_name, plugin_version, cwe_message)
except IndexError as e:
logging.error('IndexError while parsing CWE warning: {}.'.format(str(e)))
return None


class AnalysisPlugin(AnalysisBasePlugin):
'''
This class implements the FACT Python wrapper for the BAP plugin cwe_checker.
'''
NAME = 'cwe_checker'
DESCRIPTION = 'This plugin checks ELF binaries for several CWEs (Common Weakness Enumeration) like \
CWE-243 (Creation of chroot Jail Without Changing Working Directory) and \
CWE-676 (Use of Potentially Dangerous Function). Internally it uses BAP 1.5, which currently supports ARM, x86/x64, PPC and MIPS. \
Due to the nature of static analysis, this plugin may run for a long time.'
DEPENDENCIES = ['cpu_architecture', 'file_type']
VERSION = '0.3.3'
MIME_WHITELIST = ['application/x-executable', 'application/x-object', 'application/x-sharedlib']
SUPPORTED_ARCHS = ['arm', 'x86', 'x64', 'mips', 'ppc']

def __init__(self, plugin_adminstrator, config=None, recursive=True, docker=True):
self.config = config
self.docker = docker
if self.docker:
if not self._check_docker_installed():
raise Exception('Docker support is turned on but Docker is not installed.')
self._module_versions = self._get_module_versions()
logging.info('Module versions are {}'.format(str(self._module_versions)))
super().__init__(plugin_adminstrator, config=config,
plugin_path=__file__, recursive=recursive)

@staticmethod
def _check_docker_installed():
_, return_code = execute_shell_command_get_return_code('docker -v')
return return_code == 0

def _get_module_versions(self):
bap_command = self._build_bap_command_for_modules_versions()
output, return_code = execute_shell_command_get_return_code(bap_command)
if return_code != 0:
logging.error('Could not get module versions from Bap plugin: {} ({}). I tried the following command: {}'.format(
return_code, output, bap_command))
return {}
else:
return self._parse_module_versions(output)

@staticmethod
def _parse_module_versions(bap_output):
module_versions = {}
for line in bap_output.splitlines():
if 'module_versions:' in line:
version_sexp = line.split('module_versions:')[-1].strip()
module_versions = dict(sexpdata.loads(version_sexp))
return module_versions

def _build_bap_command_for_modules_versions(self):
# unfortunately, there must be a dummy file passed to BAP, I chose /bin/true because it is damn small
if self.docker:
bap_command = 'docker run cwe-checker:latest bap /bin/true --pass=cwe-checker --cwe-checker-module_versions=true'
else:
bap_command = '{} {} --pass=cwe-checker --cwe-checker-module_versions=true'.format(PATH_TO_BAP, '/bin/true')
return bap_command

def _build_bap_command(self, file_object):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be easy to test?

if self.docker:
bap_command = 'timeout --signal=SIGKILL {}m docker run -v {}:/tmp/input cwe-checker:latest bap /tmp/input '\
'--pass=cwe-checker --cwe-checker-config=/home/bap/cwe_checker/src/config.json'.format(
BAP_TIMEOUT,
file_object.file_path)
else:
bap_command = 'timeout --signal=SIGKILL {}m {} {} --pass=cwe-checker --cwe-checker-config={}/../internal/src/config.json'.format(
BAP_TIMEOUT,
PATH_TO_BAP,
file_object.file_path,
os.path.join(os.path.dirname(os.path.abspath(__file__))))
return bap_command

@staticmethod
def _parse_bap_output(output):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a static method. It should be easy to test it?

tmp = defaultdict(list)
cwe_parser = CweWarningParser()

for line in output.splitlines():
if 'WARN' in line:
cwe_warning = cwe_parser.parse(line)
tmp[cwe_warning.name].append(cwe_warning)

res = {}
for key, values in tmp.items():
tmp_list = []
plugin_version = None
for cwe in values:
tmp_list.append(cwe.warning)
if not plugin_version:
plugin_version = cwe.plugin_version
res[key] = {'plugin_version': plugin_version,
'warnings': tmp_list}

return res

def _is_supported_arch(self, file_object):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be easy to test as well.

arch_type = file_object.processed_analysis['file_type']['full'].lower()
return any(supported_arch in arch_type for supported_arch in self.SUPPORTED_ARCHS)

def _do_full_analysis(self, file_object):
bap_command = self._build_bap_command(file_object)
output, return_code = execute_shell_command_get_return_code(
bap_command)
if return_code != 0:
logging.error('Could not communicate with Bap plugin: {} ({}).'.format(
return_code, output))
file_object.processed_analysis[self.NAME] = {'summary': []}
else:
cwe_messages = self._parse_bap_output(output)
file_object.processed_analysis[self.NAME] = {'full': cwe_messages,
'summary': list(cwe_messages.keys())}
return file_object

def process_object(self, file_object):
'''
This function handles only ELF executable. Otherwise it returns an empty dictionary.
It calls the external BAP plugin cwe_checker.
'''
if not self._is_supported_arch(file_object):
logging.debug('{}\'s arch is not supported ({})'.format(
file_object.file_path,
file_object.processed_analysis['cpu_architecture']['summary']))
file_object.processed_analysis[self.NAME] = {'summary': []}
else:
file_object = self._do_full_analysis(file_object)

return file_object
26 changes: 26 additions & 0 deletions src/plugins/analysis/cwe_checker/install.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#!/usr/bin/env bash

# change cwd to current file's directory
cd "$( dirname "${BASH_SOURCE[0]}" )"

echo "------------------------------------"
echo " Installing cwe_checker Plugin "
echo "------------------------------------"

echo "Checking out cwe_checker"
git clone https://github.com/fkie-cad/cwe_checker.git internal

echo "Cleaning up"
rm -rf internal/src/_build
rm -f internal/src/cwe_checker.plugin

echo "Building docker container"
cd internal && docker build --build-arg http_proxy=$http_proxy --build-arg https_proxy=$https_proxy --build-arg HTTP_PROXY=$http_proxy --build-arg HTTPS_PROXY=$https_proxy -t cwe-checker .

# change cwd to current file's directory
cd "$( dirname "${BASH_SOURCE[0]}" )"

echo "Installing Python dependencies."
sudo -EH pip3 install sexpdata

exit 0
Empty file.
72 changes: 72 additions & 0 deletions src/plugins/analysis/cwe_checker/test/test_cwe_checker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import os

from objects.file import FileObject
from test.unit.analysis.analysis_plugin_test_class import AnalysisPluginTest
from ..code.cwe_checker import AnalysisPlugin, CweWarningParser, BAP_TIMEOUT, PATH_TO_BAP


class TestCweCheckerFunctions(AnalysisPluginTest):

PLUGIN_NAME = 'cwe_checker'

def setUp(self):
super().setUp()
config = self.init_basic_config()
# TODO: Mock calls to BAP
AnalysisPlugin._get_module_versions = lambda self: {}
self.analysis_plugin = AnalysisPlugin(self, config=config, docker=False)

def test_cwe_warning_parser_can_parse_warning(self):
data = '2018-02-16 13:27:35.552 WARN : [CWE476] {0.1} (NULL Pointer Dereference) There is no check if the return value is NULL at 0x104A0:32u/00000108 (malloc).'
p = CweWarningParser()
res = p.parse(data)
self.assertEqual(res.name, '[CWE476] (NULL Pointer Dereference)')
self.assertEqual(res.plugin_version, '0.1')
self.assertEqual(res.warning.strip(), 'There is no check if the return value is NULL at 0x104A0/00000108 (malloc)')

def test_cwe_warning_parser_does_not_parse_empty_warning(self):
p = CweWarningParser()
res = p.parse("")
self.assertEqual(res, None)

def test_parse_module_version(self):
data = '018-02-16 13:33:37.571 INFO : [cwe_checker] module_versions: (("CWE215" "0.1") ("CWE243" "0.1") ("CWE332" "0.1") ("CWE367" "0.1") ("CWE415" "0.1") ("CWE426" "0.1") ("CWE467" "0.1") ("CWE476" "0.1") ("CWE676" "0.1"))'
expected_result = {'CWE215': '0.1',
'CWE243': '0.1',
'CWE332': '0.1',
'CWE367': '0.1',
'CWE415': '0.1',
'CWE426': '0.1',
'CWE467': '0.1',
'CWE476': '0.1',
'CWE676': '0.1'}
res = self.analysis_plugin._parse_module_versions(data)
self.assertEqual(res, expected_result)

def test_build_bap_command(self):
self.analysis_plugin.docker = True
fo = FileObject(file_path='/foo')
assert self.analysis_plugin._build_bap_command(fo) == 'timeout --signal=SIGKILL {}m docker run -v {}:/tmp/input cwe-checker:latest bap /tmp/input --pass=cwe-checker --cwe-checker-config=/home/bap/cwe_checker/src/config.json'.format(BAP_TIMEOUT, fo.file_path)

def test_build_bap_command_no_docker(self):
self.analysis_plugin.docker = False
fo = FileObject(file_path='/foo')
assert self.analysis_plugin._build_bap_command(fo) == 'timeout --signal=SIGKILL {}m {} {} --pass=cwe-checker --cwe-checker-config={}/code/../internal/src/config.json'.format(
BAP_TIMEOUT,
PATH_TO_BAP,
fo.file_path,
os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

def test_parse_bap_output(self):
test_data = '2018-10-19 11:41:20.030 WARN : [CWE215] {0.1} (Information Exposure Through Debug Information) CU: cwe_332.c:\n2018-10-19 11:41:20.030 WARN : [CWE332] {0.1} (Insufficient Entropy in PRNG) program uses rand without calling srand before'
result = self.analysis_plugin._parse_bap_output(test_data)
print(result)
assert isinstance(result, dict)
assert len(result.keys()) == 2
assert isinstance(result['[CWE215] (Information Exposure Through Debug Information)'], dict)

def test_is_supported_arch(self):
fo = FileObject()
test_data = 'ELF 64-bit LSB shared object, x86-64, version 1 (SYSV), dynamically linked, interpreter /lib64/ld-linux-x86-64.so.2, for GNU/Linux 2.6.32, BuildID[sha1]=8e756708f62592be105b5e8b423080d38ddc8391, stripped'
fo.processed_analysis = {'file_type': {'full': test_data}}
assert self.analysis_plugin._is_supported_arch(fo)
28 changes: 28 additions & 0 deletions src/plugins/analysis/cwe_checker/view/cwe_checker.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{% extends "analysis_plugins/general_information.html" %}

{% block analysis_result_details %}

<tr>
<td class="result"> Overview of CWE warnings </td>
<td class="result">
<ul>
{% for cwe in firmware.processed_analysis[selected_analysis]['summary']|sort %}
<li><a href="https://cwe.mitre.org/data/definitions/{{ cwe | fix_cwe }}.html">{{ cwe }}</a></li>
{% endfor %}
</ul>
</td>
</tr>

{% for cwe in firmware.processed_analysis[selected_analysis]['full']|sort %}
<tr>
<td class="result"> {{cwe}} ({{firmware.processed_analysis[selected_analysis]['full'][cwe]['plugin_version']}}) </td>
<td class="result">
<ul>
{% for msg in firmware.processed_analysis[selected_analysis]['full'][cwe]['warnings'] %}
<li> {{msg}} </li>
{% endfor %}
</ul>
</td>
</tr>
{% endfor %}
{% endblock %}