-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpatch_opnsense_config.py
More file actions
364 lines (315 loc) · 18.6 KB
/
patch_opnsense_config.py
File metadata and controls
364 lines (315 loc) · 18.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
import logging
import time
import uuid
import base64
import xml.etree.ElementTree as ET
from typing import List
from cryptography.hazmat.primitives import serialization
from utility_dataclasses import FgData, FgDataSchema, FgNetAlias, FgNetAliasGroup, FgIPAlias, \
FgVpnIpsecPhase1, FgVpnIpsecPhase2, FgVpnCertCa, FgVpnCertLocal
# Development switch consulted by _add_local_certs(): while it is enabled, a
# local certificate whose private key cannot be loaded receives freshly
# generated random key material instead of aborting the run. The log output of
# that code path itself warns that this results in broken certificates.
DEVELOPER_MODE_SKIPPS = True
def patch_config(config_xml_file: str, fw_data_json_file: str, output_xml_file: str) -> None:
    """Placeholder for the file-to-file migration entry point.

    The function currently performs no work: none of its arguments are read
    and it always returns ``None``.

    Args:
        config_xml_file: Path of the OPNsense configuration to patch (unused).
        fw_data_json_file: Path of the exported firewall data (unused).
        output_xml_file: Path for the patched configuration (unused).
    """
    return None
def pretty_xml(element: ET.Element, indent=' ') -> str:
    """Render *element* as indented XML text without blank lines.

    The element is serialized, re-parsed with ``xml.dom.minidom`` and pretty
    printed; every line that consists only of whitespace is dropped from the
    result.

    Args:
        element: Root of the tree to render.
        indent: String used for one indentation level.

    Returns:
        The pretty printed document, lines joined with ``'\\n'``.
    """
    from xml.dom import minidom

    document = minidom.parseString(ET.tostring(element, 'utf-8'))
    rendered = document.toprettyxml(indent=indent)
    # str.strip returns '' (falsy) for whitespace-only lines, which filters them out
    return '\n'.join(filter(str.strip, rendered.split('\n')))
def _add_created_signature_to_fw_rule(element: ET.Element) -> None:
    """Append a ``<created>`` record to a firewall rule element.

    The record holds a fixed ``username``, the current Unix time in ``time``
    and a fixed ``description``.

    Args:
        element: Element that receives the new ``<created>`` child.
    """
    created = ET.SubElement(element, 'created')
    ET.SubElement(created, 'username').text = 'root@fortigate-migration-tool'
    # Format the float directly instead of slicing its string form, so the
    # value always carries exactly four decimal places.
    ET.SubElement(created, 'time').text = '{:.4f}'.format(time.time())
    ET.SubElement(created, 'description').text = 'created by automated fortigate-migration-tool'
def _read_json_file(fw_data_json_file: str) -> FgData:
    """Load the exported firewall data from a JSON file.

    Args:
        fw_data_json_file: Path of the JSON file to read.

    Returns:
        The result of ``FgDataSchema().loads`` applied to the file content.
    """
    # Decode explicitly as UTF-8 so the result does not depend on the
    # platform's locale encoding.
    with open(fw_data_json_file, 'r', encoding='utf-8') as f:
        raw_json = f.read()
    return FgDataSchema().loads(raw_json)
def _get_alias_container(config_root: ET.Element) -> ET.Element:
    """Return the ``<OPNsense><Firewall><Alias><aliases>`` node of *config_root*.

    Raises:
        RuntimeError: If any element along that path is missing.
    """
    node = config_root
    for tag in ('OPNsense', 'Firewall', 'Alias', 'aliases'):
        node = node.find(tag)
        if node is None:
            error_str = 'Could not find <OPNsense -> Firewall -> Alias -> aliases> in "config_root": <{}> is missing.'.format(tag)
            logging.fatal(error_str)
            raise RuntimeError(error_str)
    return node


