import os
import time
import sys
import gzip
import json
import subprocess
import traceback
import hashlib

from prettytable import PrettyTable

from libwgopparser import Parser


def get_sha256(data_bytes):
    """Return the hexadecimal SHA-256 digest of *data_bytes*."""
    return hashlib.sha256(data_bytes).hexdigest()


def _prompt_int(prompt, default):
    """Ask for an integer until the answer is valid.

    An empty (or whitespace-only) answer returns *default*; anything that
    is not an integer is rejected and the question is asked again instead
    of aborting the whole wizard with a ValueError.
    """
    while True:
        answer = input(prompt).strip()
        if not answer:
            return default
        try:
            return int(answer)
        except ValueError:
            print('Invalid number: {}'.format(answer))


def _prompt_required(prompt):
    """Ask until a non-blank answer is given and return it stripped."""
    while True:
        answer = input(prompt).strip()
        if answer:
            return answer


def _read_wg_dump(target):
    """Run ``wg show <target> dump`` and return its rows split on tabs.

    Returns an empty list when the command prints nothing. The check is
    done on the text *before* splitting, because ``"".split('\\n')`` yields
    ``['']``, which is truthy and would hide the empty case.

    Raises:
        subprocess.CalledProcessError: if ``wg`` exits with a non-zero code.
    """
    output = subprocess.check_output(["wg", "show", target, "dump"]).decode().strip()
    if not output:
        return []
    return [line.split('\t') for line in output.split('\n')]


def direct_parse(raw_output):
    """Convert the rows of one interface's ``wg show ... dump`` into a dict.

    Args:
        raw_output: list of rows (each a list of strings). Row 0 describes
            the interface (private key, public key, listen port, fwmark);
            every following row describes one peer (public key, preshared
            key, endpoint, allowed ips, last handshake, rx bytes, tx bytes,
            keepalive).

    Returns:
        A dict with the keys ``private_key``, ``public_key``,
        ``listen_port``, ``fwmark`` and ``peers`` (peer public key -> dict).
        The placeholders ``(none)`` and ``off`` are normalised to ``""``
        and ``0`` respectively.
    """
    header = raw_output[0]
    wg_peers = {}
    for line in raw_output[1:]:
        wg_peers[line[0]] = {
            "preshared_key": "" if line[1] == "(none)" else line[1],
            "endpoint": "" if line[2] == "(none)" else line[2],
            "allowed": "" if line[3] == "(none)" else line[3],
            "last_handshake": int(line[4]),
            "rx_bytes": int(line[5]),
            "tx_bytes": int(line[6]),
            "keepalive": 0 if line[7] == "off" else int(line[7]),
        }

    return {
        "private_key": header[0],
        "public_key": header[1],
        "listen_port": int(header[2]),
        # fwmark is parsed as a base-16 value unless it is reported as "off".
        "fwmark": 0 if header[3] == "off" else int(header[3], 16),
        "peers": wg_peers,
    }


def direct_dump(interface_name):
    """Return the parsed live state of *interface_name*.

    Returns None when ``wg`` produced no output for the interface.
    """
    rows = _read_wg_dump(interface_name)
    if not rows:
        return None
    return direct_parse(rows)


def direct_dump_all():
    """Return ``{interface_name: parsed state}`` for every interface.

    In the ``all`` dump every row is prefixed with the interface name; rows
    are grouped by that first column and each group is parsed separately.
    Returns None when ``wg`` produced no output at all.
    """
    rows = _read_wg_dump("all")
    if not rows:
        return None

    rows_by_interface = {}
    for row in rows:
        rows_by_interface.setdefault(row[0], []).append(row[1:])

    return {name: direct_parse(group) for name, group in rows_by_interface.items()}


