'''
Faraday Penetration Test IDE
Copyright (C) 2013 Infobyte LLC (http://www.infobytesec.com/)
See the file 'doc/LICENSE' for the license information
'''
from datetime import datetime
from io import BytesIO
import pytest
from faraday.server.api.modules.vulnerability_template import VulnerabilityTemplateView
from tests import factories
from tests.test_api_non_workspaced_base import (
ReadWriteAPITests, BulkUpdateTestsMixin, BulkDeleteTestsMixin
)
from faraday.server.models import (
VulnerabilityTemplate,
ReferenceTemplate)
from tests.factories import (
VulnerabilityTemplateFactory,
ReferenceTemplateFactory,
CustomFieldsSchemaFactory,
UserFactory,
VulnerabilityFactory
)
# Two fixture templates whose name/description differ only by letter case
# (and a trailing character), used by the parametrized filter tests to tell
# case-sensitive operators (eq/like) apart from case-insensitive ones (ilike).
TEMPLATES_DATA = [
    {
        'name': 'XML Injection (aka Blind XPath Injection) (Type: Base)',
        'description': 'The software does not properly neutralize special elements that are u',
        'resolution': 'resolved',
        'severity': 'medium',
        'create_date': datetime(2020, 5, 1, 11, 0),
        'creator': 'testuser',
    },
    {
        'name': 'xml InjectioN (aka Blind XPath Injection) (Type: Base)',
        'description': 'THE SOFtware does not properly neutralize special elements that are',
        'resolution': 'not resolved',
        'severity': 'high',
        'create_date': datetime(2020, 6, 1),
        'creator': 'testuser2',
    },
]
@pytest.mark.usefixtures('logged_user')
class TestListVulnerabilityTemplateView(ReadWriteAPITests, BulkUpdateTestsMixin, BulkDeleteTestsMixin):
    """API tests for the ``vulnerability_template`` endpoints.

    Covers listing, creating, updating, deleting and filtering templates, as
    well as bulk creation from an uploaded CSV file or from a JSON payload.
    """

    # Configuration read by the generic base-class tests.
    model = VulnerabilityTemplate
    factory = factories.VulnerabilityTemplateFactory
    api_endpoint = 'vulnerability_template'
    view_class = VulnerabilityTemplateView
    patchable_fields = ['description']

    # ------------------------------------------------------------------
    # Private helpers (not collected by pytest: no ``test`` prefix)
    # ------------------------------------------------------------------

    def _create_post_data_vulnerability_template(self, references):
        """Return a template payload whose ``references`` is *references*."""
        return {
            "exploitation": "high",
            "references": references,
            "name": "name",
            "resolution": "resolution",
            "cwe": "swe",
            "description": "desc"}

    def _create_filter_templates(self, session, templates_data):
        """Replace every stored template with one per entry of *templates_data*.

        A user named after the entry's ``creator`` is created for each
        template. Returns the created templates in the order of
        *templates_data*.
        """
        VulnerabilityTemplate.query.delete()
        session.commit()
        created = []
        for template in templates_data:
            user = UserFactory.create(username=template['creator'])
            session.commit()
            created.append(self.factory.create(
                name=template['name'],
                description=template['description'],
                resolution=template['resolution'],
                severity=template['severity'],
                create_date=template['create_date'],
                creator=user
            ))
            session.commit()
        return created

    @staticmethod
    def _filter_url(field, op, value):
        """Build the filter URL for a single ``field op value`` condition."""
        return (
            f'/v3/vulnerability_template/filter?q={{"filters": [{{ "name": "{field}",'
            f' "op": "{op}", "val": "{value}" }}]}}'
        )

    @staticmethod
    def _assert_filter_result(res, filters):
        """Check status, row count and (for a single hit) the matched name."""
        assert res.status_code == 200
        assert len(res.json['rows']) == filters['count']
        if filters['count'] == 1:
            # Compare against the parametrized expectation rather than
            # whichever template happened to be created first.
            assert res.json['rows'][0]['doc']['name'] == filters['expected_template_name']

    @staticmethod
    def _upload_csv(test_client, csrf_token, file_contents):
        """POST *file_contents* (bytes) as ``vulns.csv`` to ``bulk_create``."""
        data = {
            'file': (BytesIO(file_contents), 'vulns.csv'),
            'csrf_token': csrf_token
        }
        headers = {'Content-type': 'multipart/form-data'}
        return test_client.post('/v3/vulnerability_template/bulk_create',
                                data=data, headers=headers, use_json_data=False)

    @staticmethod
    def _build_vuln_payload(name=None):
        """Build a vulnerability dict usable as a bulk-create entry.

        ``exploitation`` is copied from the generated ``severity``; *name*,
        when given, overrides the generated name.
        """
        vuln = VulnerabilityFactory.build_dict()
        if name is not None:
            vuln['name'] = name
        vuln['exploitation'] = vuln['severity']
        return vuln

    # ------------------------------------------------------------------
    # Listing / creation
    # ------------------------------------------------------------------

    def test_backwards_json_compatibility(self, test_client, session):
        """Listing keeps the ``rows`` layout with id/key/value/doc entries."""
        self.factory.create()
        session.commit()
        res = test_client.get(self.url())
        assert res.status_code == 200
        assert 'rows' in res.json
        # Properties every ``doc`` must expose; extra keys are allowed.
        expected = {
            'exploitation',
            'references',
            'refs',
            'name',
            'cwe',
            '_rev',
            '_id',
            'resolution',
            'description',
            'desc',
        }
        for vuln in res.json['rows']:
            assert {'id', 'key', 'value', 'doc'} == set(vuln.keys())
            result = set(vuln['doc'].keys())
            assert expected - result == set()

    def test_create_new_vulnerability_template(self, session, test_client):
        """POST with empty references creates one template without refs."""
        vuln_count_previous = session.query(VulnerabilityTemplate).count()
        raw_data = self._create_post_data_vulnerability_template(references='')
        res = test_client.post('/v3/vulnerability_template', data=raw_data)
        assert res.status_code == 201
        assert isinstance(res.json['_id'], int)
        assert vuln_count_previous + 1 == session.query(VulnerabilityTemplate).count()
        vuln_template = VulnerabilityTemplate.query.get(res.json['_id'])
        assert vuln_template.references == set()

    # ------------------------------------------------------------------
    # Filtering
    # ------------------------------------------------------------------

    @pytest.mark.usefixtures('ignore_nplusone')
    @pytest.mark.parametrize('filters', [
        {'field': 'name', 'op': 'eq', 'count': 0,
         'filtered_value': 'xml Injection (aka Blind XPath Injection) (Type: Base)',
         'expected_template_name': None,
         'templates': TEMPLATES_DATA},
        {'field': 'name', 'op': 'eq', 'count': 1,
         'filtered_value': 'XML Injection (aka Blind XPath Injection) (Type: Base)',
         'expected_template_name': TEMPLATES_DATA[0]['name'],
         'templates': TEMPLATES_DATA},
        {'field': 'name', 'op': 'like', 'count': 1,
         'filtered_value': '% Injection (aka Blind XPath Injection)%',
         'expected_template_name': TEMPLATES_DATA[0]['name'],
         'templates': TEMPLATES_DATA},
        {'field': 'name', 'op': 'ilike', 'count': 2,
         'filtered_value': '% Injection (aka Blind XPath Injection)%',
         'expected_template_name': None,
         'templates': TEMPLATES_DATA},
        {'field': 'description', 'op': 'eq', 'count': 1,
         'filtered_value': 'The software does not properly neutralize special elements that are u',
         'expected_template_name': TEMPLATES_DATA[0]['name'],
         'templates': TEMPLATES_DATA},
        {'field': 'description', 'op': 'like', 'count': 1,
         'filtered_value': '% software does not properly neutralize special elements that are u',
         'expected_template_name': TEMPLATES_DATA[0]['name'],
         'templates': TEMPLATES_DATA},
        {'field': 'description', 'op': 'ilike', 'count': 2,
         'filtered_value': '% software does not properly neutralize special elements that are%',
         'expected_template_name': None,
         'templates': TEMPLATES_DATA},
        {'field': 'resolution', 'op': 'eq', 'count': 1,
         'filtered_value': 'resolved',
         'expected_template_name': TEMPLATES_DATA[0]['name'],
         'templates': TEMPLATES_DATA},
        {'field': 'resolution', 'op': 'like', 'count': 1,
         'filtered_value': 'resolv%',
         'expected_template_name': TEMPLATES_DATA[0]['name'],
         'templates': TEMPLATES_DATA},
        {'field': 'resolution', 'op': 'ilike', 'count': 2,
         'filtered_value': '%REsolve%',
         'expected_template_name': None,
         'templates': TEMPLATES_DATA},
        {'field': 'severity', 'op': 'eq', 'count': 1,
         'filtered_value': 'medium',
         'expected_template_name': TEMPLATES_DATA[0]['name'],
         'templates': TEMPLATES_DATA},
    ])
    def test_filter_vuln_template(self, session, test_client, filters):
        """Filter by a text field with eq/like/ilike and check the hits."""
        self._create_filter_templates(session, filters['templates'])
        res = test_client.get(
            self._filter_url(filters['field'], filters['op'], filters['filtered_value']))
        self._assert_filter_result(res, filters)

    @pytest.mark.usefixtures('ignore_nplusone')
    @pytest.mark.parametrize('filters', [
        {'field': 'creator_id', 'op': 'eq', 'count': 1,
         'filtered_value': TEMPLATES_DATA[0]['creator'],
         'expected_template_name': TEMPLATES_DATA[0]['name'],
         'templates': TEMPLATES_DATA}
    ])
    # TODO: fix filter restless to filter by username
    def test_filter_vuln_template_by_creator(self, session, test_client, filters):
        """Filter by ``creator_id`` using the id of the first template's creator."""
        templates = self._create_filter_templates(session, filters['templates'])
        # The parametrized ``filtered_value`` (a username) is not usable yet
        # (see TODO above), so the creator's numeric id is sent instead.
        res = test_client.get(
            self._filter_url(filters['field'], filters['op'], templates[0].creator.id))
        self._assert_filter_result(res, filters)

    @pytest.mark.skip_sql_dialect('sqlite')
    @pytest.mark.usefixtures('ignore_nplusone')
    @pytest.mark.parametrize('filters', [
        {'field': 'create_date', 'op': 'eq', 'count': 1,
         'filtered_value': "2020-05-01",
         'expected_template_name': TEMPLATES_DATA[0]['name'],
         'templates': TEMPLATES_DATA}
    ])
    def test_filter_vuln_template_by_create_date(self, session, test_client, filters):
        """Filter by ``create_date`` with a date-only value."""
        self._create_filter_templates(session, filters['templates'])
        res = test_client.get(
            self._filter_url(filters['field'], filters['op'], filters['filtered_value']))
        self._assert_filter_result(res, filters)

    # ------------------------------------------------------------------
    # Update / delete
    # ------------------------------------------------------------------

    def test_update_vulnerability_template(self, session, test_client):
        """PUT overwrites the template fields and leaves no references."""
        template = self.factory.create()
        session.commit()
        raw_data = self._create_post_data_vulnerability_template(references='')
        res = test_client.put(f'/v3/vulnerability_template/{template.id}', data=raw_data)
        assert res.status_code == 200
        updated_template = session.query(VulnerabilityTemplate).filter_by(id=template.id).first()
        assert updated_template.name == raw_data['name']
        assert updated_template.severity == raw_data['exploitation']
        assert updated_template.resolution == raw_data['resolution']
        assert updated_template.description == raw_data['description']
        assert updated_template.references == set()

    @pytest.mark.parametrize('references', [
        ',',
        ',,',
        'a,',
        ['a', 'b', ''],
        {"a": 1},
        {}
    ])
    def test_400_on_invalid_reference(self, session, test_client, references):
        """PUT with a malformed ``references`` value is rejected with 400."""
        template = self.factory.create()
        session.commit()
        raw_data = self._create_post_data_vulnerability_template(
            references=references)
        res = test_client.put(f'/v3/vulnerability_template/{template.id}', data=raw_data)
        assert res.status_code == 400

    def test_update_vulnerabiliy_template_change_refs(self, session, test_client):
        """PUT with new references leaves exactly those on the template."""
        template = self.factory.create()
        for ref_name in {'old1', 'old2'}:
            ref = ReferenceTemplateFactory.create(name=ref_name)
            # NOTE(review): the old refs are attached to ``self.first_object``
            # (provided outside this class), not to ``template``, so the
            # template under test starts without them. Presumably
            # ``template`` was intended - confirm before changing.
            self.first_object.reference_template_instances.add(ref)
        session.commit()
        raw_data = self._create_post_data_vulnerability_template(references='new_ref,another_ref')
        res = test_client.put(f'/v3/vulnerability_template/{template.id}', data=raw_data)
        assert res.status_code == 200
        updated_template = session.query(VulnerabilityTemplate).filter_by(id=template.id).first()
        assert updated_template.name == raw_data['name']
        assert updated_template.severity == raw_data['exploitation']
        assert updated_template.resolution == raw_data['resolution']
        assert updated_template.description == raw_data['description']
        assert updated_template.references == {'another_ref', 'new_ref'}

    def test_create_new_vulnerability_template_with_references(self, session, test_client):
        """POST with comma-separated references stores each one."""
        vuln_count_previous = session.query(VulnerabilityTemplate).count()
        raw_data = self._create_post_data_vulnerability_template(references='ref1,ref2')
        res = test_client.post('/v3/vulnerability_template', data=raw_data)
        assert res.status_code == 201
        assert isinstance(res.json['_id'], int)
        assert set(res.json['refs']) == {'ref1', 'ref2'}
        assert vuln_count_previous + 1 == session.query(VulnerabilityTemplate).count()
        new_template = session.query(VulnerabilityTemplate).filter_by(id=res.json['_id']).first()
        assert new_template.references == {'ref1', 'ref2'}

    def test_delete_vuln_template(self, session, test_client):
        """DELETE answers 204 and lowers the template count by one."""
        template = self.factory.create()
        vuln_count_previous = session.query(VulnerabilityTemplate).count()
        res = test_client.delete(f'/v3/vulnerability_template/{template.id}')
        assert res.status_code == 204
        assert vuln_count_previous - 1 == session.query(VulnerabilityTemplate).count()

    def test_create_same_vuln_template_multipe_times_sohuld_raise_409(self, test_client):
        """
        This test case was found in a hackathon. Creating the same vuln
        template twice returned error 500 instead of 409.
        """
        raw_data = {
            "id": 123010,
            "cwe": "",
            "description": "test2",
            "desc": "test2",
            "exploitation": "critical",
            "name": "test2",
            "references": [],
            "refs": [],
            "resolution": "",
            "type": "vulnerability_template"
        }
        res = test_client.post(self.url(), data=raw_data)
        assert res.status_code == 201
        res = test_client.post(self.url(), data=raw_data)
        assert res.status_code == 409

    def test_when_a_template_with_ref_is_deleted_ref_remains_at_database(
            self, session, test_client):
        """Deleting a template must not delete its reference row."""
        # Precondition (previously a bare comparison that checked nothing).
        assert session.query(ReferenceTemplate).count() == 0
        template = VulnerabilityTemplateFactory.create()
        ref1 = ReferenceTemplateFactory.create()
        template.reference_template_instances.add(ref1)
        session.commit()
        res = test_client.delete(self.url(template))
        assert res.status_code == 204
        assert session.query(ReferenceTemplate).count() == 1

    # ------------------------------------------------------------------
    # Custom fields
    # ------------------------------------------------------------------

    def test_create_same_vuln_template_with_custom_fields(self, session, test_client):
        """POST with ``customfields`` echoes them back in the response."""
        custom_field_schema = CustomFieldsSchemaFactory(
            field_name='cvss',
            field_type='str',
            field_display_name='CVSS',
            table_name='vulnerability'
        )
        session.add(custom_field_schema)
        session.commit()
        raw_data = {
            "id": 123010,
            "cwe": "",
            "description": "test2",
            "desc": "test2",
            "exploitation": "critical",
            "name": "test2",
            "references": [],
            "refs": [],
            "resolution": "",
            "type": "vulnerability_template",
            "customfields": {
                "cvss": "value",
            }
        }
        res = test_client.post(self.url(), data=raw_data)
        assert res.status_code == 201
        assert res.json['customfields'] == {'cvss': 'value'}

    def test_update_vuln_template_with_custom_fields(self, session, test_client):
        """PUT with ``customfields`` updates both response and stored row."""
        custom_field_schema = CustomFieldsSchemaFactory(
            field_name='cvss',
            field_type='str',
            field_display_name='CVSS',
            table_name='vulnerability'
        )
        template = VulnerabilityTemplateFactory.create()
        session.add(custom_field_schema)
        session.add(template)
        session.commit()
        raw_data = {
            "cwe": "",
            "description": "test2",
            "desc": "test2",
            "exploitation": "critical",
            "name": "test2",
            "references": [],
            "refs": [],
            "resolution": "",
            "type": "vulnerability_template",
            "customfields": {
                "cvss": "updated value",
            }
        }
        res = test_client.put(self.url(template.id), data=raw_data)
        assert res.status_code == 200
        assert res.json['customfields'] == {'cvss': 'updated value'}
        vuln_template = session.query(VulnerabilityTemplate).filter_by(id=template.id).first()
        assert vuln_template.custom_fields == {'cvss': 'updated value'}

    # ------------------------------------------------------------------
    # Bulk create from CSV upload
    # ------------------------------------------------------------------

    def test_add_vuln_template_from_csv(self, session, test_client, csrf_token):
        """A CSV with one multi-line, quoted record creates one template."""
        expected_created_vuln_template = 1
        vuln_template_name = "EN-Improper Restriction of Operations within the Bounds of a Memory Buffer (Type: Class)"
        # The CSV payload is kept flush-left: it is a literal and any added
        # indentation would become part of the uploaded file.
        file_contents = b"""cwe,name,description,resolution,exploitation,references\n
CWE-119,EN-Improper Restriction of Operations within the Bounds of a Memory Buffer (Type: Class),"The software performs operations on a memory buffer, but it can read from or write to a memory location that is outside of the intended boundary of the buffer.\n
Certain languages allow direct addressing of memory locations and do not automatically ensure that these locations are valid for the memory buffer that is being referenced. This can cause read or write operations to be performed on memory locations that may be associated with other variables, data structures, or internal program data.\n
As a result, an attacker may be able to execute arbitrary code, alter the intended control flow, read sensitive information, or cause the system to crash.",,high,"Writing Secure Code: Chapter 5, ""Public Enemy #1: The Buffer Overrun"" Page 127; Chapter 14, ""Prevent I18N Buffer Overruns"" Page 441\n
Using the Strsafe.h Functions: http://msdn.microsoft.com/en-us/library/ms647466.aspx\n
Safe C String Library v1.0.3: http://www.zork.org/safestr/\n
Address Space Layout Randomization in Windows Vista: http://blogs.msdn.com/michael_howard/archive/2006/05/26/address-space-layout-randomization-in-windows-vista.aspx\n
Limiting buffer overflows with ExecShield: http://www.redhat.com/magazine/009jul05/features/execshield/\n
PaX: http://en.wikipedia.org/wiki/PaX\n
Understanding DEP as a mitigation technology part 1: http://blogs.technet.com/b/srd/archive/2009/06/12/understanding-dep-as-a-mitigation-technology-part-1.aspx\n
The Art of Software Security Assessment: Chapter 5, ""Memory Corruption"", Page 167.\n
The Art of Software Security Assessment: Chapter 5, ""Protection Mechanisms"", Page 189."
"""
        res = self._upload_csv(test_client, csrf_token, file_contents)
        assert res.status_code == 200
        assert len(res.json['vulns_created']) == expected_created_vuln_template
        assert res.json['vulns_created'][0][1] == vuln_template_name

    def test_add_unicode_vuln_template_from_csv(self, session, test_client, csrf_token):
        """A CSV containing non-ASCII text creates one template."""
        expected_created_vuln_template = 1
        vuln_template_name = "ES-Exposición de información a través del listado de directorios"
        file_contents = """cwe,name,description,resolution,exploitation,references
,ES-Exposición de información a través del listado de directorios,"Estos directorios no deberian estar publicos, pues exponen información sensible del tipo de tecnología utilizada, código de programación, información sobre rutas de acceso a distintos lugares, particularmente en este caso podemos listar toda la información del servidor sin ningun tipo de restricción
",Siempre evitar que se puedan listar directorios de manera externa y sin permisos,high,
"""
        res = self._upload_csv(test_client, csrf_token, file_contents.encode())
        assert res.status_code == 200
        assert len(res.json['vulns_created']) == expected_created_vuln_template
        assert res.json['vulns_created'][0][1] == vuln_template_name

    def test_add_vuln_template_only_required_fields(self, session, test_client, csrf_token):
        """A CSV with only ``name`` and ``exploitation`` is accepted."""
        expected_created_vuln_template = 1
        vuln_template_name = "test"
        file_contents = b"""name,exploitation\n
"test",high
"""
        res = self._upload_csv(test_client, csrf_token, file_contents)
        assert res.status_code == 200
        assert len(res.json['vulns_created']) == expected_created_vuln_template
        assert res.json['vulns_created'][0][1] == vuln_template_name

    def test_add_vuln_template_missing_required_fields(self, session, test_client, csrf_token):
        """A CSV lacking ``exploitation`` yields 400 naming only that column."""
        file_contents = b"""name,description\n
"test","description"
"""
        res = self._upload_csv(test_client, csrf_token, file_contents)
        assert res.status_code == 400
        assert 'name' not in res.data.decode('utf8')
        assert 'exploitation' in res.data.decode('utf8')

    def test_add_vuln_template_custom_fields_and_some_more_fields(self, session, test_client, csrf_token):
        """Extra CSV columns map to ``data`` and to a declared custom field."""
        custom_field_schema = CustomFieldsSchemaFactory(
            field_name='cvss',
            field_type='str',
            field_display_name='CVSS',
            table_name='vulnerability'
        )
        session.add(custom_field_schema)
        session.commit()
        file_contents = b"""name,exploitation,resolution,data,cvss\n
"test",high,"resolution","technical details","5"
"""
        res = self._upload_csv(test_client, csrf_token, file_contents)
        assert res.status_code == 200
        assert len(res.json['vulns_created']) == 1
        inserted_template = session.query(VulnerabilityTemplate).filter_by(name='test').first()
        assert inserted_template.resolution == 'resolution'
        assert inserted_template.severity == 'high'
        assert inserted_template.data == 'technical details'
        assert 'cvss' in inserted_template.custom_fields
        assert inserted_template.custom_fields['cvss'] == '5'

    # ------------------------------------------------------------------
    # Bulk create from JSON payload
    # ------------------------------------------------------------------

    def test_vuln_template_bulk_create(self, test_client, csrf_token):
        """Two vulns in a JSON payload are both created, in order."""
        vuln_1 = self._build_vuln_payload()
        vuln_2 = self._build_vuln_payload()
        data = {
            'csrf_token': csrf_token,
            'vulns': [vuln_1, vuln_2]
        }
        res = test_client.post('/v3/vulnerability_template/bulk_create', json=data)
        assert res.status_code == 200
        vulns_created = res.json['vulns_created']
        assert len(vulns_created) == 2
        assert vulns_created[0][1] == vuln_1['name']
        assert vulns_created[1][1] == vuln_2['name']

    def test_bulk_create_without_csrf_token(self, test_client):
        """Bulk create without a CSRF token is refused with 403."""
        vuln_1 = self._build_vuln_payload()
        vuln_2 = self._build_vuln_payload()
        data = {
            'vulns': [vuln_1, vuln_2]
        }
        res = test_client.post('/v3/vulnerability_template/bulk_create', json=data)
        assert res.status_code == 403
        assert res.json['message'] == 'Invalid CSRF token.'

    def test_bulk_create_without_data(self, test_client, csrf_token):
        """Bulk create with a token but no vulns is refused with 400."""
        data = {'csrf_token': csrf_token}
        res = test_client.post('/v3/vulnerability_template/bulk_create', json=data)
        assert res.status_code == 400
        assert res.json['message'] == 'Missing data to create vulnerabilities templates.'

    def test_bulk_create_with_one_conflict(self, test_client, session, csrf_token):
        """One conflicting name is reported; the other vuln is still created."""
        vuln_template = VulnerabilityTemplate(name='conflict_vuln', severity='high')
        session.add(vuln_template)
        session.commit()
        vuln_1 = self._build_vuln_payload(name='conflict_vuln')
        vuln_2 = self._build_vuln_payload()
        data = {
            'csrf_token': csrf_token,
            'vulns': [vuln_1, vuln_2]
        }
        res = test_client.post('/v3/vulnerability_template/bulk_create', json=data)
        assert res.status_code == 200
        assert len(res.json['vulns_with_conflict']) == 1
        assert res.json['vulns_with_conflict'][0][1] == vuln_1['name']
        assert len(res.json['vulns_created']) == 1
        assert res.json['vulns_created'][0][1] == vuln_2['name']

    def test_bulk_create_with_conflict_in_every_vuln(self, test_client, session, csrf_token):
        """When every vuln conflicts the response is 409 and nothing is created."""
        vuln_template_1 = VulnerabilityTemplate(name='conflict_vuln_1', severity='high')
        session.add(vuln_template_1)
        vuln_template_2 = VulnerabilityTemplate(name='conflict_vuln_2', severity='high')
        session.add(vuln_template_2)
        session.commit()
        vuln_1 = self._build_vuln_payload(name='conflict_vuln_1')
        vuln_2 = self._build_vuln_payload(name='conflict_vuln_2')
        data = {
            'csrf_token': csrf_token,
            'vulns': [vuln_1, vuln_2]
        }
        res = test_client.post('/v3/vulnerability_template/bulk_create', json=data)
        assert res.status_code == 409
        assert len(res.json['vulns_with_conflict']) == 2
        assert res.json['vulns_with_conflict'][0][1] == vuln_1['name']
        assert res.json['vulns_with_conflict'][1][1] == vuln_2['name']
        assert len(res.json['vulns_created']) == 0

    # ------------------------------------------------------------------
    # Bulk delete
    # ------------------------------------------------------------------

    def test_bulk_delete_vulnerabilities_template(self, test_client, session):
        """DELETE with a list of ids removes exactly those templates."""
        previous_count = session.query(VulnerabilityTemplate).count()
        vuln_template_1 = VulnerabilityTemplate(name='vuln_1', severity='high')
        session.add(vuln_template_1)
        vuln_template_2 = VulnerabilityTemplate(name='vuln_2', severity='high')
        session.add(vuln_template_2)
        vuln_template_3 = VulnerabilityTemplate(name='vuln_3', severity='high')
        session.add(vuln_template_3)
        session.commit()
        data = {'ids': [vuln_template_1.id, vuln_template_2.id, vuln_template_3.id]}
        res = test_client.delete(self.url(), data=data)
        assert res.status_code == 200
        assert res.json['deleted'] == 3
        # The count is back to what it was before the three inserts.
        assert previous_count == session.query(VulnerabilityTemplate).count()