def _append_alias(container: ET.Element, name, comment, alias_type: str, content: str) -> ET.Element:
    """Append one enabled ``<alias>`` entry with a fresh UUID to *container* and return it."""
    alias = ET.SubElement(container, 'alias')
    alias.set('uuid', str(uuid.uuid4()))
    ET.SubElement(alias, 'enabled').text = '1'
    ET.SubElement(alias, 'name').text = name
    ET.SubElement(alias, 'proto')
    ET.SubElement(alias, 'description').text = comment
    ET.SubElement(alias, 'counters').text = '0'
    ET.SubElement(alias, 'updatefreq')
    ET.SubElement(alias, 'type').text = alias_type
    ET.SubElement(alias, 'content').text = content
    return alias


def _add_net_aliases(config_root: ET.Element, net_alias: List[FgNetAlias]) -> None:
    """Add one alias per entry of *net_alias* to the alias section of *config_root*.

    An entry with ``fqdn`` set becomes a ``host`` alias holding that name; an
    entry with a non-empty ``net_list`` becomes a ``network`` alias holding the
    exploded form of each network, one per line.

    Args:
        config_root: Root element of the configuration tree.
        net_alias: Alias definitions to add.

    Raises:
        ValueError: If an entry has neither ``fqdn`` nor a non-empty
            ``net_list``. Entries processed before it stay in the tree; the
            offending entry itself is not added.
        RuntimeError: If the alias section is missing from *config_root*.
    """
    if not net_alias:
        return  # nothing to add, so the alias section is not required either
    container = _get_alias_container(config_root)
    for fw_alias in net_alias:
        # Validate before touching the tree so no half-built alias is left behind.
        if fw_alias.fqdn is not None:
            alias_type, content = 'host', fw_alias.fqdn
        elif fw_alias.net_list:
            alias_type = 'network'
            content = '\n'.join([x.exploded for x in fw_alias.net_list])
        else:
            error_str = 'Neither FgNetAlias.fqdn nor FgNetAlias.net_list are set in alias "{}".'.format(fw_alias.name)
            logging.fatal(error_str)
            raise ValueError(error_str)
        _append_alias(container, fw_alias.name, fw_alias.comment, alias_type, content)


def _add_group_aliases(config_root: ET.Element, net_alias_group: List[FgNetAliasGroup]) -> None:
    """Add one ``networkgroup`` alias per entry of *net_alias_group*.

    The alias content is the entry's ``net_alias_list`` joined with newlines.

    Args:
        config_root: Root element of the configuration tree.
        net_alias_group: Group definitions to add.

    Raises:
        RuntimeError: If the alias section is missing from *config_root*.
    """
    if not net_alias_group:
        return
    container = _get_alias_container(config_root)
    for fw_alias_group in net_alias_group:
        _append_alias(container, fw_alias_group.name, fw_alias_group.comment,
                      'networkgroup', '\n'.join(fw_alias_group.net_alias_list))


def _add_ip_aliases(config_root: ET.Element, ip_alias: List[FgIPAlias]) -> None:
    """Add one ``host`` alias per entry of *ip_alias*.

    The alias content is the exploded form of the entry's ``ip``.

    Args:
        config_root: Root element of the configuration tree.
        ip_alias: Address definitions to add.

    Raises:
        RuntimeError: If the alias section is missing from *config_root*.
    """
    if not ip_alias:
        return
    container = _get_alias_container(config_root)
    for fw_ip_alias in ip_alias:
        _append_alias(container, fw_ip_alias.name, fw_ip_alias.comment,
                      'host', fw_ip_alias.ip.exploded)