class Config:
    """Gzip-compressed JSON store of interface definitions plus the
    interactive prompts used to edit it.

    Attributes are plain instance fields:
      path_config     -- path of the storage file
      config          -- dict mapping interface name -> interface settings
      last_load_hash  -- SHA-256 of the file bytes as last read or written,
                         used to notice changes made outside this tool
    """

    def __init__(self, filepath=None):
        """Load the config at *filepath* (default ``.local.storage.dat``),
        creating a new empty one if it cannot be loaded."""
        self.path_config = filepath or ".local.storage.dat"
        self.config = {}
        self.last_load_hash = ''

        self.ensure_load()

    def _load_config(self):
        """Read the storage file and return ``(config_dict, sha256_of_file)``."""
        with open(self.path_config, 'rb') as f:
            raw_config = f.read()
        loaded_config = json.loads(gzip.decompress(raw_config))
        return loaded_config, get_sha256(raw_config)

    def load(self):
        """Replace the in-memory config with the content of the storage file."""
        print('Loading config from {}...'.format(self.path_config))
        self.config, self.last_load_hash = self._load_config()

    def save(self):
        """Write the in-memory config to the storage file.

        Before writing, the file on disk is re-hashed; a mismatch with the
        hash remembered from the last load/save only produces a warning.
        On any failure the JSON is printed so that it is not lost, and the
        exception is re-raised.
        """
        print('Saving config to {}...'.format(self.path_config))
        saved_config = json.dumps(self.config, ensure_ascii=False)

        try:
            if self.last_load_hash:
                _, disk_hash = self._load_config()
                if disk_hash != self.last_load_hash:
                    print('[WARN] file might have been changed/modified out of wg-op-admin.')

            with open(self.path_config, 'wb') as f:
                raw_config = gzip.compress(saved_config.encode())
                f.write(raw_config)
            self.last_load_hash = get_sha256(raw_config)
        except Exception:
            print('Unable to save config, content is printed below to avoid data loss.')
            print(saved_config)
            raise

        print('Config saved.')

    def _backup_existing_config(self):
        """Move an existing storage file aside so it is not overwritten.

        Used when the file exists but could not be loaded: the unreadable
        content is kept under a timestamped ``.bak`` name instead of being
        silently replaced by a fresh, empty config.
        """
        if not os.path.exists(self.path_config):
            return
        backup_path = '{}.{}.bak'.format(self.path_config, int(time.time()))
        try:
            os.replace(self.path_config, backup_path)
            print('Existing config moved to {}.'.format(backup_path))
        except OSError:
            print(traceback.format_exc())
            print('[WARN] unable to back up existing config.')

    def ensure_load(self):
        """Load the config, falling back to creating a new one on failure."""
        try:
            self.load()
        except Exception:
            print(traceback.format_exc())
            print('Unable to load config, try create a new one.')
            self._backup_existing_config()
            self.save()

    def _compile(self, interface_name):
        """Render the stored settings of *interface_name* as config text.

        Returns a newline-joined string with an ``[Interface]`` section and
        one ``[Peer]`` section per stored peer, including the
        ``#enable-dns-reload`` and ``#use-tunnel`` marker lines.
        """
        interface_config = self.config[interface_name]

        output = []
        output.append("[Interface]")
        output.append("Address={}".format(interface_config["address"]))
        output.append("PrivateKey={}".format(interface_config["private_key"]))
        if interface_config["listen_port"]:
            output.append("ListenPort={}".format(interface_config["listen_port"]))
        # The flag is stored by ui_add_peer as "is_enable_dns_reload" and is
        # absent until then. The old "is_enable_dns_reloader" spelling is
        # still honoured for configs that may carry it.
        if interface_config.get("is_enable_dns_reload") or interface_config.get("is_enable_dns_reloader"):
            output.append("#enable-dns-reload")

        for peer_key, peer_info in interface_config["peers"].items():
            output.append("[Peer]")
            output.append("PublicKey={}".format(peer_key))
            output.append("AllowedIPs={}".format(peer_info["allowed"]))
            if peer_info["keepalive"]:
                output.append("PersistentKeepalive={}".format(peer_info["keepalive"]))
            if peer_info["endpoint_type"] == "tunnel":
                output.append("#use-tunnel {}".format(peer_info["endpoint"]))
            elif peer_info["endpoint_type"] == "custom":
                output.append("Endpoint={}".format(peer_info["endpoint"]))

        return '\n'.join(output)

    def _build(self, interface_name):
        """Compile *interface_name* and run the text through ``Parser``.

        The parser is constructed with the directory of the running script
        and its result is returned unchanged.
        """
        raw_content = self._compile(interface_name)

        wgop_basepath = os.path.dirname(os.path.realpath(sys.argv[0]))
        parser = Parser(wgop_basepath)
        parser.parse(raw_content)
        parser.compile_interface()
        parser.compile_peers()
        parser.compile_final()

        return parser.get_result()

    def ui_create_interface(self, interface_name):
        """Interactively create *interface_name*, save it and open its menu.

        Generates a key pair with ``wg genkey`` / ``wg pubkey``, asks for
        the listen port (empty answer -> 0) and the LAN address (required).
        Does nothing but print a message if the name is already in use.
        """
        if interface_name in self.config:
            print('unable to create interface, name `{}` alreay used.'.format(interface_name))
            return

        print('Generating key pairs...')
        wg_private_key = subprocess.check_output(["wg", "genkey"]).decode().strip()
        wg_public_key = subprocess.check_output(["wg", "pubkey"], input=wg_private_key.encode()).decode().strip()

        wg_port = _prompt_int('Enter listen port: (random) ', 0)
        wg_lan_ip = _prompt_required('Enter LAN IP: ')

        self.config[interface_name] = {
            "private_key": wg_private_key,
            "public_key": wg_public_key,
            "listen_port": wg_port,
            "address": wg_lan_ip,
            "interface_name": interface_name,
            "peers": {},
            "connectors": {},
            "servers": {},
            "ts_create": int(time.time()),
        }

        self.save()
        self.ui_interface(interface_name)

    def ui_interface(self, interface_name):
        """Show the edit menu for *interface_name* until the user quits.

        Only ``1`` (add peer) and ``q`` (quit) are acted on; any other
        answer redisplays the menu.
        """
        while True:
            print('''----- Editing interface {} -----
[1] Add peer
[2] List peers
[3] Add connector
[4] List connectors
[q] Quit
'''.format(interface_name))
            user_input = input('> ')
            if not user_input:
                continue
            user_input = user_input.lower().strip()
            if user_input == 'q':
                return
            if user_input == '1':
                self.ui_add_peer(interface_name)

    def ui_add_peer(self, interface_name):
        """Interactively add one peer to *interface_name* and save.

        Asks for the peer public key and allowed IPs (both required), the
        persistent keepalive (empty answer -> 15, as the prompt states) and
        an endpoint. An endpoint that matches a connector name is stored
        with type ``tunnel``; any other non-empty endpoint is ``custom``.
        For a custom endpoint the user may switch on the DNS reloader for
        the interface. Returns without changes if the peer already exists.
        """
        print('>>> Creating new peer for interface {}'.format(interface_name))
        interface_config = self.config[interface_name]

        wg_peer_key = _prompt_required("Enter peer public key: ")
        if wg_peer_key in interface_config["peers"]:
            print('Peer already exists.')
            return

        wg_peer_allowed = _prompt_required("Enter peer allowed ips: ")
        # The prompt advertises 15 as the default, so an empty answer must
        # yield 15; enter 0 explicitly to disable keepalive.
        wg_peer_keepalive = _prompt_int("Enter persistent keepalive: (15) ", 15)

        connectors_info = interface_config["connectors"]
        if connectors_info:
            print("=== Available connectors ===")
            for conn_name, conn_info in connectors_info.items():
                print("[{}] {} {}".format(conn_name, conn_info["type"], conn_info["remote"]))

        wg_peer_endpoint = input('Enter endpoint: (empty) ').strip()
        if not wg_peer_endpoint:
            wg_peer_endpoint_type = ''
        elif wg_peer_endpoint in connectors_info:
            wg_peer_endpoint_type = 'tunnel'
        else:
            wg_peer_endpoint_type = 'custom'

        if wg_peer_endpoint_type == 'custom' and not interface_config.get('is_enable_dns_reload', False):
            user_input = input('Enable DNS reloader?: (y/N) ')
            if user_input and user_input.lower().strip() == 'y':
                interface_config['is_enable_dns_reload'] = True

        interface_config["peers"][wg_peer_key] = {
            "allowed": wg_peer_allowed,
            "keepalive": wg_peer_keepalive,
            "endpoint": wg_peer_endpoint,
            "endpoint_type": wg_peer_endpoint_type,
        }

        self.save()

    def _ui_show_single(self, interface_name, info):
        """Print the live state of one interface as a header plus a table.

        *info* is a dict in the shape returned by ``direct_parse``. The
        "Last Handshake" column shows the number of seconds elapsed since
        the handshake, or is left blank when the stored value is 0.
        """
        print('\ninterface: {}\n public key: {}\n listening port: {}'.format(interface_name, info['public_key'], info['listen_port']))
        if info['fwmark']:
            print(' fwmark: {} ({})'.format(hex(info['fwmark']), info['fwmark']))

        pt = PrettyTable(["Name", "AllowedIPs", "Endpoint", "Last Handshake", "Received", "Sent", "Keepalive"])
        now = int(time.time())
        for peer_key, peer_info in info['peers'].items():
            pt.add_row([
                peer_key,
                peer_info['allowed'],
                peer_info['endpoint'],
                now - peer_info['last_handshake'] if peer_info['last_handshake'] else "",
                peer_info['rx_bytes'],
                peer_info['tx_bytes'],
                peer_info['keepalive'],
            ])

        print(pt.get_string())

    def ui_show_status(self, interface_name):
        """Print the live state of *interface_name*, or of every interface
        when *interface_name* is falsy.

        Prints a short notice instead of failing when ``wg`` reported
        nothing.
        """
        if interface_name:
            info = direct_dump(interface_name)
            if not info:
                print('No status available for interface {}.'.format(interface_name))
                return
            self._ui_show_single(interface_name, info)
        else:
            info = direct_dump_all()
            if not info:
                print('No active interface found.')
                return
            for name in info:
                self._ui_show_single(name, info[name])


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print('Commands: new, list, edit, del, up/start, down/stop, enable, disable, status')
        sys.exit(1)

    # Created only after the usage check so that merely asking for help
    # does not load or create the storage file.
    c = Config()

    if sys.argv[1] == "new":
        if len(sys.argv) < 3:
            print('Syntax: new <interface_name>')
            sys.exit(1)
        wg_name = sys.argv[2]
        c.ui_create_interface(wg_name)
    elif sys.argv[1] == "status":
        if len(sys.argv) < 3:
            c.ui_show_status(None)
        else:
            wg_name = sys.argv[2]
            c.ui_show_status(wg_name)