Codebase list faraday-plugins / c19a85b faraday_plugins / plugins / repo / shodan / plugin.py
c19a85b

Tree @c19a85b (Download .tar.gz)

plugin.py @c19a85b · raw · history · blame

"""
Faraday Penetration Test IDE
Copyright (C) 2013  Infobyte LLC (http://www.infobytesec.com/)
See the file 'doc/LICENSE' for the license information

"""
import re
import json
import argparse
import shlex
import gzip
import os
import shutil

from faraday_plugins.plugins.plugin import PluginMultiLineJsonFormat
from faraday_plugins.plugins.plugins_utils import get_severity_from_cvss

__author__ = "Valentin Vila"
__copyright__ = "Copyright (c) 2021, Faraday"
__credits__ = ["Valentin Vila"]
__license__ = ""
__version__ = "1.0.0"
__maintainer__ = "Valentin Vila"
__email__ = "[email protected]"
__status__ = "Development"


class ShodanPlugin(PluginMultiLineJsonFormat):
    """
    This plugin handles the Shodan tool.
    Detects the output of the tool
    and adds the information to Faraday.

    The output is consumed as one JSON document per line. ``shodan search``
    and ``shodan download`` command lines are rewritten by
    ``processCommandString`` into a ``shodan download`` that writes to
    ``self._output_file_path``.
    """

    def __init__(self, *arg, **kwargs):
        super().__init__(*arg, **kwargs)
        self.id = "shodan"
        self.name = "Shodan"
        self.plugin_version = "0.0.1"
        self.version = "1.0.0"
        # Matches "shodan search ..." and "shodan download ..."; the
        # sub-command is captured in the named group "option".
        self._command_regex = re.compile(r'^shodan\s+(?P<option>search|download)\s+.*\w+')
        # NOTE(review): the three attributes below are consumed by the base
        # class, which is not visible here -- presumably they request a
        # temporary ".json.gz" output file and mark '_shodan' as the key that
        # identifies a Shodan record. Confirm against PluginMultiLineJsonFormat.
        self._use_temp_file = True
        self._temp_file_extension = "json.gz"
        self.json_keys = {'_shodan'}

    def _parse_filename(self, filename):
        """
        Read ``filename``, feed its text to ``parseOutputString`` and, when
        ``self._delete_temp_file`` is set, remove the file afterwards.

        The file is gzip-decompressed when ``self.has_custom_output()`` is
        true and read as plain text otherwise; both paths decode as UTF-8.
        """
        try:
            if self.has_custom_output():
                with gzip.open(filename, 'rb') as output:
                    content = output.read().decode('utf-8')
            else:
                # Explicit encoding so both branches decode the same way
                # regardless of the platform's default locale.
                with open(filename, encoding='utf-8') as output:
                    content = output.read()
            self.parseOutputString(content)
        finally:
            # Clean up in ``finally`` so the temporary file does not linger
            # when reading or parsing raises.
            if self._delete_temp_file:
                try:
                    if os.path.isfile(filename):
                        os.remove(filename)
                    elif os.path.isdir(filename):
                        shutil.rmtree(filename)
                except OSError as e:
                    # Deletion is best effort: report the failure and go on.
                    self.logger.error(f"Error on delete file: ({filename}) [{e}]")

    def parseOutputString(self, output):
        """
        Parse ``output`` (one JSON object per line) and register its content.

        For every record a host is created from ``ip_str``/``hostnames``, a
        service from ``transport``/``port``, and one vulnerability per entry
        of the optional ``vulns`` mapping (its key is used both as the
        vulnerability name and as the CVE).
        """
        # Split on "\n" only: str.splitlines() would also break on separators
        # such as U+2028 that may legitimately appear inside a JSON string.
        for raw_line in output.split("\n"):
            line = raw_line.strip()
            if not line:
                # Skip blank and whitespace-only lines (e.g. a stray "\r").
                continue
            vuln_dict = json.loads(line)
            ip = vuln_dict.get('ip_str')
            port = vuln_dict.get('port')
            transport = vuln_dict.get('transport')
            hostnames = vuln_dict.get('hostnames')
            h_id = self.createAndAddHost(ip, hostnames=hostnames)
            # NOTE(review): the service name is hard-coded to "http" for every
            # record, whatever the port -- confirm this is intended.
            s_id = self.createAndAddServiceToHost(h_id, "http", protocol=transport, ports=port)
            vulns = vuln_dict.get('vulns') or {}
            for name, vuln_info in vulns.items():
                self.createAndAddVulnToService(
                    h_id,
                    s_id,
                    name,
                    desc=vuln_info.get('summary'),
                    ref=vuln_info.get('references'),
                    cve=name,
                )

    def processCommandString(self, username, current_path, command_string):
        """
        Adds the path to a temporary file parameter to get .json.gz output to the command string that the
        user has set.

        Both ``shodan search [options] <query>`` and
        ``shodan download [options] <filename> <query>`` are turned into
        ``shodan download [--limit N] <self._output_file_path> <query>``.
        Only ``--limit`` is carried over; the display options of ``search``
        are accepted and dropped, and a user-supplied download filename is
        replaced by ``self._output_file_path``.

        Returns the rewritten command string, or ``command_string`` unchanged
        when it does not match ``self._command_regex``. ``argparse`` ends
        parsing with ``SystemExit`` if the arguments are not understood, and
        ``shlex.split`` raises ``ValueError`` on unbalanced quotes.
        """
        super().processCommandString(username, current_path, command_string)
        match = self._command_regex.match(command_string)
        if match is None:
            # Not a command this plugin knows how to rewrite.
            return command_string

        parser = argparse.ArgumentParser(conflict_handler='resolve')
        parser.add_argument("--limit", type=int)
        if match.group('option') == 'search':
            parser.add_argument("--color", action="store_true")
            parser.add_argument("--no-color", action="store_true")
            parser.add_argument("--fields", type=str)
            parser.add_argument("--separator", type=str)
        else:
            # "shodan download" takes the output filename before the query.
            parser.add_argument("filename")
        # nargs=-1 is Click syntax; argparse spells "one or more" as "+".
        parser.add_argument("query", nargs="+")

        # The regex guarantees the first two tokens are "shodan" and the
        # sub-command, so drop them by position instead of searching for the
        # text "search " (which may also occur inside the query itself).
        args = parser.parse_args(shlex.split(command_string)[2:])

        cmd_parts = ["shodan", "download"]
        if args.limit:
            cmd_parts += ["--limit", str(args.limit)]
        cmd_parts.append(str(self._output_file_path))
        cmd_parts.extend(args.query)
        # Re-quote each token so queries containing spaces or quotes survive
        # the next round of shell-style splitting; plain tokens are unchanged.
        return " ".join(shlex.quote(part) for part in cmd_parts)


def createPlugin(*args, **kwargs):
    """Build and return a ShodanPlugin, forwarding every argument to its constructor."""
    plugin = ShodanPlugin(*args, **kwargs)
    return plugin