def _find_ipsec_phase1_ikeid(config_root: ET.Element, phase1name: str) -> str:
    """Return the ``ikeid`` text of the phase1 entry that belongs to *phase1name*.

    An entry matches when its ``<descr>`` is exactly *phase1name* or starts
    with ``'<phase1name> -> '``. Requiring the delimiter keeps a name such as
    ``vpn1`` from matching the entry of ``vpn10``.

    Args:
        config_root: Root element of the configuration tree.
        phase1name: Name of the phase1 configuration to look up.

    Returns:
        The text of the ``<ikeid>`` child of the first matching entry.

    Raises:
        RuntimeError: If ``<ipsec>`` is missing, or if a ``<phase1>`` entry
            inspected during the search lacks a usable ``<descr>`` or
            ``<ikeid>``.
        ValueError: If no entry matches *phase1name*.
    """
    # Keep the try body minimal: only the lookup of the <ipsec> node can fail here.
    try:
        phase1_entries = config_root.find('ipsec').findall('phase1')
    except AttributeError as err:
        error_str = 'Could not find <ipsec -> phase1> in "config_root": {}'.format(err)
        logging.fatal(error_str)
        raise RuntimeError(error_str) from err

    delimited_name = phase1name + ' -> '
    for phase1 in phase1_entries:
        try:
            descr = phase1.find('descr').text
            if descr == phase1name or descr.startswith(delimited_name):
                return phase1.find('ikeid').text
        except AttributeError as err:
            error_str = 'Encountered invalid <phase1> entry in <"config_root" -> ipsec>: {}'.format(err)
            logging.fatal(error_str)
            raise RuntimeError(error_str) from err

    error_str = 'Could not find phase1name "{}" in <"config_root" -> ipsec -> phase1>!'.format(phase1name)
    logging.error(error_str)
    raise ValueError(error_str)
def _get_next_ipsec_phase1_ikeid(config_root: ET.Element) -> int:
    """Return the next unused phase1 ``ikeid``.

    All ``<ipsec><phase1>`` entries are inspected. Entries without a numeric
    ``<ikeid>`` are skipped individually, so a single incomplete entry cannot
    hide higher ids that follow it.

    Args:
        config_root: Root element of the configuration tree.

    Returns:
        The highest positive ``ikeid`` found plus one, or ``1`` when there is
        no ``<ipsec>`` section or no usable id.
    """
    ipsec = config_root.find('ipsec')
    if ipsec is None:
        logging.info('Could not find <ipsec -> phase1> in "config_root": no <ipsec> element present')
        return 1

    max_ikeid = 0
    for phase1 in ipsec.findall('phase1'):
        raw_ikeid = phase1.findtext('ikeid')
        try:
            max_ikeid = max(max_ikeid, int(raw_ikeid))
        except (TypeError, ValueError):
            # TypeError: <ikeid> is missing; ValueError: empty or non-numeric text
            logging.info('Ignoring <phase1> entry without a numeric <ikeid>: {!r}'.format(raw_ikeid))
    return max_ikeid + 1
def _phase1_encryption_fields(enc_str: str, phase1_name: str) -> dict:
    """Map a cipher name to the children of an ``<encryption-algorithm>`` node.

    Returns:
        Child tag to text mapping, e.g. ``{'name': 'aes', 'keylen': '256'}``.

    Raises:
        NotImplementedError: If the cipher is neither ``aes<bits>`` nor ``3des``.
    """
    if enc_str.startswith('aes'):
        return {'name': 'aes', 'keylen': str(int(enc_str[3:]))}
    if enc_str == '3des':
        logging.warning('Insecure encryption algorithm "{}" in FgVpnIpsecPhase1 "{}" encountered.'.format(enc_str, phase1_name))
        return {'name': '3des'}
    error_str = 'Unsupported or insecure encryption algorithm "{}" in FgVpnIpsecPhase1 "{}" encountered.'.format(enc_str, phase1_name)
    logging.fatal(error_str)
    raise NotImplementedError(error_str)


def _add_ipsec_phase1(config_root: ET.Element, ipsec_phase_1: List[FgVpnIpsecPhase1]) -> None:
    """Add a ``<phase1>`` entry to ``<ipsec>`` for every migratable phase1 configuration.

    ``<ipsec>`` is created when missing. Configurations that use XAuth
    (``xauthtype`` or ``authusrgrp`` set) or have no ``remote_gw`` are logged
    and skipped. Each entry is assembled completely before it is attached, so
    an error while translating one configuration leaves no partial entry
    behind.

    Only the cipher of the first proposal is written to the single
    ``<encryption-algorithm>`` node; proposals with a different cipher are
    logged and skipped. The distinct digests are written comma separated into
    one ``<hash-algorithm>`` node.

    Args:
        config_root: Root element of the configuration tree.
        ipsec_phase_1: Phase1 configurations to migrate.

    Raises:
        NotImplementedError: On an unknown ``connect_type``, cipher or digest.
    """
    ipsec = config_root.find('ipsec')
    if ipsec is None:
        ipsec = ET.SubElement(config_root, 'ipsec')

    for phase1 in ipsec_phase_1:
        if phase1.xauthtype is not None or phase1.authusrgrp is not None:
            logging.error(
                'Could not migrate ipsec phase1 configuration "{}"! Automatic XAuth migration is not possible!'.format(phase1.name))
            continue
        if phase1.remote_gw is None:
            logging.error(
                'Could not migrate ipsec phase1 configuration "{}"! No remote gateway is configured!'.format(phase1.name))
            continue

        ikeid = str(_get_next_ipsec_phase1_ikeid(config_root))
        new_phase1 = ET.Element('phase1')  # attached to <ipsec> only once it is complete
        ET.SubElement(new_phase1, 'descr').text = '{} -> {}'.format(phase1.name, phase1.comment)
        ET.SubElement(new_phase1, 'ikeid').text = ikeid
        ET.SubElement(new_phase1, 'iketype').text = 'ike'
        ET.SubElement(new_phase1, 'interface').text = phase1.interface
        ET.SubElement(new_phase1, 'protocol').text = 'inet'
        ET.SubElement(new_phase1, 'lifetime').text = str(phase1.keylife)
        ET.SubElement(new_phase1, 'pre-shared-key').text = phase1.psksecret
        ET.SubElement(new_phase1, 'private-key')
        ET.SubElement(new_phase1, 'authentication_method').text = 'pre_shared_key'
        ET.SubElement(new_phase1, 'dhgroup').text = ','.join([str(x) for x in phase1.dhgrp])
        ET.SubElement(new_phase1, 'remote-gateway').text = phase1.remote_gw.exploded
        ET.SubElement(new_phase1, 'nat_traversal').text = 'on' if phase1.nattraversal else 'off'

        if phase1.connect_type == 'static':
            pass  # this is the default
        elif phase1.connect_type == 'dynamic':  # allow gateway to be on a dynamic IP
            # Element text must be a string; an int cannot be serialized.
            ET.SubElement(new_phase1, 'rightallowany').text = '1'
        else:
            error_str = 'Encountered an unexpected FgVpnIpsecPhase1.connect_type: {}'.format(phase1.connect_type)
            logging.error(error_str)
            raise NotImplementedError(error_str)

        if phase1.dpd:
            ET.SubElement(new_phase1, 'dpd_maxfail').text = '3'
            ET.SubElement(new_phase1, 'dpd_delay').text = '5'
            ET.SubElement(new_phase1, 'dpd_action').text = 'restart'

        enc_str = None
        digests = []
        for c_prop in phase1.c_proposal:
            if enc_str is None:
                # The first proposal decides the one cipher of this phase1.
                enc_str = c_prop.encrypt
                se_enc_alg = ET.SubElement(new_phase1, 'encryption-algorithm')
                for tag, value in _phase1_encryption_fields(enc_str, phase1.name).items():
                    ET.SubElement(se_enc_alg, tag).text = value
            elif enc_str != c_prop.encrypt:
                error_str = 'OPNsense ipsec phase1 configuration only allows one encryption algorithm specification! '
                error_str += 'Already configured "{}" conflicts "{}".\n SKIPPED: '.format(enc_str, c_prop.encrypt)
                error_str += '"{}" remains the only possible encryption algorithm for FgVpnIpsecPhase1 "{}"'.format(enc_str, phase1.name)
                logging.error(error_str)

            # NOTE(review): the digest of a proposal whose cipher was skipped is
            # still collected - confirm this is intended.
            if c_prop.digest.startswith('sha'):
                pass
            elif c_prop.digest == 'md5':
                logging.warning('Insecure digest algorithm "{}" in FgVpnIpsecPhase1 "{}" encountered.'.format(c_prop.digest, phase1.name))
            else:
                error_str = 'Unsupported or insecure digest algorithm "{}" in FgVpnIpsecPhase1 "{}" encountered.'.format(c_prop.digest, phase1.name)
                logging.fatal(error_str)
                raise NotImplementedError(error_str)
            if c_prop.digest not in digests:
                digests.append(c_prop.digest)

        if digests:
            ET.SubElement(new_phase1, 'hash-algorithm').text = ','.join(digests)

        ipsec.append(new_phase1)
def _fill_phase2_id(id_element: ET.Element, phase2, side: str) -> None:
    """Fill a ``<localid>``/``<remoteid>`` node from the ``src``/``dst`` fields of *phase2*.

    Args:
        id_element: The ``<localid>`` or ``<remoteid>`` element to fill.
        phase2: Phase2 configuration that provides ``<side>_addr_type`` and,
            depending on it, ``<side>_ip`` or ``<side>_net``.
        side: ``'src'`` or ``'dst'``.

    Raises:
        NotImplementedError: If the address type is neither ``ip`` nor ``net``.
        ValueError: If the network has no ``address/prefix`` representation.
    """
    addr_type = getattr(phase2, side + '_addr_type')
    if addr_type == 'ip':
        ET.SubElement(id_element, 'type').text = 'address'
        ET.SubElement(id_element, 'address').text = getattr(phase2, side + '_ip').exploded
    elif addr_type == 'net':
        with_prefixlen = getattr(phase2, side + '_net').with_prefixlen
        address, separator, netbits = with_prefixlen.partition('/')
        if not separator:
            # explicit check instead of an assert, which is stripped under -O
            raise ValueError('Expected "address/prefix" but got "{}"'.format(with_prefixlen))
        ET.SubElement(id_element, 'type').text = 'network'
        ET.SubElement(id_element, 'address').text = address
        ET.SubElement(id_element, 'netbits').text = netbits
    else:
        error_str = 'Encountered unexpected FgVpnIpsecPhase2.{}_addr_type value: {}'.format(side, addr_type)
        logging.fatal(error_str)
        raise NotImplementedError(error_str)


def _phase2_encryption_fields(enc_str: str, phase2_name: str) -> dict:
    """Map a cipher name to the children of an ``<encryption-algorithm-option>`` node.

    Raises:
        NotImplementedError: If the cipher is neither ``aes<bits>`` nor ``3des``.
    """
    if enc_str.startswith('aes'):
        return {'name': 'aes', 'keylen': str(int(enc_str[3:]))}
    if enc_str == '3des':
        logging.warning('Insecure encryption algorithm "{}" in FgVpnIpsecPhase2 "{}" encountered.'.format(enc_str, phase2_name))
        return {'name': '3des'}
    error_str = 'Unsupported or insecure encryption algorithm "{}" in FgVpnIpsecPhase2 "{}" encountered.'.format(enc_str, phase2_name)
    logging.fatal(error_str)
    raise NotImplementedError(error_str)


def _add_ipsec_phase2(config_root: ET.Element, ipsec_phase_2: List[FgVpnIpsecPhase2]) -> None:
    """Add a ``<phase2>`` entry to ``<ipsec>`` for every phase2 configuration.

    The matching phase1 entry is looked up through ``phase1name``; a phase2
    configuration without one is logged and skipped. Every distinct cipher and
    digest of the proposals is written once. Each entry is assembled completely
    before it is attached to ``<ipsec>``.

    Args:
        config_root: Root element of the configuration tree.
        ipsec_phase_2: Phase2 configurations to migrate.

    Raises:
        NotImplementedError: On an unknown address type, cipher or digest.
    """
    for phase2 in ipsec_phase_2:
        try:
            ikeid = _find_ipsec_phase1_ikeid(config_root, phase2.phase1name)
        except ValueError:
            logging.error('Could not create configuration for ipsec phase2 "{}"! Matching phase1 entry is missing.'.format(phase2.name))
            continue

        new_phase2 = ET.Element('phase2')
        ET.SubElement(new_phase2, 'ikeid').text = ikeid
        ET.SubElement(new_phase2, 'uniqid').text = uuid.uuid4().hex[:13]
        ET.SubElement(new_phase2, 'mode').text = 'tunnel'
        ET.SubElement(new_phase2, 'lifetime').text = str(phase2.keylife)
        ET.SubElement(new_phase2, 'descr').text = phase2.name
        ET.SubElement(new_phase2, 'protocol').text = 'esp'
        _fill_phase2_id(ET.SubElement(new_phase2, 'localid'), phase2, 'src')
        _fill_phase2_id(ET.SubElement(new_phase2, 'remoteid'), phase2, 'dst')

        seen_ciphers = set()
        seen_digests = set()
        for c_prop in phase2.c_proposal:
            if c_prop.encrypt not in seen_ciphers:
                # Translate first so an unsupported cipher adds no empty node.
                fields = _phase2_encryption_fields(c_prop.encrypt, phase2.name)
                se_enc_alg = ET.SubElement(new_phase2, 'encryption-algorithm-option')
                for tag, value in fields.items():
                    ET.SubElement(se_enc_alg, tag).text = value
                seen_ciphers.add(c_prop.encrypt)

            if c_prop.digest in seen_digests:  # skips duplicates
                continue
            if c_prop.digest.startswith('sha'):
                pass
            elif c_prop.digest == 'md5':
                logging.warning('Insecure digest algorithm "{}" in FgVpnIpsecPhase2 "{}" encountered.'.format(c_prop.digest, phase2.name))
            else:
                error_str = 'Unsupported or insecure digest algorithm "{}" in FgVpnIpsecPhase2 "{}" encountered.'.format(c_prop.digest, phase2.name)
                logging.fatal(error_str)
                raise NotImplementedError(error_str)
            ET.SubElement(new_phase2, 'hash-algorithm-option').text = 'hmac_{}'.format(c_prop.digest)
            seen_digests.add(c_prop.digest)

        # The phase1 lookup above succeeded, so <ipsec> exists at this point.
        config_root.find('ipsec').append(new_phase2)
def _add_ca_certs(config_root: ET.Element, vpn_cert_ca: List[FgVpnCertCa]) -> None:
    """Add a ``<ca>`` entry for every CA certificate that is not present yet.

    A certificate counts as present when a ``<ca>`` entry with the same
    ``<descr>`` exists, including entries added earlier in the same call.
    Existing ``<ca>`` entries without ``<descr>`` are ignored for that check.

    Args:
        config_root: Root element of the configuration tree.
        vpn_cert_ca: CA certificates to add; ``cert`` is stored base64 encoded.
    """
    descr_nodes = (c.find('descr') for c in config_root.findall('ca'))
    known_names = {node.text for node in descr_nodes if node is not None}

    for ca in vpn_cert_ca:
        if ca.name in known_names:
            continue
        new_ca = ET.SubElement(config_root, 'ca')
        ET.SubElement(new_ca, 'refid').text = uuid.uuid4().hex[:13]
        ET.SubElement(new_ca, 'descr').text = ca.name
        ET.SubElement(new_ca, 'serial').text = '0'
        ET.SubElement(new_ca, 'crt').text = base64.b64encode(ca.cert.encode('utf-8')).decode('utf-8')
        known_names.add(ca.name)
def _add_local_certs(config_root: ET.Element, vpn_cert_local: List[FgVpnCertLocal]) -> None:
    """Add a ``<cert>`` entry for every local certificate that is not present yet.

    A certificate counts as present when a ``<cert>`` entry with the same
    ``<descr>`` exists, including entries added earlier in the same call.
    The private key is loaded with the certificate's password and stored
    unencrypted (PKCS8 PEM, base64 encoded) next to the certificate. The
    ``<cert>`` entry is only created after the key material is available.

    If the key cannot be loaded and the module-level ``DEVELOPER_MODE_SKIPPS``
    flag is truthy, a random RSA key of the certificate's key size is used
    instead, which yields a broken certificate.

    Args:
        config_root: Root element of the configuration tree.
        vpn_cert_local: Certificates with private key and password.

    Raises:
        ValueError: If a private key cannot be loaded and developer mode is off.
    """
    descr_nodes = (c.find('descr') for c in config_root.findall('cert'))
    known_names = {node.text for node in descr_nodes if node is not None}

    for crt in vpn_cert_local:
        if crt.name in known_names:
            continue

        # An unencrypted key must be loaded with password=None.
        password = crt.password.encode('utf-8') if crt.password else None
        private_key_pem = crt.private_key.encode('utf-8')
        # Decrypt the private key
        try:
            pk = serialization.load_pem_private_key(private_key_pem, password=password)
        except (ValueError, TypeError) as err:
            # ValueError: malformed or undecryptable key; TypeError: password
            # does not fit the key's encryption state.
            error_str = 'Could not load private key of certificate "{}" because of "{}".'.format(crt.name, err)
            logging.fatal(error_str)
            # Check the flag's value, not just that the name exists.
            if not globals().get('DEVELOPER_MODE_SKIPPS', False):
                raise ValueError(error_str) from err
            from cryptography import x509
            from cryptography.hazmat.primitives.asymmetric import rsa
            key_size = x509.load_pem_x509_certificate(crt.cert.encode('utf-8')).public_key().key_size
            pk = rsa.generate_private_key(public_exponent=65537, key_size=key_size)
            logging.fatal(' --> Ignored because DEVELOPER_MODE_SKIPPS: used random key material to replace missing private key!\n >> THIS WILL RESULT IN BROKEN CERTIFICATES! <<')

        decrypted_priv_pem = pk.private_bytes(encoding=serialization.Encoding.PEM,
                                              format=serialization.PrivateFormat.PKCS8,
                                              encryption_algorithm=serialization.NoEncryption()).decode('utf-8')

        new_crt = ET.SubElement(config_root, 'cert')
        ET.SubElement(new_crt, 'refid').text = uuid.uuid4().hex[:13]
        ET.SubElement(new_crt, 'descr').text = crt.name
        ET.SubElement(new_crt, 'crt').text = base64.b64encode(crt.cert.encode('utf-8')).decode('utf-8')
        ET.SubElement(new_crt, 'prv').text = base64.b64encode(decrypted_priv_pem.encode('utf-8')).decode('utf-8')
        known_names.add(crt.name)
if __name__ == '__main__':
    # Script entry point: patch a fixed input configuration with the exported
    # firewall data and write the result to output_xml_file.
    logging.basicConfig(level=logging.DEBUG, format="%(levelname)s: %(message)s")
    config_xml_file = 'config-site-1-opnsense-1.localdomain.xml'
    fw_data_json_file = "FwData.json"
    output_xml_file = 'config_test.xml'

    # TODO: move this pipeline into patch_config(config_xml_file, fw_data_json_file, output_xml_file)
    config_root = ET.parse(config_xml_file).getroot()
    fw_data = _read_json_file(fw_data_json_file)

    _add_net_aliases(config_root, fw_data.net_alias)
    _add_group_aliases(config_root, fw_data.net_alias_group)
    _add_ip_aliases(config_root, fw_data.ip_alias)
    _add_ipsec_phase1(config_root, fw_data.vpn_ipsec_phase_1)
    _add_ipsec_phase2(config_root, fw_data.vpn_ipsec_phase_2)
    _add_ca_certs(config_root, fw_data.vpn_cert_ca)
    _add_local_certs(config_root, fw_data.vpn_cert_local)

    # TODO: migrate firewall policies into <filter><rule> entries

    # pretty_xml() emits an XML declaration without an encoding, which XML
    # readers interpret as UTF-8, so the file must be written as UTF-8.
    with open(output_xml_file, 'w', encoding='utf-8') as f:
        f.write(pretty_xml(config_root))